System Programming Windows

Parallel Class та PLINQ — Data Parallelism

Глибокий академічний розбір Parallel.For/ForEach/Invoke, ParallelOptions, thread-local state, PLINQ (Parallel LINQ) з AsParallel, partitioning strategies та performance optimization. Теорія і практика паралельної обробки даних.

Parallel Class та PLINQ — Data Parallelism

Вступ: Task vs Parallel — Різні Підходи до Паралелізму

У попередньому файлі ви навчились працювати з Task — абстракцією одиниці роботи, яка виконується асинхронно. Task ідеально підходить для task parallelism — коли у вас є декілька різних операцій, які потрібно виконати паралельно:

// Task Parallelism — різні операції
Task task1 = DownloadFileAsync("url1");
Task task2 = ProcessImageAsync("image.jpg");
Task task3 = SendEmailAsync("user@example.com");

await Task.WhenAll(task1, task2, task3);

Але є інший тип паралелізму — data parallelism (паралелізм даних): коли у вас є одна операція, яку потрібно виконати над великою кількістю елементів:

// Data Parallelism — одна операція над багатьма елементами
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();

// Обчислити квадрат кожного числа паралельно
foreach (int number in numbers)
{
    int square = number * number;
    // ...
}

Для data parallelism .NET надає два інструменти:

  1. Parallel class — імперативний підхід (Parallel.For, Parallel.ForEach)
  2. PLINQ — декларативний підхід (Parallel LINQ)

6. Parallel Class: Імперативний Data Parallelism

Parallel.For() — Паралельний Цикл з Індексом

Parallel.For() — це паралельний аналог звичайного for циклу. Він автоматично розподіляє ітерації між потоками ThreadPool.

ParallelFor.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void ProcessItem(int i)
{
    // Імітація CPU-bound роботи
    double result = 0;
    for (int j = 0; j < 1_000_000; j++)
        result += Math.Sqrt(i * j);
}

int itemCount = 100;

// ❌ Послідовний for
var sw1 = Stopwatch.StartNew();
for (int i = 0; i < itemCount; i++)
{
    ProcessItem(i);
}
sw1.Stop();
Console.WriteLine($"Послідовний for: {sw1.ElapsedMilliseconds}ms");

// ✅ Паралельний Parallel.For
var sw2 = Stopwatch.StartNew();
Parallel.For(0, itemCount, i =>
{
    ProcessItem(i);
});
sw2.Stop();
Console.WriteLine($"Parallel.For: {sw2.ElapsedMilliseconds}ms");

Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
Parallel.For Benchmark
Послідовний for: 8,247ms
Parallel.For: 1,142ms
Прискорення: 7.2x
На 8-core CPU отримали ~7x прискорення!

Як це працює:

  1. Parallel.For() розбиває діапазон [0, itemCount) на chunks (шматки)
  2. Кожен chunk виконується на окремому потоці ThreadPool
  3. ThreadPool автоматично балансує навантаження між потоками
  4. Головний потік блокується поки всі ітерації не завершаться

Синтаксис:

// Базовий варіант
Parallel.For(fromInclusive: 0, toExclusive: 100, body: i =>
{
    Console.WriteLine($"Обробка елемента {i} на потоці {Thread.CurrentThread.ManagedThreadId}");
});

// З ParallelOptions
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 4  // максимум 4 потоки
};

Parallel.For(0, 100, options, i =>
{
    ProcessItem(i);
});

Parallel.ForEach() — Паралельна Обробка Колекцій

