System Programming Windows

Parallel Class та PLINQ — Data Parallelism

Глибокий академічний розбір Parallel.For/ForEach/Invoke, ParallelOptions, thread-local state, PLINQ (Parallel LINQ) з AsParallel, partitioning strategies та performance optimization. Теорія і практика паралельної обробки даних.

Parallel Class та PLINQ — Data Parallelism

Вступ: Task vs Parallel — Різні Підходи до Паралелізму

У попередньому файлі ви навчились працювати з Task — абстракцією одиниці роботи, яка виконується асинхронно. Task ідеально підходить для task parallelism — коли у вас є декілька різних операцій, які потрібно виконати паралельно:

// Task Parallelism — різні операції
Task task1 = Task.Run(() => DownloadFile("url1"));
Task task2 = Task.Run(() => ProcessImage("image.jpg"));
Task task3 = Task.Run(() => SendEmail("user@example.com"));

Task.WaitAll(task1, task2, task3);

Але є інший тип паралелізму — data parallelism (паралелізм даних): коли у вас є одна операція, яку потрібно виконати над великою кількістю елементів:

// Data Parallelism — одна операція над багатьма елементами
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();

// Обчислити квадрат кожного числа паралельно
foreach (int number in numbers)
{
    int square = number * number;
    // ...
}

Для data parallelism .NET надає два інструменти:

  1. Parallel class — імперативний підхід (Parallel.For, Parallel.ForEach)
  2. PLINQ — декларативний підхід (Parallel LINQ)
Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent
skinparam defaultTextAlignment center

package "Task Parallelism" {
  [Task 1: Download] as T1
  [Task 2: Process] as T2
  [Task 3: Send Email] as T3
}

package "Data Parallelism" {
  [Data Collection] as DC
  [Parallel.For/ForEach] as PF
  [PLINQ] as PL
  
  DC --> PF : Imperative
  DC --> PL : Declarative
}

note right of T1
  Різні операції
  виконуються паралельно
end note

note right of DC
  Одна операція
  над багатьма елементами
end note

@enduml

6. Parallel Class: Імперативний Data Parallelism

Parallel.For() — Паралельний Цикл з Індексом

Parallel.For() — це паралельний аналог звичайного for циклу. Він автоматично розподіляє ітерації між потоками ThreadPool.

ParallelFor.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void ProcessItem(int i)
{
    // Імітація CPU-bound роботи
    double result = 0;
    for (int j = 0; j < 1_000_000; j++)
        result += Math.Sqrt(i * j);
}

int itemCount = 100;

// ❌ Послідовний for
var sw1 = Stopwatch.StartNew();
for (int i = 0; i < itemCount; i++)
{
    ProcessItem(i);
}
sw1.Stop();
Console.WriteLine($"Послідовний for: {sw1.ElapsedMilliseconds}ms");

// ✅ Паралельний Parallel.For
var sw2 = Stopwatch.StartNew();
Parallel.For(0, itemCount, i =>
{
    ProcessItem(i);
});
sw2.Stop();
Console.WriteLine($"Parallel.For: {sw2.ElapsedMilliseconds}ms");

Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
Parallel.For Benchmark
Послідовний for: 8,247ms
Parallel.For: 1,142ms
Прискорення: 7.2x
На 8-core CPU отримали ~7x прискорення!

Як це працює:

  1. Parallel.For() розбиває діапазон [0, itemCount) на chunks (шматки)
  2. Кожен chunk виконується на окремому потоці ThreadPool
  3. ThreadPool автоматично балансує навантаження між потоками
  4. Головний потік блокується поки всі ітерації не завершаться
Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

participant "Main Thread" as MT
participant "ThreadPool" as TP
collections "Worker Threads" as WT

MT -> TP: Parallel.For(0, 100)
activate TP

TP -> WT: Chunk 1: [0..24]
TP -> WT: Chunk 2: [25..49]
TP -> WT: Chunk 3: [50..74]
TP -> WT: Chunk 4: [75..99]

activate WT
WT --> WT: Process items
deactivate WT

