Parallel Class та PLINQ — Data Parallelism
Parallel Class та PLINQ — Data Parallelism
Вступ: Task vs Parallel — Різні Підходи до Паралелізму
У попередньому файлі ви навчились працювати з Task — абстракцією одиниці роботи, яка виконується асинхронно. Task ідеально підходить для task parallelism — коли у вас є декілька різних операцій, які потрібно виконати паралельно:
// Task Parallelism — різні операції
Task task1 = Task.Run(() => DownloadFile("url1"));
Task task2 = Task.Run(() => ProcessImage("image.jpg"));
Task task3 = Task.Run(() => SendEmail("user@example.com"));
Task.WaitAll(task1, task2, task3);
Але є інший тип паралелізму — data parallelism (паралелізм даних): коли у вас є одна операція, яку потрібно виконати над великою кількістю елементів:
// Data Parallelism — одна операція над багатьма елементами
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
// Обчислити квадрат кожного числа паралельно
foreach (int number in numbers)
{
int square = number * number;
// ...
}
Для data parallelism .NET надає два інструменти:
Parallelclass — імперативний підхід (Parallel.For, Parallel.ForEach)- PLINQ — декларативний підхід (Parallel LINQ)
6. Parallel Class: Імперативний Data Parallelism
Parallel.For() — Паралельний Цикл з Індексом
Parallel.For() — це паралельний аналог звичайного for циклу. Він автоматично розподіляє ітерації між потоками ThreadPool.
using System.Threading.Tasks;
using System.Diagnostics;
static void ProcessItem(int i)
{
// Імітація CPU-bound роботи
double result = 0;
for (int j = 0; j < 1_000_000; j++)
result += Math.Sqrt(i * j);
}
int itemCount = 100;
// ❌ Послідовний for
var sw1 = Stopwatch.StartNew();
for (int i = 0; i < itemCount; i++)
{
ProcessItem(i);
}
sw1.Stop();
Console.WriteLine($"Послідовний for: {sw1.ElapsedMilliseconds}ms");
// ✅ Паралельний Parallel.For
var sw2 = Stopwatch.StartNew();
Parallel.For(0, itemCount, i =>
{
ProcessItem(i);
});
sw2.Stop();
Console.WriteLine($"Parallel.For: {sw2.ElapsedMilliseconds}ms");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
Як це працює:
Parallel.For()розбиває діапазон[0, itemCount)на chunks (шматки)- Кожен chunk виконується на окремому потоці ThreadPool
- ThreadPool автоматично балансує навантаження між потоками
- Головний потік блокується поки всі ітерації не завершаться
Синтаксис:
// Базовий варіант
Parallel.For(fromInclusive: 0, toExclusive: 100, body: i =>
{
Console.WriteLine($"Обробка елемента {i} на потоці {Thread.CurrentThread.ManagedThreadId}");
});
// З ParallelOptions
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4 // максимум 4 потоки
};
Parallel.For(0, 100, options, i =>
{
ProcessItem(i);
});
Parallel.ForEach() — Паралельна Обробка Колекцій
Parallel.ForEach() — це паралельний аналог foreach. Використовується коли у вас є колекція елементів (не обов'язково з індексами).
using System.Threading.Tasks;
using System.Diagnostics;
static void ProcessImage(string imagePath)
{
Console.WriteLine($"[Thread {Thread.CurrentThread.ManagedThreadId}] Обробка {imagePath}");
Thread.Sleep(500); // імітація обробки
}
string[] images =
[
"image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg",
"image5.jpg", "image6.jpg", "image7.jpg", "image8.jpg"
];
var sw = Stopwatch.StartNew();
// ✅ Паралельна обробка
Parallel.ForEach(images, imagePath =>
{
ProcessImage(imagePath);
});
sw.Stop();
Console.WriteLine($"\nВсі зображення оброблені за {sw.ElapsedMilliseconds}ms");
ParallelOptions — Налаштування Паралелізму
ParallelOptions дозволяє контролювати поведінку Parallel.For/ForEach:
using System.Threading;
using System.Threading.Tasks;
var options = new ParallelOptions
{
// Максимальна кількість паралельних потоків
MaxDegreeOfParallelism = 4,
// CancellationToken для скасування
CancellationToken = cancellationToken,
// TaskScheduler (за замовчуванням — ThreadPool)
TaskScheduler = TaskScheduler.Default
};
Parallel.For(0, 1000, options, i =>
{
ProcessItem(i);
});
MaxDegreeOfParallelism:
| Значення | Поведінка |
|---|---|
-1 (default) | Необмежена кількість потоків (ThreadPool вирішує) |
1 | Послідовне виконання (без паралелізму) |
N | Максимум N паралельних потоків |
Коли обмежувати MaxDegreeOfParallelism:
- ✅ Операції використовують багато пам'яті (щоб не вичерпати RAM)
- ✅ Зовнішній ресурс має обмеження (наприклад, база даних підтримує max 10 з'єднань)
- ✅ Тестування — щоб контролювати навантаження
// Приклад: обмеження паралелізму для HTTP запитів
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 5 // максимум 5 одночасних запитів
};
string[] urls = GetUrls(); // 1000 URLs
Parallel.ForEach(urls, options, url =>
{
using var client = new HttpClient();
string content = client.GetStringAsync(url).Result;
ProcessContent(content);
});
CancellationToken у Parallel
Parallel.For/ForEach підтримує скасування через CancellationToken:
using System.Threading;
using System.Threading.Tasks;
using var cts = new CancellationTokenSource();
var options = new ParallelOptions
{
CancellationToken = cts.Token
};
// Скасовуємо через 2 секунди
Task.Run(() =>
{
Thread.Sleep(2000);
cts.Cancel();
});
try
{
Parallel.For(0, 1000, options, i =>
{
// Не потрібно вручну перевіряти token — Parallel робить це автоматично
Thread.Sleep(100);
Console.WriteLine($"Обробка елемента {i}");
});
}
catch (OperationCanceledException)
{
Console.WriteLine("Parallel.For скасовано");
}
Parallel.For/ForEach автоматично перевіряє CancellationToken між ітераціями. Вам не потрібно вручну викликати ThrowIfCancellationRequested() (хоча можна для швидшої реакції).ParallelLoopState — Контроль Виконання
ParallelLoopState дозволяє контролювати виконання циклу зсередини ітерації:
using System.Threading.Tasks;
int[] numbers = Enumerable.Range(1, 100).ToArray();
// Шукаємо число 42 паралельно
Parallel.ForEach(numbers, (number, state) =>
{
Console.WriteLine($"Перевірка {number} на потоці {Thread.CurrentThread.ManagedThreadId}");
if (number == 42)
{
Console.WriteLine($"Знайдено 42! Зупиняємо пошук.");
state.Stop(); // зупинити всі ітерації якомога швидше
return;
}
Thread.Sleep(50); // імітація роботи
});
Console.WriteLine("Пошук завершено");
ParallelLoopState методи:
| Метод | Опис |
|---|---|
Stop() | Зупинити всі ітерації якомога швидше (не гарантує миттєву зупинку) |
Break() | Зупинити ітерації після поточної (гарантує що всі ітерації до поточної завершаться) |
IsStopped | Чи був викликаний Stop() |
IsExceptional | Чи була exception в іншій ітерації |
ShouldExitCurrentIteration | Чи потрібно завершити поточну ітерацію |
Stop() vs Break():
// Stop() — зупинити ВСЕ якомога швидше
Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
state.Stop(); // ітерації 501-999 можуть НЕ виконатись
ProcessItem(i);
});
// Break() — зупинити після поточної, але гарантувати виконання попередніх
Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
state.Break(); // ітерації 0-500 гарантовано виконаються
ProcessItem(i);
});
Thread-Local State — Оптимізація для Агрегації
Коли потрібно агрегувати результати (наприклад, підрахувати суму), наївний підхід з lock буде повільним:
// ❌ Повільно — lock на кожній ітерації
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
long sum = 0;
object lockObj = new();
Parallel.ForEach(numbers, number =>
{
lock (lockObj) // contention на кожній ітерації!
{
sum += number;
}
});
Console.WriteLine($"Сума: {sum}");
Рішення: використовувати thread-local state — кожен потік має власний локальний акумулятор, і тільки в кінці результати об'єднуються:
using System.Threading.Tasks;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// ✅ Швидко — thread-local aggregation
long totalSum = Parallel.ForEach(
source: numbers,
// localInit — ініціалізація локального стану для кожного потоку
localInit: () => 0L,
// body — обробка елемента з локальним станом
body: (number, state, localSum) =>
{
return localSum + number; // додаємо до локального акумулятора
},
// localFinally — об'єднання локальних результатів (викликається один раз на потік)
localFinally: localSum =>
{
Interlocked.Add(ref totalSum, localSum); // atomic додавання
}
).Result; // ParallelLoopResult містить інформацію про виконання
Console.WriteLine($"Сума: {totalSum}");
Як це працює:
- localInit — викликається один раз для кожного потоку, створює локальний акумулятор
- body — викликається для кожного елемента, оновлює локальний акумулятор (без lock!)
- localFinally — викликається один раз для кожного потоку, об'єднує локальні результати в глобальний
Benchmark:
Наївний підхід (lock на кожній ітерації): 2,847ms
Thread-local aggregation: 142ms
Прискорення: 20x
Parallel.Invoke() — Паралельне Виконання Методів
Parallel.Invoke() виконує декілька різних методів паралельно:
using System.Threading.Tasks;
using System.Diagnostics;
static void Task1()
{
Console.WriteLine($"Task1 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
}
static void Task2()
{
Console.WriteLine($"Task2 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1500);
}
static void Task3()
{
Console.WriteLine($"Task3 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(800);
}
var sw = Stopwatch.StartNew();
// Виконати всі три методи паралельно
Parallel.Invoke(Task1, Task2, Task3);
sw.Stop();
Console.WriteLine($"Всі задачі завершені за {sw.ElapsedMilliseconds}ms");
Коли використовувати Parallel.Invoke:
- ✅ Декілька незалежних CPU-bound операцій
- ✅ Кількість операцій відома заздалегідь
- ❌ Для I/O-bound операцій краще
Task.WhenAll()
Коли НЕ Використовувати Parallel
Parallel.For/ForEach не завжди дає прискорення. Є випадки коли він повільніший за послідовне виконання:
❌ Дуже швидкі операції (overhead паралелізму більший за виграш):
// ❌ Повільніше за звичайний for
Parallel.For(0, 1000, i =>
{
int result = i * 2; // занадто швидка операція
});
❌ Мало елементів:
// ❌ Overhead створення потоків більший за виграш
int[] numbers = [1, 2, 3, 4, 5];
Parallel.ForEach(numbers, n => ProcessItem(n));
❌ I/O-bound операції (використовуйте Task-based підхід замість):
// ❌ Блокує потоки ThreadPool
Parallel.ForEach(urls, url =>
{
string content = DownloadSync(url); // блокуюче I/O
});
// ✅ Правильно — використовуйте Task.WhenAll
var tasks = urls.Select(url => DownloadAsync(url));
var results = Task.WhenAll(tasks).Result;
❌ Операції з великим shared state та contention:
// ❌ Lock на кожній ітерації — повільніше за послідовне виконання
object lockObj = new();
Parallel.For(0, 1000, i =>
{
lock (lockObj)
{
// вся робота під lock — немає паралелізму!
DoWork(i);
}
});
Правило: використовуйте Parallel коли:
- ✅ CPU-bound операції (обчислення, обробка даних)
- ✅ Кожна ітерація займає хоча б ~1ms (інакше overhead паралелізму занадто великий)
- ✅ Багато елементів (хоча б 100+)
- ✅ Мінімальний shared state (або thread-local aggregation)
7. PLINQ: Parallel LINQ
Вступ: Декларативний Data Parallelism
Parallel.For/ForEach — це імперативний підхід: ви явно пишете цикли та контролюєте виконання. PLINQ (Parallel LINQ) — це декларативний підхід: ви описуєте що потрібно зробити, а не як.
// Імперативний підхід (Parallel.ForEach)
var results = new ConcurrentBag<int>();
Parallel.ForEach(numbers, number =>
{
if (number % 2 == 0)
{
int square = number * number;
results.Add(square);
}
});
// Декларативний підхід (PLINQ)
var results = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.ToArray();
PLINQ автоматично:
- ✅ Розподіляє роботу між потоками
- ✅ Балансує навантаження
- ✅ Об'єднує результати
- ✅ Обробляє exceptions
AsParallel() — Увімкнення PLINQ
Щоб увімкнути паралельну обробку LINQ запиту, додайте .AsParallel():
using System.Linq;
using System.Diagnostics;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// ❌ Звичайний LINQ — послідовне виконання
var sw1 = Stopwatch.StartNew();
var result1 = numbers
.Where(n => IsPrime(n))
.ToArray();
sw1.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, знайдено {result1.Length} простих чисел");
// ✅ PLINQ — паралельне виконання
var sw2 = Stopwatch.StartNew();
var result2 = numbers
.AsParallel() // увімкнути паралелізм
.Where(n => IsPrime(n))
.ToArray();
sw2.Stop();
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, знайдено {result2.Length} простих чисел");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
static bool IsPrime(int n)
{
if (n < 2) return false;
for (int i = 2; i * i <= n; i++)
if (n % i == 0) return false;
return true;
}
AsOrdered() — Збереження Порядку
За замовчуванням PLINQ не гарантує порядок результатів (для максимальної продуктивності). Якщо порядок важливий — використовуйте AsOrdered():
int[] numbers = Enumerable.Range(1, 20).ToArray();
// ❌ Без AsOrdered — порядок не гарантується
var unordered = numbers
.AsParallel()
.Select(n => n * n)
.ToArray();
Console.WriteLine("Без AsOrdered: " + string.Join(", ", unordered));
// Вивід: 16, 1, 4, 25, 9, 36, 49, 64, ... (випадковий порядок)
// ✅ З AsOrdered — порядок збережено
var ordered = numbers
.AsParallel()
.AsOrdered() // зберегти порядок
.Select(n => n * n)
.ToArray();
Console.WriteLine("З AsOrdered: " + string.Join(", ", ordered));
// Вивід: 1, 4, 9, 16, 25, 36, 49, 64, ... (правильний порядок)
AsOrdered() має overhead — результати потрібно буферизувати та сортувати. Використовуйте тільки коли порядок дійсно важливий.WithDegreeOfParallelism() — Контроль Кількості Потоків
За замовчуванням PLINQ використовує стільки потоків, скільки вважає оптимальним (зазвичай = кількість CPU cores). Можна обмежити:
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
// Обмежити до 4 потоків
var result = numbers
.AsParallel()
.WithDegreeOfParallelism(4) // максимум 4 потоки
.Where(n => n % 2 == 0)
.Select(n => n * n)
.ToArray();
Console.WriteLine($"Оброблено {result.Length} елементів");
Коли використовувати:
- ✅ Обмеження ресурсів (пам'ять, зовнішні API)
- ✅ Тестування та benchmarking
- ❌ Не встановлюйте занадто низьке значення — втратите паралелізм
WithCancellation() — Скасування PLINQ
PLINQ підтримує скасування через CancellationToken:
using System.Threading;
using System.Linq;
using var cts = new CancellationTokenSource();
// Скасовуємо через 2 секунди
Task.Run(() =>
{
Thread.Sleep(2000);
cts.Cancel();
});
try
{
var result = Enumerable.Range(1, 10_000_000)
.AsParallel()
.WithCancellation(cts.Token) // передаємо токен
.Where(n =>
{
Thread.Sleep(1); // імітація повільної роботи
return IsPrime(n);
})
.ToArray();
Console.WriteLine($"Знайдено {result.Length} простих чисел");
}
catch (OperationCanceledException)
{
Console.WriteLine("PLINQ запит скасовано");
}
WithExecutionMode() — Контроль Паралелізму
PLINQ автоматично вирішує чи варто виконувати запит паралельно (аналізує overhead). Можна форсувати:
using System.Linq;
int[] numbers = Enumerable.Range(1, 100).ToArray();
// Форсувати паралельне виконання (навіть якщо PLINQ вважає що це неефективно)
var result = numbers
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(n => n * n)
.ToArray();
ParallelExecutionMode:
| Режим | Опис |
|---|---|
Default | PLINQ сам вирішує (рекомендовано) |
ForceParallelism | Завжди виконувати паралельно (навіть якщо неефективно) |
Default — PLINQ добре вміє визначати коли паралелізм вигідний.WithMergeOptions() — Контроль Буферизації
PLINQ буферизує результати перед поверненням. Можна контролювати стратегію буферизації:
using System.Linq;
int[] numbers = Enumerable.Range(1, 1000).ToArray();
// Повертати результати якомога швидше (мінімальна буферизація)
var result = numbers
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(n => n * n);
foreach (var item in result)
{
Console.WriteLine(item); // результати з'являються одразу
}
ParallelMergeOptions:
| Опція | Опис | Use Case |
|---|---|---|
Default | PLINQ сам вирішує | Рекомендовано для більшості випадків |
NotBuffered | Мінімальна буферизація, результати одразу | Streaming, UI updates |
AutoBuffered | Помірна буферизація | Баланс між throughput та latency |
FullyBuffered | Максимальна буферизація | Максимальний throughput |
ForAll() — Паралельна Обробка без Збору Результатів
Якщо не потрібно збирати результати (наприклад, запис у файл або базу даних), використовуйте ForAll() замість ToArray():
using System.Linq;
int[] numbers = Enumerable.Range(1, 1000).ToArray();
// ❌ Неефективно — збирає результати які не потрібні
numbers
.AsParallel()
.Select(n => n * n)
.ToArray(); // створює масив який ніхто не використовує
// ✅ Ефективно — обробляє без збору результатів
numbers
.AsParallel()
.ForAll(n =>
{
int square = n * n;
Console.WriteLine($"{n}² = {square}");
// або запис у файл, базу даних, тощо
});
Різниця:
ToArray()/ToList()— збирає всі результати в колекціюForAll()— обробляє кожен елемент без збору результатів (швидше, менше пам'яті)
Aggregate() — Паралельна Агрегація
PLINQ має спеціальний метод Aggregate() для паралельної агрегації (сума, добуток, тощо):
using System.Linq;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// Паралельна сума
long sum = numbers
.AsParallel()
.Aggregate(
seed: 0L, // початкове значення для кожного потоку
// updateAccumulatorFunc — оновлення локального акумулятора
(localSum, number) => localSum + number,
// combineAccumulatorsFunc — об'єднання локальних акумуляторів
(sum1, sum2) => sum1 + sum2,
// resultSelector — фінальна трансформація (опціонально)
finalSum => finalSum
);
Console.WriteLine($"Сума: {sum}");
Спрощений варіант (коли seed та combineFunc однакові):
// Паралельна сума (спрощено)
long sum = numbers
.AsParallel()
.Sum(); // PLINQ автоматично паралелізує Sum()
// Паралельний добуток
long product = numbers
.AsParallel()
.Aggregate(1L, (acc, n) => acc * n);
**Partitioning Strategies — Стратегії Розбиття
PLINQ автоматично розбиває вхідну колекцію на chunks для паралельної обробки. Є три стратегії:
1. Range Partitioning — для індексованих колекцій (масиви, списки):
// PLINQ розбиває масив на рівні chunks
int[] numbers = new int[1000];
var result = numbers.AsParallel().Select(n => n * n).ToArray();
// Приклад розбиття на 4 потоки:
// Thread 1: [0..249]
// Thread 2: [250..499]
// Thread 3: [500..749]
// Thread 4: [750..999]
2. Chunk Partitioning — для неіндексованих колекцій (IEnumerable):
// PLINQ бере елементи по одному і розподіляє між потоками
IEnumerable<int> numbers = GetNumbers(); // не масив, а IEnumerable
var result = numbers.AsParallel().Select(n => n * n).ToArray();
// Динамічний розподіл — кожен потік бере наступний елемент коли звільняється
3. Hash Partitioning — для операцій з групуванням (GroupBy, Join):
// PLINQ використовує hash для розподілу елементів
var grouped = numbers
.AsParallel()
.GroupBy(n => n % 10) // групування по останній цифрі
.ToArray();
// Елементи з однаковим hash потрапляють на один потік
PLINQ Performance Tips
✅ DO:
- Використовуйте PLINQ для CPU-bound операцій з великими колекціями (10,000+ елементів)
- Використовуйте
ForAll()замістьToArray()якщо результати не потрібні - Використовуйте
AsOrdered()тільки коли порядок дійсно важливий - Тестуйте performance — не всі запити стають швидшими з PLINQ
❌ DON'T:
- Не використовуйте PLINQ для швидких операцій (overhead > виграш)
- Не використовуйте PLINQ для малих колекцій (<1,000 елементів)
- Не використовуйте PLINQ для I/O-bound операцій (використовуйте
async/await) - Не використовуйте PLINQ якщо операції мають великий shared state з contention
Benchmark: LINQ vs PLINQ
using System.Linq;
using System.Diagnostics;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// LINQ
var sw1 = Stopwatch.StartNew();
var result1 = numbers
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Sum();
sw1.Stop();
// PLINQ
var sw2 = Stopwatch.StartNew();
var result2 = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Sum();
sw2.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, результат: {result1}");
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, результат: {result2}");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
8. Практичні Завдання
Рівень 1: Parallel File Processor з CancellationToken
Створіть утиліту для паралельної обробки файлів у директорії з підтримкою скасування.
Вимоги:
- Знайти всі
.txtфайли у директорії - Паралельно прочитати кожен файл та підрахувати кількість слів
- Підтримка
CancellationTokenдля скасування - Вивести прогрес обробки
// Приклад використання:
using var cts = new CancellationTokenSource();
// Скасувати через 5 секунд
cts.CancelAfter(TimeSpan.FromSeconds(5));
try
{
var results = ProcessFiles("C:/Documents", cts.Token);
foreach (var (file, wordCount) in results)
{
Console.WriteLine($"{file}: {wordCount} слів");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Обробка скасована");
}
Рівень 2: Monte Carlo π Calculation з Parallel та PLINQ
Обчисліть число π методом Монте-Карло використовуючи Parallel.For та PLINQ. Порівняйте performance.
Алгоритм:
- Генеруємо N випадкових точок у квадраті 0,1 × 0,1
- Підраховуємо скільки точок потрапили у чверть кола радіусом 1
- π ≈ 4 × (кількість точок у колі) / (загальна кількість точок)
Вимоги:
- Реалізувати через
Parallel.Forз thread-local state - Реалізувати через PLINQ
- Benchmark обох підходів
- N = 100,000,000 точок
Рівень 3: MapReduce Framework з Task.WhenAll
Створіть спрощений MapReduce framework для паралельної обробки великих текстових файлів.
Вимоги:
- Map phase: розбити файл на chunks, паралельно обробити кожен chunk (підрахувати частоту слів)
- Reduce phase: об'єднати результати з усіх chunks
- Використовувати
Task.WhenAll()для координації - Підтримка
CancellationToken - Progress reporting
// Приклад використання:
var mapReduce = new MapReduceEngine();
var wordFrequency = mapReduce.ProcessFile(
filePath: "large-text.txt",
chunkSize: 1024 * 1024, // 1 MB chunks
cancellationToken: cts.Token,
progress: new Progress<double>(p => Console.WriteLine($"Прогрес: {p:P0}"))
);
// Топ 10 найчастіших слів
var top10 = wordFrequency
.OrderByDescending(kvp => kvp.Value)
.Take(10);
foreach (var (word, count) in top10)
{
Console.WriteLine($"{word}: {count}");
}
Це завершує матеріал про TPL, Parallel та PLINQ. Ви навчились працювати з Task, композицією задач, CancellationToken, exception handling, Parallel class та PLINQ для ефективної паралельної обробки даних.
TPL, Task та Композиція — Від Thread до Task
Глибокий академічний розбір Task Parallel Library — еволюція від Thread до Task, Task<T>, композиція через WhenAll/WhenAny, CancellationToken, exception handling та AggregateException. Теорія і практика сучасного паралелізму.
Async/Await — Фундамент Асинхронного Програмування
Глибокий академічний розбір async/await у C# — від проблеми блокуючого I/O до state machine під капотом. Історія асинхронності (APM, EAP, TAP), синтаксис async/await, return types, exception handling та best practices.