Parallel.ForEach() — це паралельний аналог foreach. Використовується коли у вас є колекція елементів (не обов'язково з індексами).

ParallelForEach.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void ProcessImage(string imagePath)
{
    Console.WriteLine($"[Thread {Thread.CurrentThread.ManagedThreadId}] Обробка {imagePath}");
    Thread.Sleep(500);  // імітація обробки
}

string[] images = 
[
    "image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg",
    "image5.jpg", "image6.jpg", "image7.jpg", "image8.jpg"
];

var sw = Stopwatch.StartNew();

// ✅ Паралельна обробка
Parallel.ForEach(images, imagePath =>
{
    ProcessImage(imagePath);
});

sw.Stop();
Console.WriteLine($"\nВсі зображення оброблені за {sw.ElapsedMilliseconds}ms");
Parallel.ForEach Output
[Thread 4] Обробка image1.jpg
[Thread 5] Обробка image2.jpg
[Thread 6] Обробка image3.jpg
[Thread 7] Обробка image4.jpg
[Thread 4] Обробка image5.jpg
[Thread 5] Обробка image6.jpg
[Thread 6] Обробка image7.jpg
[Thread 7] Обробка image8.jpg
Всі зображення оброблені за 1,003ms
Замість 4,000ms послідовно!

ParallelOptions — Налаштування Паралелізму

ParallelOptions дозволяє контролювати поведінку Parallel.For/ForEach:

ParallelOptions.cs
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions
{
    // Максимальна кількість паралельних потоків
    MaxDegreeOfParallelism = 4,
    
    // CancellationToken для скасування
    CancellationToken = cancellationToken,
    
    // TaskScheduler (за замовчуванням — ThreadPool)
    TaskScheduler = TaskScheduler.Default
};

Parallel.For(0, 1000, options, i =>
{
    ProcessItem(i);
});

MaxDegreeOfParallelism:

ЗначенняПоведінка
-1 (default)Необмежена кількість потоків (ThreadPool вирішує)
1Послідовне виконання (без паралелізму)
NМаксимум N паралельних потоків

Коли обмежувати MaxDegreeOfParallelism:

  • ✅ Операції використовують багато пам'яті (щоб не вичерпати RAM)
  • ✅ Зовнішній ресурс має обмеження (наприклад, база даних підтримує max 10 з'єднань)
  • ✅ Тестування — щоб контролювати навантаження
MaxDegreeExample.cs
// Приклад: обмеження паралелізму для HTTP запитів
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 5  // максимум 5 одночасних запитів
};

string[] urls = GetUrls();  // 1000 URLs

Parallel.ForEach(urls, options, async url =>
{
    using var client = new HttpClient();
    string content = await client.GetStringAsync(url);
    ProcessContent(content);
});

CancellationToken у Parallel

Parallel.For/ForEach підтримує скасування через CancellationToken:

ParallelCancellation.cs
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

var options = new ParallelOptions
{
    CancellationToken = cts.Token
};

// Скасовуємо через 2 секунди
Task.Run(async () =>
{
    await Task.Delay(2000);
    cts.Cancel();
});

try
{
    Parallel.For(0, 1000, options, i =>
    {
        // Не потрібно вручну перевіряти token — Parallel робить це автоматично
        Thread.Sleep(100);
        Console.WriteLine($"Обробка елемента {i}");
    });
}
catch (OperationCanceledException)
{
    Console.WriteLine("Parallel.For скасовано");
}
Важливо: Parallel.For/ForEach автоматично перевіряє CancellationToken між ітераціями. Вам не потрібно вручну викликати ThrowIfCancellationRequested() (хоча можна для швидшої реакції).

ParallelLoopState — Контроль Виконання

ParallelLoopState дозволяє контролювати виконання циклу зсередини ітерації:

ParallelLoopState.cs
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 100).ToArray();

// Шукаємо число 42 паралельно
Parallel.ForEach(numbers, (number, state) =>
{
    Console.WriteLine($"Перевірка {number} на потоці {Thread.CurrentThread.ManagedThreadId}");
    
    if (number == 42)
    {
        Console.WriteLine($"Знайдено 42! Зупиняємо пошук.");
        state.Stop();  // зупинити всі ітерації якомога швидше
        return;
    }
    
    Thread.Sleep(50);  // імітація роботи
});

Console.WriteLine("Пошук завершено");

ParallelLoopState методи:

МетодОпис
Stop()Зупинити всі ітерації якомога швидше (не гарантує миттєву зупинку)
Break()Зупинити ітерації після поточної (гарантує що всі ітерації до поточної завершаться)
IsStoppedЧи був викликаний Stop()
IsExceptionalЧи була exception в іншій ітерації
ShouldExitCurrentIterationЧи потрібно завершити поточну ітерацію

Stop() vs Break():

// Stop() — зупинити ВСЕ якомога швидше
Parallel.For(0, 1000, (i, state) =>
{
    if (i == 500)
        state.Stop();  // ітерації 501-999 можуть НЕ виконатись
    
    ProcessItem(i);
});

// Break() — зупинити після поточної, але гарантувати виконання попередніх
Parallel.For(0, 1000, (i, state) =>
{
    if (i == 500)
        state.Break();  // ітерації 0-500 гарантовано виконаються
    
    ProcessItem(i);
});

Thread-Local State — Оптимізація для Агрегації

Коли потрібно агрегувати результати (наприклад, підрахувати суму), наївний підхід з lock буде повільним:

ParallelAggregationBad.cs
// ❌ Повільно — lock на кожній ітерації
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
long sum = 0;
object lockObj = new();

Parallel.ForEach(numbers, number =>
{
    lock (lockObj)  // contention на кожній ітерації!
    {
        sum += number;
    }
});

Console.WriteLine($"Сума: {sum}");

Рішення: використовувати thread-local state — кожен потік має власний локальний акумулятор, і тільки в кінці результати об'єднуються:

ParallelAggregationGood.cs
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// ✅ Швидко — thread-local aggregation
long totalSum = Parallel.ForEach(
    source: numbers,
    
    // localInit — ініціалізація локального стану для кожного потоку
    localInit: () => 0L,
    
    // body — обробка елемента з локальним станом
    body: (number, state, localSum) =>
    {
        return localSum + number;  // додаємо до локального акумулятора
    },
    
    // localFinally — об'єднання локальних результатів (викликається один раз на потік)
    localFinally: localSum =>
    {
        Interlocked.Add(ref totalSum, localSum);  // atomic додавання
    }
).Result;  // ParallelLoopResult містить інформацію про виконання

Console.WriteLine($"Сума: {totalSum}");

Як це працює:

  1. localInit — викликається один раз для кожного потоку, створює локальний акумулятор
  2. body — викликається для кожного елемента, оновлює локальний акумулятор (без lock!)
  3. localFinally — викликається один раз для кожного потоку, об'єднує локальні результати в глобальний

Benchmark:

Наївний підхід (lock на кожній ітерації): 2,847ms
Thread-local aggregation: 142ms
Прискорення: 20x

Parallel.Invoke() — Паралельне Виконання Методів

Parallel.Invoke() виконує декілька різних методів паралельно:

ParallelInvoke.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void Task1()
{
    Console.WriteLine($"Task1 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1000);
}

static void Task2()
{
    Console.WriteLine($"Task2 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1500);
}

static void Task3()
{
    Console.WriteLine($"Task3 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(800);
}

var sw = Stopwatch.StartNew();

// Виконати всі три методи паралельно
Parallel.Invoke(Task1, Task2, Task3);

sw.Stop();
Console.WriteLine($"Всі задачі завершені за {sw.ElapsedMilliseconds}ms");
Parallel.Invoke Output
Task1 на потоці 4
Task2 на потоці 5
Task3 на потоці 6
Всі задачі завершені за 1,502ms
Замість 3,300ms послідовно!

Коли використовувати Parallel.Invoke:

  • ✅ Декілька незалежних CPU-bound операцій
  • ✅ Кількість операцій відома заздалегідь
  • ❌ Для I/O-bound операцій краще Task.WhenAll()

Parallel.ForEachAsync() — Async Підтримка (.NET 6+)

У .NET 6 додано Parallel.ForEachAsync() — паралельна обробка з підтримкою async/await:

ParallelForEachAsync.cs
using System.Threading.Tasks;

static async Task<string> DownloadAsync(string url)
{
    using var client = new HttpClient();
    return await client.GetStringAsync(url);
}

string[] urls = 
[
    "https://api.example.com/data1",
    "https://api.example.com/data2",
    "https://api.example.com/data3",
    "https://api.example.com/data4"
];

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 3  // максимум 3 одночасних запити
};

await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    Console.WriteLine($"Завантаження {url}...");
    string content = await DownloadAsync(url);
    Console.WriteLine($"Завантажено {content.Length} байт з {url}");
});