WT --> TP: All chunks completed
TP --> MT: Return control
deactivate TP

@enduml

Синтаксис:

// Базовий варіант
Parallel.For(fromInclusive: 0, toExclusive: 100, body: i =>
{
    Console.WriteLine($"Обробка елемента {i} на потоці {Thread.CurrentThread.ManagedThreadId}");
});

// З ParallelOptions
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 4  // максимум 4 потоки
};

Parallel.For(0, 100, options, i =>
{
    ProcessItem(i);
});

Parallel.ForEach() — Паралельна Обробка Колекцій

Parallel.ForEach() — це паралельний аналог foreach. Використовується коли у вас є колекція елементів (не обов'язково з індексами).

ParallelForEach.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void ProcessImage(string imagePath)
{
    Console.WriteLine($"[Thread {Thread.CurrentThread.ManagedThreadId}] Обробка {imagePath}");
    Thread.Sleep(500);  // імітація обробки
}

string[] images = 
[
    "image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg",
    "image5.jpg", "image6.jpg", "image7.jpg", "image8.jpg"
];

var sw = Stopwatch.StartNew();

// ✅ Паралельна обробка
Parallel.ForEach(images, imagePath =>
{
    ProcessImage(imagePath);
});

sw.Stop();
Console.WriteLine($"\nВсі зображення оброблені за {sw.ElapsedMilliseconds}ms");
Parallel.ForEach Output
[Thread 4] Обробка image1.jpg
[Thread 5] Обробка image2.jpg
[Thread 6] Обробка image3.jpg
[Thread 7] Обробка image4.jpg
[Thread 4] Обробка image5.jpg
[Thread 5] Обробка image6.jpg
[Thread 6] Обробка image7.jpg
[Thread 7] Обробка image8.jpg
Всі зображення оброблені за 1,003ms
Замість 4,000ms послідовно!

ParallelOptions — Налаштування Паралелізму

ParallelOptions дозволяє контролювати поведінку Parallel.For/ForEach:

ParallelOptions.cs
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions
{
    // Максимальна кількість паралельних потоків
    MaxDegreeOfParallelism = 4,
    
    // CancellationToken для скасування
    CancellationToken = cancellationToken,
    
    // TaskScheduler (за замовчуванням — ThreadPool)
    TaskScheduler = TaskScheduler.Default
};

Parallel.For(0, 1000, options, i =>
{
    ProcessItem(i);
});

MaxDegreeOfParallelism:

ЗначенняПоведінка
-1 (default)Необмежена кількість потоків (ThreadPool вирішує)
1Послідовне виконання (без паралелізму)
NМаксимум N паралельних потоків

Коли обмежувати MaxDegreeOfParallelism:

  • ✅ Операції використовують багато пам'яті (щоб не вичерпати RAM)
  • ✅ Зовнішній ресурс має обмеження (наприклад, база даних підтримує max 10 з'єднань)
  • ✅ Тестування — щоб контролювати навантаження
MaxDegreeExample.cs
// Приклад: обмеження паралелізму для HTTP запитів
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 5  // максимум 5 одночасних запитів
};

string[] urls = GetUrls();  // 1000 URLs

Parallel.ForEach(urls, options, url =>
{
    using var client = new HttpClient();
    string content = client.GetStringAsync(url).Result;
    ProcessContent(content);
});

CancellationToken у Parallel

Parallel.For/ForEach підтримує скасування через CancellationToken:

ParallelCancellation.cs
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

var options = new ParallelOptions
{
    CancellationToken = cts.Token
};

// Скасовуємо через 2 секунди
Task.Run(() =>
{
    Thread.Sleep(2000);
    cts.Cancel();
});

try
{
    Parallel.For(0, 1000, options, i =>
    {
        // Не потрібно вручну перевіряти token — Parallel робить це автоматично
        Thread.Sleep(100);
        Console.WriteLine($"Обробка елемента {i}");
    });
}
catch (OperationCanceledException)
{
    Console.WriteLine("Parallel.For скасовано");
}
Важливо: Parallel.For/ForEach автоматично перевіряє CancellationToken між ітераціями. Вам не потрібно вручну викликати ThrowIfCancellationRequested() (хоча можна для швидшої реакції).

ParallelLoopState — Контроль Виконання

ParallelLoopState дозволяє контролювати виконання циклу зсередини ітерації:

ParallelLoopState.cs
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 100).ToArray();

