У попередньому файлі ви навчились працювати з Task — абстракцією одиниці роботи, яка виконується асинхронно. Task ідеально підходить для task parallelism — коли у вас є декілька різних операцій, які потрібно виконати паралельно:
// Task Parallelism — різні операції
Task task1 = DownloadFileAsync("url1");
Task task2 = ProcessImageAsync("image.jpg");
Task task3 = SendEmailAsync("user@example.com");
await Task.WhenAll(task1, task2, task3);
Але є інший тип паралелізму — data parallelism (паралелізм даних): коли у вас є одна операція, яку потрібно виконати над великою кількістю елементів:
// Data Parallelism — одна операція над багатьма елементами
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
// Обчислити квадрат кожного числа паралельно
foreach (int number in numbers)
{
int square = number * number;
// ...
}
Для data parallelism .NET надає два інструменти:
Parallel class — імперативний підхід (Parallel.For, Parallel.ForEach)Parallel.For() — це паралельний аналог звичайного for циклу. Він автоматично розподіляє ітерації між потоками ThreadPool.
using System.Threading.Tasks;
using System.Diagnostics;
static void ProcessItem(int i)
{
// Імітація CPU-bound роботи
double result = 0;
for (int j = 0; j < 1_000_000; j++)
result += Math.Sqrt(i * j);
}
int itemCount = 100;
// ❌ Послідовний for
var sw1 = Stopwatch.StartNew();
for (int i = 0; i < itemCount; i++)
{
ProcessItem(i);
}
sw1.Stop();
Console.WriteLine($"Послідовний for: {sw1.ElapsedMilliseconds}ms");
// ✅ Паралельний Parallel.For
var sw2 = Stopwatch.StartNew();
Parallel.For(0, itemCount, i =>
{
ProcessItem(i);
});
sw2.Stop();
Console.WriteLine($"Parallel.For: {sw2.ElapsedMilliseconds}ms");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
Як це працює:
Parallel.For() розбиває діапазон [0, itemCount) на chunks (шматки)Синтаксис:
// Базовий варіант
Parallel.For(fromInclusive: 0, toExclusive: 100, body: i =>
{
Console.WriteLine($"Обробка елемента {i} на потоці {Thread.CurrentThread.ManagedThreadId}");
});
// З ParallelOptions
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4 // максимум 4 потоки
};
Parallel.For(0, 100, options, i =>
{
ProcessItem(i);
});
Parallel.ForEach() — це паралельний аналог foreach. Використовується коли у вас є колекція елементів (не обов'язково з індексами).
using System.Threading.Tasks;
using System.Diagnostics;
static void ProcessImage(string imagePath)
{
Console.WriteLine($"[Thread {Thread.CurrentThread.ManagedThreadId}] Обробка {imagePath}");
Thread.Sleep(500); // імітація обробки
}
string[] images =
[
"image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg",
"image5.jpg", "image6.jpg", "image7.jpg", "image8.jpg"
];
var sw = Stopwatch.StartNew();
// ✅ Паралельна обробка
Parallel.ForEach(images, imagePath =>
{
ProcessImage(imagePath);
});
sw.Stop();
Console.WriteLine($"\nВсі зображення оброблені за {sw.ElapsedMilliseconds}ms");
ParallelOptions дозволяє контролювати поведінку Parallel.For/ForEach:
using System.Threading;
using System.Threading.Tasks;
var options = new ParallelOptions
{
// Максимальна кількість паралельних потоків
MaxDegreeOfParallelism = 4,
// CancellationToken для скасування
CancellationToken = cancellationToken,
// TaskScheduler (за замовчуванням — ThreadPool)
TaskScheduler = TaskScheduler.Default
};
Parallel.For(0, 1000, options, i =>
{
ProcessItem(i);
});
MaxDegreeOfParallelism:
| Значення | Поведінка |
|---|---|
-1 (default) | Необмежена кількість потоків (ThreadPool вирішує) |
1 | Послідовне виконання (без паралелізму) |
N | Максимум N паралельних потоків |
Коли обмежувати MaxDegreeOfParallelism:
// Приклад: обмеження паралелізму для HTTP запитів
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 5 // максимум 5 одночасних запитів
};
string[] urls = GetUrls(); // 1000 URLs
Parallel.ForEach(urls, options, async url =>
{
using var client = new HttpClient();
string content = await client.GetStringAsync(url);
ProcessContent(content);
});
Parallel.For/ForEach підтримує скасування через CancellationToken:
using System.Threading;
using System.Threading.Tasks;
using var cts = new CancellationTokenSource();
var options = new ParallelOptions
{
CancellationToken = cts.Token
};
// Скасовуємо через 2 секунди
Task.Run(async () =>
{
await Task.Delay(2000);
cts.Cancel();
});
try
{
Parallel.For(0, 1000, options, i =>
{
// Не потрібно вручну перевіряти token — Parallel робить це автоматично
Thread.Sleep(100);
Console.WriteLine($"Обробка елемента {i}");
});
}
catch (OperationCanceledException)
{
Console.WriteLine("Parallel.For скасовано");
}
Parallel.For/ForEach автоматично перевіряє CancellationToken між ітераціями. Вам не потрібно вручну викликати ThrowIfCancellationRequested() (хоча можна для швидшої реакції).ParallelLoopState дозволяє контролювати виконання циклу зсередини ітерації:
using System.Threading.Tasks;
int[] numbers = Enumerable.Range(1, 100).ToArray();
// Шукаємо число 42 паралельно
Parallel.ForEach(numbers, (number, state) =>
{
Console.WriteLine($"Перевірка {number} на потоці {Thread.CurrentThread.ManagedThreadId}");
if (number == 42)
{
Console.WriteLine($"Знайдено 42! Зупиняємо пошук.");
state.Stop(); // зупинити всі ітерації якомога швидше
return;
}
Thread.Sleep(50); // імітація роботи
});
Console.WriteLine("Пошук завершено");
ParallelLoopState методи:
| Метод | Опис |
|---|---|
Stop() | Зупинити всі ітерації якомога швидше (не гарантує миттєву зупинку) |
Break() | Зупинити ітерації після поточної (гарантує що всі ітерації до поточної завершаться) |
IsStopped | Чи був викликаний Stop() |
IsExceptional | Чи була exception в іншій ітерації |
ShouldExitCurrentIteration | Чи потрібно завершити поточну ітерацію |
Stop() vs Break():
// Stop() — зупинити ВСЕ якомога швидше
Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
state.Stop(); // ітерації 501-999 можуть НЕ виконатись
ProcessItem(i);
});
// Break() — зупинити після поточної, але гарантувати виконання попередніх
Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
state.Break(); // ітерації 0-500 гарантовано виконаються
ProcessItem(i);
});
Коли потрібно агрегувати результати (наприклад, підрахувати суму), наївний підхід з lock буде повільним:
// ❌ Повільно — lock на кожній ітерації
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
long sum = 0;
object lockObj = new();
Parallel.ForEach(numbers, number =>
{
lock (lockObj) // contention на кожній ітерації!
{
sum += number;
}
});
Console.WriteLine($"Сума: {sum}");
Рішення: використовувати thread-local state — кожен потік має власний локальний акумулятор, і тільки в кінці результати об'єднуються:
using System.Threading.Tasks;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// ✅ Швидко — thread-local aggregation
long totalSum = Parallel.ForEach(
source: numbers,
// localInit — ініціалізація локального стану для кожного потоку
localInit: () => 0L,
// body — обробка елемента з локальним станом
body: (number, state, localSum) =>
{
return localSum + number; // додаємо до локального акумулятора
},
// localFinally — об'єднання локальних результатів (викликається один раз на потік)
localFinally: localSum =>
{
Interlocked.Add(ref totalSum, localSum); // atomic додавання
}
).Result; // ParallelLoopResult містить інформацію про виконання
Console.WriteLine($"Сума: {totalSum}");
Як це працює:
Benchmark:
Наївний підхід (lock на кожній ітерації): 2,847ms
Thread-local aggregation: 142ms
Прискорення: 20x
Parallel.Invoke() виконує декілька різних методів паралельно:
using System.Threading.Tasks;
using System.Diagnostics;
static void Task1()
{
Console.WriteLine($"Task1 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
}
static void Task2()
{
Console.WriteLine($"Task2 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1500);
}
static void Task3()
{
Console.WriteLine($"Task3 на потоці {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(800);
}
var sw = Stopwatch.StartNew();
// Виконати всі три методи паралельно
Parallel.Invoke(Task1, Task2, Task3);
sw.Stop();
Console.WriteLine($"Всі задачі завершені за {sw.ElapsedMilliseconds}ms");
Коли використовувати Parallel.Invoke:
Task.WhenAll()У .NET 6 додано Parallel.ForEachAsync() — паралельна обробка з підтримкою async/await:
using System.Threading.Tasks;
static async Task<string> DownloadAsync(string url)
{
using var client = new HttpClient();
return await client.GetStringAsync(url);
}
string[] urls =
[
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
"https://api.example.com/data4"
];
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 3 // максимум 3 одночасних запити
};
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
Console.WriteLine($"Завантаження {url}...");
string content = await DownloadAsync(url);
Console.WriteLine($"Завантажено {content.Length} байт з {url}");
});
Console.WriteLine("Всі завантаження завершені");
| Метод | Для чого |
|---|---|
Parallel.ForEach | CPU-bound операції (обчислення, обробка даних) |
Parallel.ForEachAsync | I/O-bound операції (HTTP, файли, база даних) |
ForEachAsync — він не блокує потоки ThreadPool.Parallel.For/ForEach не завжди дає прискорення. Є випадки коли він повільніший за послідовне виконання:
❌ Дуже швидкі операції (overhead паралелізму більший за виграш):
// ❌ Повільніше за звичайний for
Parallel.For(0, 1000, i =>
{
int result = i * 2; // занадто швидка операція
});
❌ Мало елементів:
// ❌ Overhead створення потоків більший за виграш
int[] numbers = [1, 2, 3, 4, 5];
Parallel.ForEach(numbers, n => ProcessItem(n));
❌ I/O-bound операції (використовуйте async/await замість):
// ❌ Блокує потоки ThreadPool
Parallel.ForEach(urls, url =>
{
string content = DownloadSync(url); // блокуюче I/O
});
// ✅ Правильно
await Parallel.ForEachAsync(urls, async (url, ct) =>
{
string content = await DownloadAsync(url); // non-blocking I/O
});
❌ Операції з великим shared state та contention:
// ❌ Lock на кожній ітерації — повільніше за послідовне виконання
object lockObj = new();
Parallel.For(0, 1000, i =>
{
lock (lockObj)
{
// вся робота під lock — немає паралелізму!
DoWork(i);
}
});
Правило: використовуйте Parallel коли:
Parallel.For/ForEach — це імперативний підхід: ви явно пишете цикли та контролюєте виконання. PLINQ (Parallel LINQ) — це декларативний підхід: ви описуєте що потрібно зробити, а не як.
// Імперативний підхід (Parallel.ForEach)
var results = new ConcurrentBag<int>();
Parallel.ForEach(numbers, number =>
{
if (number % 2 == 0)
{
int square = number * number;
results.Add(square);
}
});
// Декларативний підхід (PLINQ)
var results = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.ToArray();
PLINQ автоматично:
Щоб увімкнути паралельну обробку LINQ запиту, додайте .AsParallel():
using System.Linq;
using System.Diagnostics;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// ❌ Звичайний LINQ — послідовне виконання
var sw1 = Stopwatch.StartNew();
var result1 = numbers
.Where(n => IsPrime(n))
.ToArray();
sw1.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, знайдено {result1.Length} простих чисел");
// ✅ PLINQ — паралельне виконання
var sw2 = Stopwatch.StartNew();
var result2 = numbers
.AsParallel() // увімкнути паралелізм
.Where(n => IsPrime(n))
.ToArray();
sw2.Stop();
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, знайдено {result2.Length} простих чисел");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
static bool IsPrime(int n)
{
if (n < 2) return false;
for (int i = 2; i * i <= n; i++)
if (n % i == 0) return false;
return true;
}
За замовчуванням PLINQ не гарантує порядок результатів (для максимальної продуктивності). Якщо порядок важливий — використовуйте AsOrdered():
int[] numbers = Enumerable.Range(1, 20).ToArray();
// ❌ Без AsOrdered — порядок не гарантується
var unordered = numbers
.AsParallel()
.Select(n => n * n)
.ToArray();
Console.WriteLine("Без AsOrdered: " + string.Join(", ", unordered));
// Вивід: 16, 1, 4, 25, 9, 36, 49, 64, ... (випадковий порядок)
// ✅ З AsOrdered — порядок збережено
var ordered = numbers
.AsParallel()
.AsOrdered() // зберегти порядок
.Select(n => n * n)
.ToArray();
Console.WriteLine("З AsOrdered: " + string.Join(", ", ordered));
// Вивід: 1, 4, 9, 16, 25, 36, 49, 64, ... (правильний порядок)
AsOrdered() має overhead — результати потрібно буферизувати та сортувати. Використовуйте тільки коли порядок дійсно важливий.За замовчуванням PLINQ використовує стільки потоків, скільки вважає оптимальним (зазвичай = кількість CPU cores). Можна обмежити:
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
// Обмежити до 4 потоків
var result = numbers
.AsParallel()
.WithDegreeOfParallelism(4) // максимум 4 потоки
.Where(n => n % 2 == 0)
.Select(n => n * n)
.ToArray();
Console.WriteLine($"Оброблено {result.Length} елементів");
Коли використовувати:
PLINQ підтримує скасування через CancellationToken:
using System.Threading;
using System.Linq;
using var cts = new CancellationTokenSource();
// Скасовуємо через 2 секунди
Task.Run(async () =>
{
await Task.Delay(2000);
cts.Cancel();
});
try
{
var result = Enumerable.Range(1, 10_000_000)
.AsParallel()
.WithCancellation(cts.Token) // передаємо токен
.Where(n =>
{
Thread.Sleep(1); // імітація повільної роботи
return IsPrime(n);
})
.ToArray();
Console.WriteLine($"Знайдено {result.Length} простих чисел");
}
catch (OperationCanceledException)
{
Console.WriteLine("PLINQ запит скасовано");
}
PLINQ автоматично вирішує чи варто виконувати запит паралельно (аналізує overhead). Можна форсувати:
using System.Linq;
int[] numbers = Enumerable.Range(1, 100).ToArray();
// Форсувати паралельне виконання (навіть якщо PLINQ вважає що це неефективно)
var result = numbers
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(n => n * n)
.ToArray();
ParallelExecutionMode:
| Режим | Опис |
|---|---|
Default | PLINQ сам вирішує (рекомендовано) |
ForceParallelism | Завжди виконувати паралельно (навіть якщо неефективно) |
Default — PLINQ добре вміє визначати коли паралелізм вигідний.PLINQ буферизує результати перед поверненням. Можна контролювати стратегію буферизації:
using System.Linq;
int[] numbers = Enumerable.Range(1, 1000).ToArray();
// Повертати результати якомога швидше (мінімальна буферизація)
var result = numbers
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(n => n * n);
foreach (var item in result)
{
Console.WriteLine(item); // результати з'являються одразу
}
ParallelMergeOptions:
| Опція | Опис | Use Case |
|---|---|---|
Default | PLINQ сам вирішує | Рекомендовано для більшості випадків |
NotBuffered | Мінімальна буферизація, результати одразу | Streaming, UI updates |
AutoBuffered | Помірна буферизація | Баланс між throughput та latency |
FullyBuffered | Максимальна буферизація | Максимальний throughput |
Якщо не потрібно збирати результати (наприклад, запис у файл або базу даних), використовуйте ForAll() замість ToArray():
using System.Linq;
int[] numbers = Enumerable.Range(1, 1000).ToArray();
// ❌ Неефективно — збирає результати які не потрібні
numbers
.AsParallel()
.Select(n => n * n)
.ToArray(); // створює масив який ніхто не використовує
// ✅ Ефективно — обробляє без збору результатів
numbers
.AsParallel()
.ForAll(n =>
{
int square = n * n;
Console.WriteLine($"{n}² = {square}");
// або запис у файл, базу даних, тощо
});
Різниця:
ToArray() / ToList() — збирає всі результати в колекціюForAll() — обробляє кожен елемент без збору результатів (швидше, менше пам'яті)PLINQ має спеціальний метод Aggregate() для паралельної агрегації (сума, добуток, тощо):
using System.Linq;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// Паралельна сума
long sum = numbers
.AsParallel()
.Aggregate(
seed: 0L, // початкове значення для кожного потоку
// updateAccumulatorFunc — оновлення локального акумулятора
(localSum, number) => localSum + number,
// combineAccumulatorsFunc — об'єднання локальних акумуляторів
(sum1, sum2) => sum1 + sum2,
// resultSelector — фінальна трансформація (опціонально)
finalSum => finalSum
);
Console.WriteLine($"Сума: {sum}");
Спрощений варіант (коли seed та combineFunc однакові):
// Паралельна сума (спрощено)
long sum = numbers
.AsParallel()
.Sum(); // PLINQ автоматично паралелізує Sum()
// Паралельний добуток
long product = numbers
.AsParallel()
.Aggregate(1L, (acc, n) => acc * n);
PLINQ автоматично розбиває вхідну колекцію на chunks для паралельної обробки. Є три стратегії:
1. Range Partitioning — для індексованих колекцій (масиви, списки):
// PLINQ розбиває масив на рівні chunks
int[] numbers = new int[1000];
var result = numbers.AsParallel().Select(n => n * n).ToArray();
// Приклад розбиття на 4 потоки:
// Thread 1: [0..249]
// Thread 2: [250..499]
// Thread 3: [500..749]
// Thread 4: [750..999]
2. Chunk Partitioning — для неіндексованих колекцій (IEnumerable):
// PLINQ бере елементи по одному і розподіляє між потоками
IEnumerable<int> numbers = GetNumbers(); // не масив, а IEnumerable
var result = numbers.AsParallel().Select(n => n * n).ToArray();
// Динамічний розподіл — кожен потік бере наступний елемент коли звільняється
3. Hash Partitioning — для операцій з групуванням (GroupBy, Join):
// PLINQ використовує hash для розподілу елементів
var grouped = numbers
.AsParallel()
.GroupBy(n => n % 10) // групування по останній цифрі
.ToArray();
// Елементи з однаковим hash потрапляють на один потік
✅ DO:
ForAll() замість ToArray() якщо результати не потрібніAsOrdered() тільки коли порядок дійсно важливий❌ DON'T:
async/await)using System.Linq;
using System.Diagnostics;
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// LINQ
var sw1 = Stopwatch.StartNew();
var result1 = numbers
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Sum();
sw1.Stop();
// PLINQ
var sw2 = Stopwatch.StartNew();
var result2 = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Sum();
sw2.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, результат: {result1}");
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, результат: {result2}");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
Створіть утиліту для паралельної обробки файлів у директорії з підтримкою скасування.
Вимоги:
.txt файли у директоріїCancellationToken для скасування// Приклад використання:
using var cts = new CancellationTokenSource();
// Скасувати через 5 секунд
cts.CancelAfter(TimeSpan.FromSeconds(5));
try
{
var results = await ProcessFilesAsync("C:/Documents", cts.Token);
foreach (var (file, wordCount) in results)
{
Console.WriteLine($"{file}: {wordCount} слів");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Обробка скасована");
}
Обчисліть число π методом Монте-Карло використовуючи Parallel.For та PLINQ. Порівняйте performance.
Алгоритм:
Вимоги:
Parallel.For з thread-local stateСтворіть спрощений MapReduce framework для паралельної обробки великих текстових файлів.
Вимоги:
Task.WhenAll() для координаціїCancellationToken// Приклад використання:
var mapReduce = new MapReduceEngine();
var wordFrequency = await mapReduce.ProcessFileAsync(
filePath: "large-text.txt",
chunkSize: 1024 * 1024, // 1 MB chunks
cancellationToken: cts.Token,
progress: new Progress<double>(p => Console.WriteLine($"Прогрес: {p:P0}"))
);
// Топ 10 найчастіших слів
var top10 = wordFrequency
.OrderByDescending(kvp => kvp.Value)
.Take(10);
foreach (var (word, count) in top10)
{
Console.WriteLine($"{word}: {count}");
}
Це завершує матеріал про TPL, Parallel та PLINQ. Ви навчились працювати з Task, композицією задач, CancellationToken, exception handling, Parallel class та PLINQ для ефективної паралельної обробки даних.
TPL, Task та Композиція — Від Thread до Task
Глибокий академічний розбір Task Parallel Library — еволюція від Thread до Task, Task<T>, композиція через WhenAll/WhenAny, CancellationToken, exception handling та AggregateException. Теорія і практика сучасного паралелізму.
Async/Await — Фундамент Асинхронного Програмування
Глибокий академічний розбір async/await у C# — від проблеми блокуючого I/O до state machine під капотом. Історія асинхронності (APM, EAP, TAP), синтаксис async/await, return types, exception handling та best practices.