Console.WriteLine("Всі завантаження завершені");
Parallel.ForEach vs Parallel.ForEachAsync:
МетодДля чого
Parallel.ForEachCPU-bound операції (обчислення, обробка даних)
Parallel.ForEachAsyncI/O-bound операції (HTTP, файли, база даних)
Для I/O-bound завжди використовуйте ForEachAsync — він не блокує потоки ThreadPool.

Коли НЕ Використовувати Parallel

Parallel.For/ForEach не завжди дає прискорення. Є випадки коли він повільніший за послідовне виконання:

❌ Дуже швидкі операції (overhead паралелізму більший за виграш):

// ❌ Повільніше за звичайний for
Parallel.For(0, 1000, i =>
{
    int result = i * 2;  // занадто швидка операція
});

❌ Мало елементів:

// ❌ Overhead створення потоків більший за виграш
int[] numbers = [1, 2, 3, 4, 5];
Parallel.ForEach(numbers, n => ProcessItem(n));

❌ I/O-bound операції (використовуйте async/await замість):

// ❌ Блокує потоки ThreadPool
Parallel.ForEach(urls, url =>
{
    string content = DownloadSync(url);  // блокуюче I/O
});

// ✅ Правильно
await Parallel.ForEachAsync(urls, async (url, ct) =>
{
    string content = await DownloadAsync(url);  // non-blocking I/O
});

❌ Операції з великим shared state та contention:

// ❌ Lock на кожній ітерації — повільніше за послідовне виконання
object lockObj = new();
Parallel.For(0, 1000, i =>
{
    lock (lockObj)
    {
        // вся робота під lock — немає паралелізму!
        DoWork(i);
    }
});

Правило: використовуйте Parallel коли:

  • ✅ CPU-bound операції (обчислення, обробка даних)
  • ✅ Кожна ітерація займає хоча б ~1ms (інакше overhead паралелізму занадто великий)
  • ✅ Багато елементів (хоча б 100+)
  • ✅ Мінімальний shared state (або thread-local aggregation)

7. PLINQ: Parallel LINQ

Вступ: Декларативний Data Parallelism

Parallel.For/ForEach — це імперативний підхід: ви явно пишете цикли та контролюєте виконання. PLINQ (Parallel LINQ) — це декларативний підхід: ви описуєте що потрібно зробити, а не як.

// Імперативний підхід (Parallel.ForEach)
var results = new ConcurrentBag<int>();
Parallel.ForEach(numbers, number =>
{
    if (number % 2 == 0)
    {
        int square = number * number;
        results.Add(square);
    }
});

// Декларативний підхід (PLINQ)
var results = numbers
    .AsParallel()
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToArray();

PLINQ автоматично:

  • ✅ Розподіляє роботу між потоками
  • ✅ Балансує навантаження
  • ✅ Об'єднує результати
  • ✅ Обробляє exceptions

AsParallel() — Увімкнення PLINQ

Щоб увімкнути паралельну обробку LINQ запиту, додайте .AsParallel():

AsParallel.cs
using System.Linq;
using System.Diagnostics;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// ❌ Звичайний LINQ — послідовне виконання
var sw1 = Stopwatch.StartNew();
var result1 = numbers
    .Where(n => IsPrime(n))
    .ToArray();
sw1.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, знайдено {result1.Length} простих чисел");