// Шукаємо число 42 паралельно
Parallel.ForEach(numbers, (number, state) =>
{
    Console.WriteLine($"Перевірка {number} на потоці {Thread.CurrentThread.ManagedThreadId}");
    
    if (number == 42)
    {
        Console.WriteLine($"Знайдено 42! Зупиняємо пошук.");
        state.Stop();  // зупинити всі ітерації якомога швидше
        return;
    }
    
    Thread.Sleep(50);  // імітація роботи
});

Console.WriteLine("Пошук завершено");

ParallelLoopState методи:

МетодОпис
Stop()Зупинити всі ітерації якомога швидше (не гарантує миттєву зупинку)
Break()Зупинити ітерації після поточної (гарантує що всі ітерації до поточної завершаться)
IsStoppedЧи був викликаний Stop()
IsExceptionalЧи була exception в іншій ітерації
ShouldExitCurrentIterationЧи потрібно завершити поточну ітерацію

Stop() vs Break():

// Stop() — зупинити ВСЕ якомога швидше
Parallel.For(0, 1000, (i, state) =>
{
    if (i == 500)
        state.Stop();  // ітерації 501-999 можуть НЕ виконатись
    
    ProcessItem(i);
});

// Break() — зупинити після поточної, але гарантувати виконання попередніх
Parallel.For(0, 1000, (i, state) =>
{
    if (i == 500)
        state.Break();  // ітерації 0-500 гарантовано виконаються
    
    ProcessItem(i);
});

Thread-Local State — Оптимізація для Агрегації

Коли потрібно агрегувати результати (наприклад, підрахувати суму), наївний підхід з lock буде повільним:

ParallelAggregationBad.cs
// ❌ Повільно — lock на кожній ітерації
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
long sum = 0;
object lockObj = new();

Parallel.ForEach(numbers, number =>
{
    lock (lockObj)  // contention на кожній ітерації!
    {
        sum += number;
    }
});

Console.WriteLine($"Сума: {sum}");

Рішення: використовувати thread-local state — кожен потік має власний локальний акумулятор, і тільки в кінці результати об'єднуються:

ParallelAggregationGood.cs
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// ✅ Швидко — thread-local aggregation
long totalSum = Parallel.ForEach(
    source: numbers,
    
    // localInit — ініціалізація локального стану для кожного потоку
    localInit: () => 0L,
    
    // body — обробка елемента з локальним станом
    body: (number, state, localSum) =>
    {
        return localSum + number;  // додаємо до локального акумулятора
    },
    
    // localFinally — об'єднання локальних результатів (викликається один раз на потік)
    localFinally: localSum =>
    {
        Interlocked.Add(ref totalSum, localSum);  // atomic додавання
    }
).Result;  // ParallelLoopResult містить інформацію про виконання

Console.WriteLine($"Сума: {totalSum}");

Як це працює:

  1. localInit — викликається один раз для кожного потоку, створює локальний акумулятор
  2. body — викликається для кожного елемента, оновлює локальний акумулятор (без lock!)
  3. localFinally — викликається один раз для кожного потоку, об'єднує локальні результати в глобальний
Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

rectangle "Thread 1" {
  (localInit) as LI1
  (body: sum += 1..2500000) as B1
  (localFinally) as LF1
  
  LI1 --> B1
  B1 --> LF1
}

rectangle "Thread 2" {
  (localInit) as LI2
  (body: sum += 2500001..5000000) as B2
  (localFinally) as LF2
  
  LI2 --> B2
  B2 --> LF2
}

rectangle "Thread 3" {
  (localInit) as LI3
  (body: sum += 5000001..7500000) as B3
  (localFinally) as LF3
  
  LI3 --> B3
  B3 --> LF3
}