// ✅ PLINQ — паралельне виконання
var sw2 = Stopwatch.StartNew();
var result2 = numbers
    .AsParallel()  // увімкнути паралелізм
    .Where(n => IsPrime(n))
    .ToArray();
sw2.Stop();
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, знайдено {result2.Length} простих чисел");

Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");

static bool IsPrime(int n)
{
    if (n < 2) return false;
    for (int i = 2; i * i <= n; i++)
        if (n % i == 0) return false;
    return true;
}
PLINQ Benchmark
LINQ: 12,847ms, знайдено 664579 простих чисел
PLINQ: 1,823ms, знайдено 664579 простих чисел
Прискорення: 7.0x

AsOrdered() — Збереження Порядку

За замовчуванням PLINQ не гарантує порядок результатів (для максимальної продуктивності). Якщо порядок важливий — використовуйте AsOrdered():

AsOrdered.cs
int[] numbers = Enumerable.Range(1, 20).ToArray();

// ❌ Без AsOrdered — порядок не гарантується
var unordered = numbers
    .AsParallel()
    .Select(n => n * n)
    .ToArray();

Console.WriteLine("Без AsOrdered: " + string.Join(", ", unordered));
// Вивід: 16, 1, 4, 25, 9, 36, 49, 64, ... (випадковий порядок)

// ✅ З AsOrdered — порядок збережено
var ordered = numbers
    .AsParallel()
    .AsOrdered()  // зберегти порядок
    .Select(n => n * n)
    .ToArray();

Console.WriteLine("З AsOrdered: " + string.Join(", ", ordered));
// Вивід: 1, 4, 9, 16, 25, 36, 49, 64, ... (правильний порядок)
Performance Impact: AsOrdered() має overhead — результати потрібно буферизувати та сортувати. Використовуйте тільки коли порядок дійсно важливий.

WithDegreeOfParallelism() — Контроль Кількості Потоків

За замовчуванням PLINQ використовує стільки потоків, скільки вважає оптимальним (зазвичай = кількість CPU cores). Можна обмежити:

WithDegreeOfParallelism.cs
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();

// Обмежити до 4 потоків
var result = numbers
    .AsParallel()
    .WithDegreeOfParallelism(4)  // максимум 4 потоки
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToArray();

Console.WriteLine($"Оброблено {result.Length} елементів");

Коли використовувати:

  • ✅ Обмеження ресурсів (пам'ять, зовнішні API)
  • ✅ Тестування та benchmarking
  • ❌ Не встановлюйте занадто низьке значення — втратите паралелізм

WithCancellation() — Скасування PLINQ

PLINQ підтримує скасування через CancellationToken:

WithCancellation.cs
using System.Threading;
using System.Linq;

using var cts = new CancellationTokenSource();

// Скасовуємо через 2 секунди
Task.Run(async () =>
{
    await Task.Delay(2000);
    cts.Cancel();
});

try
{
    var result = Enumerable.Range(1, 10_000_000)
        .AsParallel()
        .WithCancellation(cts.Token)  // передаємо токен
        .Where(n =>
        {
            Thread.Sleep(1);  // імітація повільної роботи
            return IsPrime(n);
        })
        .ToArray();
    
    Console.WriteLine($"Знайдено {result.Length} простих чисел");
}
catch (OperationCanceledException)
{
    Console.WriteLine("PLINQ запит скасовано");
}

WithExecutionMode() — Контроль Паралелізму

PLINQ автоматично вирішує чи варто виконувати запит паралельно (аналізує overhead). Можна форсувати:

WithExecutionMode.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 100).ToArray();

// Форсувати паралельне виконання (навіть якщо PLINQ вважає що це неефективно)
var result = numbers
    .AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(n => n * n)
    .ToArray();

ParallelExecutionMode:

РежимОпис
DefaultPLINQ сам вирішує (рекомендовано)
ForceParallelismЗавжди виконувати паралельно (навіть якщо неефективно)
У 99% випадків використовуйте Default — PLINQ добре вміє визначати коли паралелізм вигідний.

WithMergeOptions() — Контроль Буферизації