rectangle "Thread 4" {
  (localInit) as LI4
  (body: sum += 7500001..10000000) as B4
  (localFinally) as LF4
  
  LI4 --> B4
  B4 --> LF4
}

[Global Sum] as GS

LF1 --> GS : Interlocked.Add
LF2 --> GS : Interlocked.Add
LF3 --> GS : Interlocked.Add
LF4 --> GS : Interlocked.Add

@enduml

Benchmark:

Наївний підхід (lock на кожній ітерації): 2,847ms
Thread-local aggregation: 142ms
Прискорення: 20x

Parallel.Invoke() — Паралельне Виконання Методів

Parallel.Invoke() виконує декілька різних методів паралельно:

ParallelInvoke.cs
using System.Threading.Tasks;
using System.Diagnostics;

static void Task1()
{
    Console.WriteLine($"Task1 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1000);
}

static void Task2()
{
    Console.WriteLine($"Task2 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1500);
}

static void Task3()
{
    Console.WriteLine($"Task3 на потоці {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(800);
}

var sw = Stopwatch.StartNew();

// Виконати всі три методи паралельно
Parallel.Invoke(Task1, Task2, Task3);

sw.Stop();
Console.WriteLine($"Всі задачі завершені за {sw.ElapsedMilliseconds}ms");
Parallel.Invoke Output
Task1 на потоці 4
Task2 на потоці 5
Task3 на потоці 6
Всі задачі завершені за 1,502ms
Замість 3,300ms послідовно!

Коли використовувати Parallel.Invoke:

  • ✅ Декілька незалежних CPU-bound операцій
  • ✅ Кількість операцій відома заздалегідь
  • ❌ Для I/O-bound операцій краще Task.WhenAll()

Коли НЕ Використовувати Parallel

Parallel.For/ForEach не завжди дає прискорення. Є випадки коли він повільніший за послідовне виконання:

❌ Дуже швидкі операції (overhead паралелізму більший за виграш):

// ❌ Повільніше за звичайний for
Parallel.For(0, 1000, i =>
{
    int result = i * 2;  // занадто швидка операція
});

❌ Мало елементів:

// ❌ Overhead створення потоків більший за виграш
int[] numbers = [1, 2, 3, 4, 5];
Parallel.ForEach(numbers, n => ProcessItem(n));

❌ I/O-bound операції (використовуйте Task-based підхід замість):

// ❌ Блокує потоки ThreadPool
Parallel.ForEach(urls, url =>
{
    string content = DownloadSync(url);  // блокуюче I/O
});

// ✅ Правильно — використовуйте Task.WhenAll
var tasks = urls.Select(url => DownloadAsync(url));
var results = Task.WhenAll(tasks).Result;

❌ Операції з великим shared state та contention:

// ❌ Lock на кожній ітерації — повільніше за послідовне виконання
object lockObj = new();
Parallel.For(0, 1000, i =>
{
    lock (lockObj)
    {
        // вся робота під lock — немає паралелізму!
        DoWork(i);
    }
});

Правило: використовуйте Parallel коли:

  • ✅ CPU-bound операції (обчислення, обробка даних)
  • ✅ Кожна ітерація займає хоча б ~1ms (інакше overhead паралелізму занадто великий)
  • ✅ Багато елементів (хоча б 100+)
  • ✅ Мінімальний shared state (або thread-local aggregation)

7. PLINQ: Parallel LINQ

Вступ: Декларативний Data Parallelism

Parallel.For/ForEach — це імперативний підхід: ви явно пишете цикли та контролюєте виконання. PLINQ (Parallel LINQ) — це декларативний підхід: ви описуєте що потрібно зробити, а не як.

// Імперативний підхід (Parallel.ForEach)
var results = new ConcurrentBag<int>();
Parallel.ForEach(numbers, number =>
{
    if (number % 2 == 0)
    {
        int square = number * number;
        results.Add(square);
    }
});

// Декларативний підхід (PLINQ)
var results = numbers
    .AsParallel()
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToArray();

PLINQ автоматично:

  • ✅ Розподіляє роботу між потоками
  • ✅ Балансує навантаження
  • ✅ Об'єднує результати
  • ✅ Обробляє exceptions
Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

rectangle "LINQ (Sequential)" {
  [Source] --> [Where] : Sequential
  [Where] --> [Select] : Sequential
  [Select] --> [ToArray] : Sequential
}

rectangle "PLINQ (Parallel)" {
  [Source2] as S2
  [AsParallel()] as AP
  
  rectangle "Thread Pool" {
    [Where (T1)] as W1
    [Where (T2)] as W2
    [Where (T3)] as W3
    [Where (T4)] as W4
  }
  
  rectangle "Merge Results" {
    [Select & Combine] as SC
  }
  
  S2 --> AP
  AP --> W1
  AP --> W2
  AP --> W3
  AP --> W4
  
  W1 --> SC
  W2 --> SC
  W3 --> SC
  W4 --> SC
  
  SC --> [ToArray2] : Merged
}

@enduml

AsParallel() — Увімкнення PLINQ

Щоб увімкнути паралельну обробку LINQ запиту, додайте .AsParallel():

AsParallel.cs
using System.Linq;
using System.Diagnostics;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// ❌ Звичайний LINQ — послідовне виконання
var sw1 = Stopwatch.StartNew();
var result1 = numbers
    .Where(n => IsPrime(n))
    .ToArray();
sw1.Stop();
Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, знайдено {result1.Length} простих чисел");

// ✅ PLINQ — паралельне виконання
var sw2 = Stopwatch.StartNew();
var result2 = numbers
    .AsParallel()  // увімкнути паралелізм
    .Where(n => IsPrime(n))
    .ToArray();
sw2.Stop();
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, знайдено {result2.Length} простих чисел");

Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");

static bool IsPrime(int n)
{
    if (n < 2) return false;
    for (int i = 2; i * i <= n; i++)
        if (n % i == 0) return false;
    return true;
}
PLINQ Benchmark
LINQ: 12,847ms, знайдено 664579 простих чисел
PLINQ: 1,823ms, знайдено 664579 простих чисел
Прискорення: 7.0x

AsOrdered() — Збереження Порядку

За замовчуванням PLINQ не гарантує порядок результатів (для максимальної продуктивності). Якщо порядок важливий — використовуйте AsOrdered():

AsOrdered.cs
int[] numbers = Enumerable.Range(1, 20).ToArray();

// ❌ Без AsOrdered — порядок не гарантується
var unordered = numbers
    .AsParallel()
    .Select(n => n * n)
    .ToArray();

Console.WriteLine("Без AsOrdered: " + string.Join(", ", unordered));
// Вивід: 16, 1, 4, 25, 9, 36, 49, 64, ... (випадковий порядок)

// ✅ З AsOrdered — порядок збережено
var ordered = numbers
    .AsParallel()
    .AsOrdered()  // зберегти порядок
    .Select(n => n * n)
    .ToArray();

Console.WriteLine("З AsOrdered: " + string.Join(", ", ordered));
// Вивід: 1, 4, 9, 16, 25, 36, 49, 64, ... (правильний порядок)
Performance Impact: AsOrdered() має overhead — результати потрібно буферизувати та сортувати. Використовуйте тільки коли порядок дійсно важливий.

WithDegreeOfParallelism() — Контроль Кількості Потоків

За замовчуванням PLINQ використовує стільки потоків, скільки вважає оптимальним (зазвичай = кількість CPU cores). Можна обмежити:

WithDegreeOfParallelism.cs
int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();

// Обмежити до 4 потоків
var result = numbers
    .AsParallel()
    .WithDegreeOfParallelism(4)  // максимум 4 потоки
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToArray();

Console.WriteLine($"Оброблено {result.Length} елементів");

Коли використовувати:

  • ✅ Обмеження ресурсів (пам'ять, зовнішні API)
  • ✅ Тестування та benchmarking
  • ❌ Не встановлюйте занадто низьке значення — втратите паралелізм

WithCancellation() — Скасування PLINQ

PLINQ підтримує скасування через CancellationToken:

WithCancellation.cs
using System.Threading;
using System.Linq;

using var cts = new CancellationTokenSource();

// Скасовуємо через 2 секунди
Task.Run(() =>
{
    Thread.Sleep(2000);
    cts.Cancel();
});

try
{
    var result = Enumerable.Range(1, 10_000_000)
        .AsParallel()
        .WithCancellation(cts.Token)  // передаємо токен
        .Where(n =>
        {
            Thread.Sleep(1);  // імітація повільної роботи
            return IsPrime(n);
        })
        .ToArray();
    
    Console.WriteLine($"Знайдено {result.Length} простих чисел");
}
catch (OperationCanceledException)
{
    Console.WriteLine("PLINQ запит скасовано");
}

WithExecutionMode() — Контроль Паралелізму

PLINQ автоматично вирішує чи варто виконувати запит паралельно (аналізує overhead). Можна форсувати:

WithExecutionMode.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 100).ToArray();

// Форсувати паралельне виконання (навіть якщо PLINQ вважає що це неефективно)
var result = numbers
    .AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(n => n * n)
    .ToArray();

ParallelExecutionMode:

РежимОпис
DefaultPLINQ сам вирішує (рекомендовано)
ForceParallelismЗавжди виконувати паралельно (навіть якщо неефективно)
У 99% випадків використовуйте Default — PLINQ добре вміє визначати коли паралелізм вигідний.

WithMergeOptions() — Контроль Буферизації

PLINQ буферизує результати перед поверненням. Можна контролювати стратегію буферизації:

WithMergeOptions.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 1000).ToArray();

// Повертати результати якомога швидше (мінімальна буферизація)
var result = numbers
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(n => n * n);

foreach (var item in result)
{
    Console.WriteLine(item);  // результати з'являються одразу
}

ParallelMergeOptions:

ОпціяОписUse Case
DefaultPLINQ сам вирішуєРекомендовано для більшості випадків
NotBufferedМінімальна буферизація, результати одразуStreaming, UI updates
AutoBufferedПомірна буферизаціяБаланс між throughput та latency
FullyBufferedМаксимальна буферизаціяМаксимальний throughput

ForAll() — Паралельна Обробка без Збору Результатів

Якщо не потрібно збирати результати (наприклад, запис у файл або базу даних), використовуйте ForAll() замість ToArray():

ForAll.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 1000).ToArray();

// ❌ Неефективно — збирає результати які не потрібні
numbers
    .AsParallel()
    .Select(n => n * n)
    .ToArray();  // створює масив який ніхто не використовує

// ✅ Ефективно — обробляє без збору результатів
numbers
    .AsParallel()
    .ForAll(n =>
    {
        int square = n * n;
        Console.WriteLine($"{n}² = {square}");
        // або запис у файл, базу даних, тощо
    });

Різниця:

  • ToArray() / ToList() — збирає всі результати в колекцію
  • ForAll() — обробляє кожен елемент без збору результатів (швидше, менше пам'яті)

Aggregate() — Паралельна Агрегація

PLINQ має спеціальний метод Aggregate() для паралельної агрегації (сума, добуток, тощо):

Aggregate.cs
using System.Linq;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// Паралельна сума
long sum = numbers
    .AsParallel()
    .Aggregate(
        seed: 0L,  // початкове значення для кожного потоку
        
        // updateAccumulatorFunc — оновлення локального акумулятора
        (localSum, number) => localSum + number,
        
        // combineAccumulatorsFunc — об'єднання локальних акумуляторів
        (sum1, sum2) => sum1 + sum2,
        
        // resultSelector — фінальна трансформація (опціонально)
        finalSum => finalSum
    );

Console.WriteLine($"Сума: {sum}");

Спрощений варіант (коли seed та combineFunc однакові):

// Паралельна сума (спрощено)
long sum = numbers
    .AsParallel()
    .Sum();  // PLINQ автоматично паралелізує Sum()

// Паралельний добуток
long product = numbers
    .AsParallel()
    .Aggregate(1L, (acc, n) => acc * n);

**Partitioning Strategies — Стратегії Розбиття

PLINQ автоматично розбиває вхідну колекцію на chunks для паралельної обробки. Є три стратегії:

Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

package "Range Partitioning" {
  rectangle "Array [0..999]" as ARR
  rectangle "Thread 1: [0..249]" as T1
  rectangle "Thread 2: [250..499]" as T2
  rectangle "Thread 3: [500..749]" as T3
  rectangle "Thread 4: [750..999]" as T4
  
  ARR --> T1
  ARR --> T2
  ARR --> T3
  ARR --> T4
}

package "Chunk Partitioning" {
  rectangle "IEnumerable" as IE
  rectangle "Dynamic Distribution" as DD
  
  IE --> DD : Pull next item
  
  note right of DD
    Кожен потік бере
    наступний елемент
    коли звільняється
  end note
}

package "Hash Partitioning" {
  rectangle "GroupBy/Join" as GB
  rectangle "Hash Function" as HF
  rectangle "Partition by Hash" as PH
  
  GB --> HF
  HF --> PH
  
  note right of PH
    Елементи з однаковим
    hash на один потік
  end note
}

@enduml

1. Range Partitioning — для індексованих колекцій (масиви, списки):

// PLINQ розбиває масив на рівні chunks
int[] numbers = new int[1000];
var result = numbers.AsParallel().Select(n => n * n).ToArray();

// Приклад розбиття на 4 потоки:
// Thread 1: [0..249]
// Thread 2: [250..499]
// Thread 3: [500..749]
// Thread 4: [750..999]

2. Chunk Partitioning — для неіндексованих колекцій (IEnumerable):

// PLINQ бере елементи по одному і розподіляє між потоками
IEnumerable<int> numbers = GetNumbers();  // не масив, а IEnumerable
var result = numbers.AsParallel().Select(n => n * n).ToArray();

// Динамічний розподіл — кожен потік бере наступний елемент коли звільняється

3. Hash Partitioning — для операцій з групуванням (GroupBy, Join):

// PLINQ використовує hash для розподілу елементів
var grouped = numbers
    .AsParallel()
    .GroupBy(n => n % 10)  // групування по останній цифрі
    .ToArray();

// Елементи з однаковим hash потрапляють на один потік

PLINQ Performance Tips

✅ DO:

  • Використовуйте PLINQ для CPU-bound операцій з великими колекціями (10,000+ елементів)
  • Використовуйте ForAll() замість ToArray() якщо результати не потрібні
  • Використовуйте AsOrdered() тільки коли порядок дійсно важливий
  • Тестуйте performance — не всі запити стають швидшими з PLINQ

❌ DON'T:

  • Не використовуйте PLINQ для швидких операцій (overhead > виграш)
  • Не використовуйте PLINQ для малих колекцій (<1,000 елементів)
  • Не використовуйте PLINQ для I/O-bound операцій (використовуйте async/await)
  • Не використовуйте PLINQ якщо операції мають великий shared state з contention

Benchmark: LINQ vs PLINQ

LinqVsPlinqBenchmark.cs
using System.Linq;
using System.Diagnostics;

int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();

// LINQ
var sw1 = Stopwatch.StartNew();
var result1 = numbers
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .Sum();
sw1.Stop();

// PLINQ
var sw2 = Stopwatch.StartNew();
var result2 = numbers
    .AsParallel()
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .Sum();
sw2.Stop();

Console.WriteLine($"LINQ: {sw1.ElapsedMilliseconds}ms, результат: {result1}");
Console.WriteLine($"PLINQ: {sw2.ElapsedMilliseconds}ms, результат: {result2}");
Console.WriteLine($"Прискорення: {sw1.ElapsedMilliseconds / (double)sw2.ElapsedMilliseconds:F1}x");
LINQ vs PLINQ Benchmark
LINQ: 847ms, результат: 83333333500000
PLINQ: 142ms, результат: 83333333500000
Прискорення: 6.0x

8. Практичні Завдання

Рівень 1: Parallel File Processor з CancellationToken

Створіть утиліту для паралельної обробки файлів у директорії з підтримкою скасування.

Вимоги:

  • Знайти всі .txt файли у директорії
  • Паралельно прочитати кожен файл та підрахувати кількість слів
  • Підтримка CancellationToken для скасування
  • Вивести прогрес обробки
// Приклад використання:
using var cts = new CancellationTokenSource();

// Скасувати через 5 секунд
cts.CancelAfter(TimeSpan.FromSeconds(5));

try
{
    var results = ProcessFiles("C:/Documents", cts.Token);
    
    foreach (var (file, wordCount) in results)
    {
        Console.WriteLine($"{file}: {wordCount} слів");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Обробка скасована");
}

Рівень 2: Monte Carlo π Calculation з Parallel та PLINQ

Обчисліть число π методом Монте-Карло використовуючи Parallel.For та PLINQ. Порівняйте performance.

Алгоритм:

  1. Генеруємо N випадкових точок у квадраті 0,1 × 0,1
  2. Підраховуємо скільки точок потрапили у чверть кола радіусом 1
  3. π ≈ 4 × (кількість точок у колі) / (загальна кількість точок)
Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

rectangle "Square [0,1] × [0,1]" {
  circle "Quarter Circle<br/>radius = 1" as QC
  
  note right of QC
    Точки всередині кола:
    x² + y² ≤ 1
    
    π ≈ 4 × (inside / total)
  end note
}

rectangle "Parallel.For Approach" as PF {
  [Thread-local counter] as TLC
  [Interlocked.Add] as IA
  
  TLC --> IA : Combine results
}

rectangle "PLINQ Approach" as PL {
  [AsParallel()] as AP
  [Count()] as CNT
  
  AP --> CNT : Aggregate
}

@enduml

Вимоги:

  • Реалізувати через Parallel.For з thread-local state
  • Реалізувати через PLINQ
  • Benchmark обох підходів
  • N = 100,000,000 точок

Рівень 3: MapReduce Framework з Task.WhenAll

Створіть спрощений MapReduce framework для паралельної обробки великих текстових файлів.

Loading diagram...
@startuml
skinparam style plain
skinparam backgroundColor transparent

rectangle "Large File" as LF

rectangle "Map Phase" {
  [Chunk 1] as C1
  [Chunk 2] as C2
  [Chunk 3] as C3
  [Chunk 4] as C4
  
  [Task 1: Count words] as T1
  [Task 2: Count words] as T2
  [Task 3: Count words] as T3
  [Task 4: Count words] as T4
  
  C1 --> T1
  C2 --> T2
  C3 --> T3
  C4 --> T4
}

rectangle "Reduce Phase" {
  [Merge Results] as MR
  [Final Word Frequency] as FWF
  
  T1 --> MR
  T2 --> MR
  T3 --> MR
  T4 --> MR
  
  MR --> FWF
}

LF --> C1
LF --> C2
LF --> C3
LF --> C4

note right of MR
  Task.WhenAll()
  координує всі задачі
end note

@enduml

Вимоги:

  • Map phase: розбити файл на chunks, паралельно обробити кожен chunk (підрахувати частоту слів)
  • Reduce phase: об'єднати результати з усіх chunks
  • Використовувати Task.WhenAll() для координації
  • Підтримка CancellationToken
  • Progress reporting
// Приклад використання:
var mapReduce = new MapReduceEngine();

var wordFrequency = mapReduce.ProcessFile(
    filePath: "large-text.txt",
    chunkSize: 1024 * 1024,  // 1 MB chunks
    cancellationToken: cts.Token,
    progress: new Progress<double>(p => Console.WriteLine($"Прогрес: {p:P0}"))
);

// Топ 10 найчастіших слів
var top10 = wordFrequency
    .OrderByDescending(kvp => kvp.Value)
    .Take(10);

foreach (var (word, count) in top10)
{
    Console.WriteLine($"{word}: {count}");
}

Це завершує матеріал про TPL, Parallel та PLINQ. Ви навчились працювати з Task, композицією задач, CancellationToken, exception handling, Parallel class та PLINQ для ефективної паралельної обробки даних.

Copyright © 2026