PLINQ буферизує результати перед поверненням. Можна контролювати стратегію буферизації:

WithMergeOptions.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 1000).ToArray();

// Повертати результати якомога швидше (мінімальна буферизація)
var result = numbers
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(n => n * n);

foreach (var item in result)
{
    Console.WriteLine(item);  // результати з'являються одразу
}

ParallelMergeOptions:

ОпціяОписUse Case
DefaultPLINQ сам вирішуєРекомендовано для більшості випадків
NotBufferedМінімальна буферизація, результати одразуStreaming, UI updates
AutoBufferedПомірна буферизаціяБаланс між throughput та latency
FullyBufferedМаксимальна буферизаціяМаксимальний throughput

ForAll() — Паралельна Обробка без Збору Результатів

Якщо не потрібно збирати результати (наприклад, запис у файл або базу даних), використовуйте ForAll() замість ToArray():

ForAll.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 1000).ToArray();

// ❌ Неефективно — збирає результати які не потрібні
numbers
    .AsParallel()
    .Select(n => n * n)
    .ToArray();  // створює масив який ніхто не використовує

// ✅ Ефективно — обробляє без збору результатів
numbers
    .AsParallel()
    .ForAll(n =>
    {
        int square = n * n;
        Console.WriteLine($"{n}² = {square}");
        // або запис у файл, базу даних, тощо
    });

Різниця:

  • ToArray() / ToList() — збирає всі результати в колекцію
  • ForAll() — обробляє кожен елемент без збору результатів (швидше, менше пам'яті)

Aggregate() — Паралельна Агрегація

PLINQ має спеціальний метод Aggregate() для паралельної агрегації (сума, добуток, тощо):

Aggregate.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// Паралельна сума
long sum = numbers
    .AsParallel()
    .Aggregate(
        seed: 0L,  // початкове значення для кожного потоку
        
        // updateAccumulatorFunc — оновлення локального акумулятора
        (localSum, number) => localSum + number,
        
        // combineAccumulatorsFunc — об'єднання локальних акумуляторів
        (sum1, sum2) => sum1 + sum2,
        
        // resultSelector — фінальна трансформація (опціонально)
        finalSum => finalSum
    );

Console.WriteLine($"Сума: {sum}");

Спрощений варіант (коли seed та combineFunc однакові):

// Паралельна сума (спрощено)
long sum = numbers
    .AsParallel()
    .Sum();  // PLINQ автоматично паралелізує Sum()

// Паралельний добуток
long product = numbers
    .AsParallel()
    .Aggregate(1L, (acc, n) => acc * n);

Partitioning Strategies — Стратегії Розбиття

PLINQ автоматично розбиває вхідну колекцію на chunks для паралельної обробки. Є три стратегії:

1. Range Partitioning — для індексованих колекцій (масиви, списки):

// PLINQ розбиває масив на рівні chunks
int[] numbers = new int[1000];
var result = numbers.AsParallel().Select(n => n * n).ToArray();

// Приклад розбиття на 4 потоки:
// Thread 1: [0..249]
// Thread 2: [250..499]
// Thread 3: [500..749]
// Thread 4: [750..999]

2. Chunk Partitioning — для неіндексованих колекцій (IEnumerable):

// PLINQ бере елементи по одному і розподіляє між потоками
IEnumerable<int> numbers = GetNumbers();  // не масив, а IEnumerable
var result = numbers.AsParallel().Select(n => n * n).ToArray();

// Динамічний розподіл — кожен потік бере наступний елемент коли звільняється

3. Hash Partitioning — для операцій з групуванням (GroupBy, Join):

// PLINQ використовує hash для розподілу елементів
var grouped = numbers
    .AsParallel()
    .GroupBy(n => n % 10)  // групування по останній цифрі
    .ToArray();

// Елементи з однаковим hash потрапляють на один потік

PLINQ Performance Tips

✅ DO:

  • Використовуйте PLINQ для CPU-bound операцій з великими колекціями (10,000+ елементів)
  • Використовуйте ForAll() замість ToArray() якщо результати не потрібні
  • Використовуйте AsOrdered() тільки коли порядок дійсно важливий
  • Тестуйте performance — не всі запити стають швидшими з PLINQ

❌ DON'T:

  • Не використовуйте PLINQ для швидких операцій (overhead > виграш)
  • Не використовуйте PLINQ для малих колекцій (<1,000 елементів)
  • Не використовуйте PLINQ для I/O-bound операцій (використовуйте async/await)
  • Не використовуйте PLINQ якщо операції мають великий shared state з contention

Benchmark: LINQ vs PLINQ

LinqVsPlinqBenchmark.cs
using System.Linq;
using System.Diagnostics;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// LINQ
var sw1 = Stopwatch.StartNew();
var result1 = numbers
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .Sum();
sw1.Stop();

// PLINQ
var sw2 = Stopwatch.StartNew();
var result2 = numbers
    .AsParallel()
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .Sum();
sw2.Stop();

Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, результат: {result1}");
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, результат: {result2}");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
LINQ vs PLINQ Benchmark
LINQ: 847ms, результат: 83333333500000
PLINQ: 142ms, результат: 83333333500000
Прискорення: 6.0x

8. Практичні Завдання

Рівень 1: Parallel File Processor з CancellationToken

Створіть утиліту для паралельної обробки файлів у директорії з підтримкою скасування.

Вимоги:

  • Знайти всі .txt файли у директорії
  • Паралельно прочитати кожен файл та підрахувати кількість слів
  • Підтримка CancellationToken для скасування
  • Вивести прогрес обробки
// Приклад використання:
using var cts = new CancellationTokenSource();

// Скасувати через 5 секунд
cts.CancelAfter(TimeSpan.FromSeconds(5));

try
{
    var results = await ProcessFilesAsync("C:/Documents", cts.Token);
    
    foreach (var (file, wordCount) in results)
    {
        Console.WriteLine($"{file}: {wordCount} слів");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Обробка скасована");
}

Рівень 2: Monte Carlo π Calculation з Parallel та PLINQ

Обчисліть число π методом Монте-Карло використовуючи Parallel.For та PLINQ. Порівняйте performance.

Алгоритм:

  1. Генеруємо N випадкових точок у квадраті 0,1 × 0,1
  2. Підраховуємо скільки точок потрапили у чверть кола радіусом 1
  3. π ≈ 4 × (кількість точок у колі) / (загальна кількість точок)

Вимоги:

  • Реалізувати через Parallel.For з thread-local state
  • Реалізувати через PLINQ
  • Benchmark обох підходів
  • N = 100,000,000 точок

Рівень 3: MapReduce Framework з Task.WhenAll

Створіть спрощений MapReduce framework для паралельної обробки великих текстових файлів.

Вимоги:

  • Map phase: розбити файл на chunks, паралельно обробити кожен chunk (підрахувати частоту слів)
  • Reduce phase: об'єднати результати з усіх chunks
  • Використовувати Task.WhenAll() для координації
  • Підтримка CancellationToken
  • Progress reporting
// Приклад використання:
var mapReduce = new MapReduceEngine();

var wordFrequency = await mapReduce.ProcessFileAsync(
    filePath: "large-text.txt",
    chunkSize: 1024 * 1024,  // 1 MB chunks
    cancellationToken: cts.Token,
    progress: new Progress<double>(p => Console.WriteLine($"Прогрес: {p:P0}"))
);

// Топ 10 найчастіших слів
var top10 = wordFrequency
    .OrderByDescending(kvp => kvp.Value)
    .Take(10);

foreach (var (word, count) in top10)
{
    Console.WriteLine($"{word}: {count}");
}

Це завершує матеріал про TPL, Parallel та PLINQ. Ви навчились працювати з Task, композицією задач, CancellationToken, exception handling, Parallel class та PLINQ для ефективної паралельної обробки даних.