System Programming Windows

ThreadPool — Просунуті Сценарії та Внутрішня Будова

Глибокий розбір внутрішньої архітектури ThreadPool — Work Stealing Algorithm, Hill Climbing детально, IOCP механізм, custom ThreadPool реалізація, production tuning та advanced patterns для оптимізації багатопотокових застосунків.

ThreadPool: Просунуті Сценарії та Внутрішня Будова

Вступ: Від API до Реалізації

У попередній темі ми розглянули що таке ThreadPool та як його використовувати. Тепер час зануритись глибше і зрозуміти як він працює всередині. Це знання критично важливе для:

  • Діагностики проблем — розуміння чому ThreadPool "підвисає" або працює повільно
  • Оптимізації — знання коли та як налаштовувати ThreadPool для конкретних сценаріїв
  • Архітектурних рішень — вибір між ThreadPool, власним пулом потоків або іншими підходами

Ця тема — для тих хто хоче по-справжньому розуміти багатопотоковість у .NET, а не просто використовувати готові API.

Попередження: Матеріал цієї теми складний і містить деталі реалізації CLR. Якщо ви щойно почали вивчати багатопотоковість — поверніться до цієї теми пізніше, після практики з базовими концепціями.

Work Stealing Algorithm — Детальний Розбір

Проблема: Балансування Навантаження

Аналогія: Черги в супермаркеті

Уявіть величезний супермаркет, де є 8 касирів (наші Worker Threads). Якщо всі 1000 покупців стануть у одну гігантську чергу (Global Queue), почнеться хаос: касири будуть постійно сваритися і блокувати одне одного, щоб покликати наступного, а покупці витрачатимуть час на те, щоб зістикувати потоки. Це і є проблема єдиної глобальної черги — Bottleneck (вузьке горлечко).

Уявіть ThreadPool з 8 worker threads. Якщо всі задачі йдуть у одну спільну чергу — виникає bottleneck:

// Наївна реалізація (НЕ так як у .NET):
class SimpleThreadPool
{
    private readonly ConcurrentQueue<Action> _globalQueue = new();  // ← Bottleneck!

    public void QueueWorkItem(Action work)
    {
        _globalQueue.Enqueue(work);  // Всі потоки конкурують за цю чергу
    }
}

Проблеми:

  1. Contention — всі 8 потоків постійно намагаються взяти задачу з однієї черги → lock contention
  2. Cache misses — черга "стрибає" між CPU cores → погана cache locality
  3. Scalability — чим більше потоків, тим гірше масштабується

Рішення: Work Stealing

.NET ThreadPool використовує дворівневу систему черг:

┌─────────────────────────────────────────────────────────────┐
│                     Global Queue                            │
│  (спільна для всіх, використовується рідко)                │
│  [Task1] [Task2] [Task3] ...                               │
└─────────────────────────────────────────────────────────────┘
                            ↓
        ┌───────────────────┼───────────────────┐
        ↓                   ↓                   ↓
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Worker 1     │    │ Worker 2     │    │ Worker 3     │
│ Local Queue  │    │ Local Queue  │    │ Local Queue  │
│ [T7][T8][T9] │    │ [T4][T5][T6] │    │ [T10][T11]   │
└──────────────┘    └──────────────┘    └──────────────┘
     ↑ LIFO              ↑ LIFO              ↑ LIFO
     │ (власник)         │ (власник)         │ (власник)
     │                   │                   │
     └─────── FIFO ──────┴─────── FIFO ──────┘
         (work stealing між потоками)

Ключові принципи:

  1. Local Queue (Своя каса) — кожен worker thread має власну локальну чергу. Власник працює з нею швидко і без блокувань (lock-free).
  2. LIFO для власника (Стек) — потік бере задачі з кінця своєї черги (Last In First Out - Останній зайшов, перший вийшов).
  3. FIFO для крадіїв (Класична черга) — інші (вільні) потоки непомітно "крадуть" задачі з початку чужих черг (First In First Out).
  4. Global Queue — запасний аеродром (fallback), куди падають задачі, додані ззовні (наприклад, з головного потоку програми), звідки їх потім розбирають worker'и у свої локальні черги.

Чому LIFO для Власника? (Секрет "Теплих" Даних)

Це не випадковість — це оптимізація для cache locality:

// Сценарій: рекурсивна задача
void ProcessDirectory(string path)
{
    var files = Directory.GetFiles(path);
    foreach (var file in files)
    {
        ProcessFile(file);  // Обробка у поточному потоці
    }

    var subdirs = Directory.GetDirectories(path);
    foreach (var dir in subdirs)
    {
        // Створюємо нову задачу → йде у LOCAL queue поточного потоку
        ThreadPool.QueueUserWorkItem(_ => ProcessDirectory(dir));
    }
}

Що відбувається (Людською мовою):

Коли ви щойно розібрали пакунок з документами (батьківська задача), ви дістали звідти 10 окремих папок (підзадачі). Найлогічніше — одразу взяти останню папку, яку ви щойно тримали в руках, і почати з нею працювати, бо ви вже пам'ятаєте контекст (дані знаходяться в "швидкій пам'яті" / кеші CPU). Якби ви взяли найстарішу папку з іншого проєкту, вам довелося б згадувати, що це взагалі таке (Cache Miss — вивантаження нових даних з повільної RAM у мікронний кеш процесора).

  1. Worker Thread 1 виконує ProcessDirectory("C:\\")
  2. Він створює 10 підзадач для піддиректорій → всі йдуть у його local queue.
  3. Він одразу бере останню створену задачу (LIFO) → вона ще "гаряча", перебуває в надшвидкому кеші процесора (L1/L2).
  4. Дані з батьківської задачі (наприклад, path prefix) ще в пам'яті → максимальна швидкість доступу.

Benchmark: LIFO vs FIFO для власника:

// Рекурсивна задача що створює багато підзадач
void RecursiveWork(int depth)
{
    if (depth == 0) return;

    // Створюємо 4 підзадачі
    for (int i = 0; i < 4; i++)
    {
        ThreadPool.QueueUserWorkItem(_ => RecursiveWork(depth - 1));
    }
}

// LIFO (як у .NET): ~500ms для depth=10
// FIFO (гіпотетично): ~800ms для depth=10
// Різниця: 60% швидше завдяки cache locality!

Чому FIFO для Крадіїв? ("Вільна каса!")

Коли Worker Thread 2 виконав усю свою роботу, він стає "крадієм" і йде на допомогу до Worker Thread 1. Але крадій бере задачу з початку черги (найстарішу):

Причина 1: Мінімізація конфліктів (Щоб не штовхатись)

Уявіть, що касир активно обслуговує людей, які щойно підійшли (LIFO). Якщо "вільний касир" підійде і почне висмикувати клієнтів з-під носа свого колеги, вони почнуть заважати одне одному (Lock Contention). Замість цього, вільний касир тихенько підходить до "хвоста" черги клієнтів (FIFO) і забирає людей звідти. Вони працюють з різних кінців колекції, тому конфлікту і блокувань практично не виникає.

  • Власник бере з кінця (LIFO)
  • Крадій бере з початку (FIFO)
  • Вони рідко конкурують за ту саму задачу

Причина 2: Більші задачі

  • Старіші задачі (на початку черги) зазвичай "батьківські" — вони породжують більше підзадач
  • Новіші задачі (в кінці) — "листові" — швидко виконуються
  • Краще вкрасти одну велику задачу ніж багато маленьких

Візуалізація Work Stealing

Loading diagram...
sequenceDiagram
    participant W1 as Worker 1<br/>(власник)
    participant Q1 as Local Queue 1<br/>[A][B][C][D]
    participant W2 as Worker 2<br/>(крадій)

    Note over W1,Q1: Worker 1 виконує задачі
    W1->>Q1: Pop з кінця (LIFO)
    Q1-->>W1: Задача D

    W1->>Q1: Pop з кінця (LIFO)
    Q1-->>W1: Задача C

    Note over W2: Worker 2 закінчив свої задачі
    W2->>Q1: Steal з початку (FIFO)
    Q1-->>W2: Задача A

    Note over W1,W2: Обидва працюють паралельно
    W1->>Q1: Pop з кінця
    Q1-->>W1: Задача B

    Note over Q1: Черга порожня

Реалізація Work Stealing Queue

Спрощена версія (концептуально схожа на .NET):

WorkStealingQueue.cs
using System;
using System.Threading;

/// <summary>
/// Спрощена реалізація Work Stealing Queue.
/// Власник бере з кінця (LIFO), інші крадуть з початку (FIFO).
/// </summary>
public class WorkStealingQueue<T>
{
    private T[] _array = new T[32];
    private int _mask = 31;  // Для швидкого modulo через bitwise AND

    private volatile int _headIndex = 0;  // Для крадіїв (FIFO)
    private volatile int _tailIndex = 0;  // Для власника (LIFO)

    private readonly object _foreignLock = new();  // Тільки для крадіїв

    /// <summary>Додати задачу (викликає тільки власник).</summary>
    public void LocalPush(T item)
    {
        int tail = _tailIndex;

        // Перевірка чи потрібно розширити масив
        if (tail - _headIndex >= _mask)
        {
            lock (_foreignLock)
            {
                GrowArray();
            }
        }

        _array[tail & _mask] = item;
        _tailIndex = tail + 1;  // Publish нового tail
    }

    /// <summary>Забрати задачу (викликає тільки власник, LIFO).</summary>
    public bool LocalPop(out T? item)
    {
        int tail = _tailIndex - 1;
        Interlocked.Exchange(ref _tailIndex, tail);  // Atomic decrement

        if (_headIndex <= tail)
        {
            // Черга не порожня
            item = _array[tail & _mask];
            return true;
        }
        else
        {
            // Черга порожня або конфлікт з крадієм
            _tailIndex = tail + 1;  // Відкат
            item = default;
            return false;
        }
    }

    /// <summary>Вкрасти задачу (викликають інші потоки, FIFO).</summary>
    public bool TrySteal(out T? item)
    {
        lock (_foreignLock)  // Крадії синхронізуються між собою
        {
            int head = _headIndex;
            int tail = _tailIndex;

            if (head < tail)
            {
                // Є задачі для крадіжки
                item = _array[head & _mask];
                _headIndex = head + 1;  // Atomic increment
                return true;
            }

            item = default;
            return false;
        }
    }

    private void GrowArray()
    {
        int newSize = (_array.Length - 1) * 2 + 1;  // Подвоїти розмір
        var newArray = new T[newSize];

        int head = _headIndex;
        int tail = _tailIndex;

        for (int i = head; i < tail; i++)
        {
            newArray[i & (newSize - 1)] = _array[i & _mask];
        }

        _array = newArray;
        _mask = newSize - 1;
    }

    public int Count => Math.Max(0, _tailIndex - _headIndex);
}

Ключові деталі:

  • LocalPush/LocalPopбез lock для власника (швидко!)
  • TryStealз lock для крадіїв (вони конкурують між собою)
  • Circular buffer з bitwise AND замість modulo (швидше)
  • Atomic операції для _headIndex/_tailIndex (видимість між потоками)

Hill Climbing Algorithm — Математична Модель

Проблема: Скільки Потоків Оптимально?

У темі 09 ми розглянули концепцію Hill Climbing. Тепер — математика та деталі реалізації.

Аналогія: Менеджер Ресторану (Hill Climbing)

Уявіть менеджера великого ресторану. Його мета — щоб кухня видавала якомога більше страв за годину (Throughput). Якщо кухарів (Потоків) замало, замовлення простоюють. Менеджер наймає ще одного кухаря, бачить, що швидкість видачі страв зросла — супер, наймає ще одного. Але в якийсь момент кухарів стає настільки багато, що вони починають штовхатися біля плит, битися посудом і чекати в черзі до духовки (Context Switching, Lock Contention). Продуктивність раптово падає. Менеджер бачить падіння ефективності, розуміє, що перегнув палицю, і починає звільняти кухарів по одному, доки швидкість знову не стане максимальною. Ця гра невпинного балансування "додав — перевірив — відібрав" і є алгоритмом Hill Climbing (Підйом на пагорб).

Мета: Максимізувати throughput (пропускну здатність) — кількість виконаних задач за одиницю часу.

Виклик: Оптимальна кількість потоків залежить від навантаження (меню постійно змінюється):

  • Чисто CPU-bound код (нарізка салатів) → оптимум = кількості процесорних ядер (робочих столів). Більше кухарів просто не матимуть місця, де працювати.
  • Код з блокуваннями / I/O (супи, що варяться 20 хвилин) → оптимум > кількості ядер (поки суп вариться, кухар може стати варити інший).
  • Змішане навантаження → оптимум змінюється динамічно кожної секунди.

Алгоритм: Gradient Ascent з Adaptive Step

Hill Climbing у .NET — це варіант gradient ascent (підйом за градієнтом):

Throughput
    ↑
    │         ╱╲  ← Оптимум (шукаємо цю точку)
    │        ╱  ╲
    │       ╱    ╲
    │      ╱      ╲
    │     ╱        ╲
    │____╱__________╲____________→ Кількість потоків
         ↑          ↑
    Занадто мало  Занадто багато
    (CPU idle)    (context switch overhead)

Псевдокод алгоритму:

class HillClimbingAlgorithm
{
    private int _currentThreadCount;
    private double _currentThroughput;
    private int _direction = +1;  // +1 = додавати, -1 = забирати
    private double _stepSize = 1.0;

    public void Adjust()
    {
        // 1. Вимірюємо поточний throughput
        double newThroughput = MeasureThroughput();

        // 2. Обчислюємо зміну (gradient)
        double delta = newThroughput - _currentThroughput;

        if (delta > 0)
        {
            // Throughput зріс → продовжуємо у тому ж напрямку
            _currentThreadCount += (int)(_direction * _stepSize);
            _stepSize *= 1.05;  // Збільшуємо крок (прискорюємось)
        }
        else if (delta < 0)
        {
            // Throughput впав → змінюємо напрямок
            _direction = -_direction;
            _stepSize = Math.Max(1.0, _stepSize * 0.9);  // Зменшуємо крок
            _currentThreadCount += (int)(_direction * _stepSize);
        }
        else
        {
            // Throughput не змінився → пробуємо інший напрямок
            _direction = -_direction;
            _currentThreadCount += _direction;
        }

        // 3. Обмеження
        _currentThreadCount = Math.Clamp(_currentThreadCount, MinThreads, MaxThreads);

        // 4. Застосовуємо нову кількість
        SetWorkerThreadCount(_currentThreadCount);

        // 5. Зберігаємо для наступної ітерації
        _currentThroughput = newThroughput;
    }

    private double MeasureThroughput()
    {
        // Кількість виконаних задач за останні 500ms
        return CompletedWorkItemCount / 0.5;  // tasks/second
    }
}

Exponential Smoothing для Стабільності

Реальний throughput "шумний" — він стрибає через:

  • Garbage Collection паузи
  • Зовнішні процеси на машині
  • Мережеві затримки

CLR використовує exponential smoothing для згладжування:

double _smoothedThroughput = 0;
const double Alpha = 0.3;  // Вага нового вимірювання (0-1)

void UpdateThroughput(double measured)
{
    // Exponential Moving Average (EMA)
    _smoothedThroughput = Alpha * measured + (1 - Alpha) * _smoothedThroughput;

    // Використовуємо згладжене значення для рішень
    if (_smoothedThroughput > _previousSmoothed)
    {
        // Throughput зростає...
    }
}

Ефект: Алгоритм не реагує на короткочасні стрибки, але швидко адаптується до справжніх змін навантаження.

Wave Behavior — Чому Потоки Додаються Хвилями

Якщо ви моніторите ThreadPool.ThreadCount у production, побачите хвилеподібну поведінку:

Threads
   ↑
 12│    ╱╲      ╱╲      ╱╲
 10│   ╱  ╲    ╱  ╲    ╱  ╲
  8│  ╱    ╲  ╱    ╲  ╱    ╲
  6│ ╱      ╲╱      ╲╱      ╲
  4│╱________________________╲___→ Time

Чому так відбувається? (Танець навколо оптимуму)

Щоб з впевненістю дізнатися, що ви на самій вершині гори, вам доведеться зробити хоча б один крок донизу.

  1. Навантаження зростає → Hill Climbing додає потоки (+1, +1) → Throughput (швидкість видачі) теж зростає. Ми йдемо вгору.
  2. Досягнуто оптимуму, але алгоритм цього не знає! Він робить ще один "сліпий" крок (+1).
  3. Throughput падає → стало забагато потоків (почалась штовханина). Алгоритм розуміє: "Ага, вершина була щойно позаду". Додавання більше не допомагає.
  4. Алгоритм змінює напрямок і починає звільняти (забирати) потоки (-1, -1).
  5. Throughput знову падає → потоків стало замало для поточного потоку даних. Алгоритм знову розвертається.
  6. Цикл повторюється нескінченно — алгоритм постійно "хитається" туди-сюди, малюючи хвилі на своїх графіках. Діючи так, він завжди адаптується до зовнішніх змін.

Це абсолютно нормальна і здорова поведінка. Алгоритм не може магічно зупинитися ідеально в центрі через саму свою природу (і мінливість систем навантаження).

Коли Hill Climbing Не Справляється

Сценарій 1: Раптовий сплеск навантаження

// Раптово додаємо 10000 задач
for (int i = 0; i < 10000; i++)
{
    ThreadPool.QueueUserWorkItem(_ => DoWork());
}

// Hill Climbing додає потоки ПОВІЛЬНО (по 1-2 кожні 500ms)
// Перші 5000 задач чекатимуть у черзі!

Рішення: Збільшити MinThreads якщо знаєте що завжди потрібно багато потоків:

ThreadPool.SetMinThreads(workerThreads: 50, completionPortThreads: 50);
// Тепер ThreadPool одразу має 50 потоків готових до роботи

Сценарій 2: Блокуючі операції

// ❌ ПОГАНО: блокуємо worker threads
for (int i = 0; i < 100; i++)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(10_000);  // Блокування на 10 секунд
    });
}

// Hill Climbing бачить що throughput = 0 (нічого не завершується)
// Він додає потоки, але вони теж блокуються
// Результат: thread starvation

Рішення: Ніколи не блокуйте worker threads — використовуйте async/await.


Custom ThreadPool Implementation

Тепер створимо власний ThreadPool з нуля щоб зрозуміти всі деталі:

CustomThreadPool.cs
using System;
using System.Collections.Concurrent;
using System.Threading;

/// <summary>
/// Спрощена реалізація ThreadPool для навчання.
/// Підтримує: динамічну кількість потоків, graceful shutdown, статистику.
/// </summary>
public class CustomThreadPool : IDisposable
{
    private readonly ConcurrentQueue<Action> _workQueue = new();
    private readonly List<Thread> _workers = new();
    private readonly ManualResetEventSlim _workAvailable = new(false);

    private readonly int _minThreads;
    private readonly int _maxThreads;
    private int _currentThreads = 0;

    private long _completedWorkItems = 0;
    private long _totalWorkItems = 0;

    private volatile bool _shutdown = false;

    public CustomThreadPool(int minThreads = 4, int maxThreads = 100)
    {
        _minThreads = minThreads;
        _maxThreads = maxThreads;

        // Створюємо мінімальну кількість потоків одразу
        for (int i = 0; i < _minThreads; i++)
        {
            CreateWorkerThread();
        }
    }

    /// <summary>Додати задачу у чергу.</summary>
    public void QueueUserWorkItem(Action work)
    {
        if (_shutdown)
            throw new InvalidOperationException("ThreadPool is shutting down");

        Interlocked.Increment(ref _totalWorkItems);
        _workQueue.Enqueue(work);
        _workAvailable.Set();  // Сигнал: є робота

        // Простий injection logic: якщо черга довга і є місце → додати потік
        if (_workQueue.Count > _currentThreads * 2 && _currentThreads < _maxThreads)
        {
            lock (_workers)
            {
                if (_currentThreads < _maxThreads)
                {
                    CreateWorkerThread();
                }
            }
        }
    }

    private void CreateWorkerThread()
    {
        var worker = new Thread(WorkerLoop)
        {
            IsBackground = true,
            Name = $"CustomThreadPool-Worker-{_currentThreads}"
        };

        _workers.Add(worker);
        Interlocked.Increment(ref _currentThreads);
        worker.Start();
    }

    private void WorkerLoop()
    {
        DateTime lastWorkTime = DateTime.UtcNow;

        while (!_shutdown)
        {
            if (_workQueue.TryDequeue(out var work))
            {
                try
                {
                    work();
                    Interlocked.Increment(ref _completedWorkItems);
                    lastWorkTime = DateTime.UtcNow;
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine($"Worker exception: {ex.Message}");
                }
            }
            else
            {
                // Немає роботи

                // Thread retirement: якщо не було роботи 30 секунд і потоків більше ніж min
                if (_currentThreads > _minThreads &&
                    (DateTime.UtcNow - lastWorkTime).TotalSeconds > 30)
                {
                    // Завершуємо цей потік
                    Interlocked.Decrement(ref _currentThreads);
                    return;
                }

                // Чекаємо на сигнал (з timeout для перевірки shutdown)
                _workAvailable.Wait(TimeSpan.FromSeconds(1));
                _workAvailable.Reset();
            }
        }
    }

    /// <summary>Graceful shutdown: чекаємо завершення всіх задач.</summary>
    public void Shutdown(TimeSpan timeout)
    {
        _shutdown = true;
        _workAvailable.Set();  // Розбудити всіх

        var deadline = DateTime.UtcNow + timeout;

        foreach (var worker in _workers)
        {
            var remaining = deadline - DateTime.UtcNow;
            if (remaining > TimeSpan.Zero)
            {
                worker.Join(remaining);
            }
        }
    }

    public void Dispose()
    {
        Shutdown(TimeSpan.FromSeconds(10));
        _workAvailable.Dispose();
    }

    // Статистика
    public int CurrentThreads => _currentThreads;
    public int QueueLength => _workQueue.Count;
    public long CompletedWorkItems => Interlocked.Read(ref _completedWorkItems);
    public long TotalWorkItems => Interlocked.Read(ref _totalWorkItems);
    public double CompletionRate => TotalWorkItems > 0
        ? CompletedWorkItems / (double)TotalWorkItems
        : 0;
}

Демонстрація Custom ThreadPool

CustomThreadPoolDemo.cs
using System;
using System.Diagnostics;
using System.Threading;

var pool = new CustomThreadPool(minThreads: 2, maxThreads: 10);

Console.WriteLine("=== Custom ThreadPool Demo ===\n");

// Моніторинг у фоні
var monitor = new Thread(() =>
{
    while (true)
    {
        Console.WriteLine($"[Monitor] Threads: {pool.CurrentThreads}, " +
                         $"Queue: {pool.QueueLength}, " +
                         $"Completed: {pool.CompletedWorkItems}/{pool.TotalWorkItems} " +
                         $"({pool.CompletionRate:P0})");
        Thread.Sleep(500);
    }
}) { IsBackground = true };
monitor.Start();

// Тест 1: Раптовий сплеск навантаження
Console.WriteLine("Test 1: Burst of 1000 tasks...");
var sw = Stopwatch.StartNew();

for (int i = 0; i < 1000; i++)
{
    int taskId = i;
    pool.QueueUserWorkItem(() =>
    {
        Thread.Sleep(Random.Shared.Next(10, 50));  // Симуляція роботи
    });
}

// Чекаємо завершення
while (pool.CompletedWorkItems < 1000)
{
    Thread.Sleep(100);
}

sw.Stop();
Console.WriteLine($"Completed in {sw.ElapsedMilliseconds}ms\n");

// Тест 2: Тривале навантаження
Console.WriteLine("Test 2: Sustained load for 10 seconds...");
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

int submitted = 0;
while (!cts.Token.IsCancellationRequested)
{
    pool.QueueUserWorkItem(() =>
    {
        Thread.Sleep(Random.Shared.Next(5, 20));
    });
    submitted++;
    Thread.Sleep(10);
}

Console.WriteLine($"Submitted {submitted} tasks during sustained load\n");

// Cleanup
Thread.Sleep(2000);  // Дати час завершити
pool.Dispose();

Console.WriteLine("=== Demo Complete ===");
Custom ThreadPool Output
=== Custom ThreadPool Demo ===
Test 1: Burst of 1000 tasks...
[Monitor] Threads: 2, Queue: 998, Completed: 2/1000 (0%)
[Monitor] Threads: 4, Queue: 850, Completed: 150/1000 (15%)
[Monitor] Threads: 8, Queue: 600, Completed: 400/1000 (40%)
[Monitor] Threads: 10, Queue: 200, Completed: 800/1000 (80%)
[Monitor] Threads: 10, Queue: 0, Completed: 1000/1000 (100%)
Completed in 3247ms
Test 2: Sustained load for 10 seconds...
[Monitor] Threads: 6, Queue: 15, Completed: 450/465 (97%)
Submitted 982 tasks during sustained load

IOCP (I/O Completion Ports) — Windows Kernel Magic

Що Таке IOCP

I/O Completion Port (IOCP) — геніальний механізм Windows kernel (ядра ОС) для надвисокоефективної обробки асинхронних I/O (Input/Output) операцій (читання файлів, робота з мережею). Це не .NET концепція — це нативна магія Windows, яку .NET просто обгортає у зручний інтерфейс.

Аналогія: Замовлення Піци у 10,000 клієнтів

Без IOCP (Синхронно): Ви — оператор піцерії (Thread). Ви прийняли велике замовлення, передали статус на кухню і... стали прямо біля дверей видачі, нічого не роблячи (Потік заблоковано). Щоб обслужити 10,000 людей одночасно, вам знадобиться 10,000 операторів, які просто стоятимуть і тупитимуть. Це катастрофічна витрата оперативної пам'яті (RAM).

З IOCP (Асинхронно): Ви приймаєте замовлення, передаєте на кухню (запит до інтерфейсу мережі/дискового контролера), залишаєте записку з номером (Callback / Task) і одразу йдете обслуговувати іншого клієнта (Потік повертається в Пул унікально вільним). Коли піцу спечено, кухар (залізо/DMA-контролер) сам кладе її на спеціальний стіл видачі (I/O Completion Port). Будь-який з 20 вільних операторів (IOCP Threads), який проходить повз, візьме готову піцу з цього столу і понесе клієнту. Так 10-20 працівників спокійно та блискавично можуть обслужити 10,000 замовлень!

Проблема без IOCP (У коді):

// Наївний підхід: один потік на з'єднання
void HandleClient(Socket client)
{
    var thread = new Thread(() =>
    {
        while (true)
        {
            byte[] buffer = new byte[1024];
            int received = client.Receive(buffer);  // ← БЛОКУЄ потік!

            if (received == 0) break;
            ProcessData(buffer, received);
        }
    });
    thread.Start();
}

// Проблема: 10,000 клієнтів = 10,000 потоків = катастрофа!

**Рішення з IO CP**:

// З IOCP: один потік обслуговує тисячі з'єднань
async Task HandleClientAsync(Socket client)
{
    while (true)
    {
        byte[] buffer = new byte[1024];
        int received = await client.ReceiveAsync(buffer);  // ← НЕ блокує потік!

        if (received == 0) break;
        ProcessData(buffer, received);
    }
}

// 10,000 клієнтів = ~10-20 потоків (IOCP threads) = ефективно!

Як Працює IOCP

Архітектура:

┌─────────────────────────────────────────────────────────┐
│                    Application                          │
│  socket.ReceiveAsync() → повертає Task одразу           │
└────────────────────┬────────────────────────────────────┘
                     │ (не блокує потік)
                     ↓
┌─────────────────────────────────────────────────────────┐
│              Windows Kernel (IOCP)                      │
│  ┌──────────────────────────────────────────────────┐  │
│  │  I/O Completion Queue                            │  │
│  │  [Completed I/O 1] [Completed I/O 2] ...        │  │
│  └──────────────────────────────────────────────────┘  │
│                     ↑                                   │
│  Network Card ──────┘ (DMA transfer, no CPU)           │
└─────────────────────────────────────────────────────────┘
                     │
                     ↓ (GetQueuedCompletionStatus)
┌─────────────────────────────────────────────────────────┐
│           ThreadPool IOCP Threads                       │
│  Thread 1: чекає на IOCP → отримує completion → викликає callback
│  Thread 2: чекає на IOCP → отримує completion → викликає callback
└─────────────────────────────────────────────────────────┘

Ключові кроки (Як це працює технічно):

  1. Ініціація I/O: Ваша програма викликає socket.ReceiveAsync(). Це наказ ядру Windows (Kernel) розпочати слухати мережу.
  2. Потік звільняється: Метод миттєво повертає Task, і ваш Thread йде виконувати іншу роботу. Він не висить в стані Thread.Sleep або блокування.
  3. Магія DMA (Direct Memory Access): Мережева карта фізично отримує байти з інтернет-кабелю і записує їх прямо в оперативну пам'ять (RAM). Центральний процесор (CPU) у цьому взагалі не бере участі! Комп'ютер продовжує працювати над іншими задачами з нульовим навантаженням.
  4. Completion (Завершення): Коли пакет даних повністю прибув у RAM, залізо (мережева плата) надсилає переривання (interrupt) мікропроцесору. Kernel ловить його і записує "Подія виконана" у спеціальну закриту чергу — IOCP queue (Стіл видачі).
  5. Пробудження: Спеціальний пул потоків (ThreadPool IOCP Threads) постійно моніторить цю чергу через надзвичайно легку функцію GetQueuedCompletionStatus(). Найбільш вільний потік бере результат з черги і продовжує виконання вашого коду (continuation, що стоїть після слова await).

IOCP API (Win32)

Під капотом .NET використовує ці Windows API:

// Створення IOCP (один на весь процес)
HANDLE iocp = CreateIoCompletionPort(
    INVALID_HANDLE_VALUE,  // створити новий IOCP
    NULL,
    0,
    0  // кількість concurrent threads (0 = auto)
);

// Асоціювання socket з IOCP
CreateIoCompletionPort(
    (HANDLE)socket,  // handle до socket
    iocp,            // існуючий IOCP
    (ULONG_PTR)socket,  // completion key (ідентифікатор)
    0
);

// IOCP Thread loop
while (true)
{
    DWORD bytesTransferred;
    ULONG_PTR completionKey;
    OVERLAPPED* overlapped;

    // Блокуючий виклик: чекає поки kernel не поверне завершену I/O операцію
    BOOL success = GetQueuedCompletionStatus(
        iocp,
        &bytesTransferred,
        &completionKey,
        &overlapped,
        INFINITE  // timeout
    );

    if (success)
    {
        // I/O завершилась успішно → викликати .NET callback
        InvokeCallback(overlapped, bytesTransferred);
    }
}

Overlapped I/O (Конверт із зворотньою адресою)

Аналогія:

Overlapped — це своєрідний "конверт із зворотньою адресою" на рівні ядра ОС. Коли ви відправляєте запит на читання диска, ви передаєте цей конверт ядру. Ядро відкладає його в пам'ять. Коли диск зчитає масив даних, ядро дістає ваш конверт, укладає туди статус операції (Успішно/Помилка, байтів отримано) і за метаданими звідти розуміє, кому саме в .NET треба просигналізувати про завершення.

Overlapped — це C/C++ структура, що зберігає стан асинхронної I/O операції:

typedef struct _OVERLAPPED {
    ULONG_PTR Internal;      // Статус операції
    ULONG_PTR InternalHigh;  // Кількість переданих байтів
    DWORD     Offset;        // Offset у файлі (для файлів)
    DWORD     OffsetHigh;
    HANDLE    hEvent;        // Event для сигналізації (опційно)
} OVERLAPPED;

Коли ви викликаєте socket.ReceiveAsync():

  1. .NET виділяє OVERLAPPED структуру
  2. Викликає WSARecv() з прапорцем overlapped
  3. Kernel зберігає OVERLAPPED* і повертає ERROR_IO_PENDING
  4. Коли I/O завершується → kernel додає OVERLAPPED* у IOCP queue

Benchmark: Blocking I/O vs IOCP

IOCPBenchmark.cs
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

// Сервер що обслуговує багато клієнтів

// ❌ Blocking I/O: один потік на клієнта
class BlockingServer
{
    public void Start(int port)
    {
        var listener = new TcpListener(IPAddress.Any, port);
        listener.Start();

        while (true)
        {
            var client = listener.AcceptTcpClient();  // Блокує

            // Новий потік для кожного клієнта!
            new Thread(() => HandleClient(client)) { IsBackground = true }.Start();
        }
    }

    private void HandleClient(TcpClient client)
    {
        var stream = client.GetStream();
        byte[] buffer = new byte[1024];

        while (true)
        {
            int read = stream.Read(buffer, 0, buffer.Length);  // Блокує потік!
            if (read == 0) break;

            stream.Write(buffer, 0, read);  // Echo назад
        }
    }
}

// ✅ IOCP: async/await
class IOCPServer
{
    public async Task StartAsync(int port)
    {
        var listener = new TcpListener(IPAddress.Any, port);
        listener.Start();

        while (true)
        {
            var client = await listener.AcceptTcpClientAsync();  // Async

            // Не створюємо потік — просто запускаємо Task
            _ = HandleClientAsync(client);
        }
    }

    private async Task HandleClientAsync(TcpClient client)
    {
        var stream = client.GetStream();
        byte[] buffer = new byte[1024];

        while (true)
        {
            int read = await stream.ReadAsync(buffer);  // НЕ блокує потік!
            if (read == 0) break;

            await stream.WriteAsync(buffer.AsMemory(0, read));  // Echo назад
        }
    }
}

// Результати для 10,000 одночасних клієнтів:
// Blocking: 10,000 потоків, ~10GB RAM, CPU 100% (context switching)
// IOCP:     ~20 потоків, ~200MB RAM, CPU 5% (ефективно!)

Чому IOCP Настільки Ефективний

1. Zero CPU для очікування I/O

  • Blocking I/O: потік спить у kernel → займає пам'ять, потребує context switch
  • IOCP: немає потоку що чекає → kernel сам сигналізує коли готово

2. Scalability

  • Blocking: O(n) потоків для n з'єднань
  • IOCP: O(1) потоків (фіксована кількість IOCP threads)

3. DMA (Direct Memory Access)

  • Мережева карта пише дані прямо у RAM без участі CPU
  • CPU отримує interrupt тільки коли дані готові

Production Tuning: Коли та Як Налаштовувати ThreadPool

Метрики для Моніторингу

1. PendingWorkItemCount — кількість задач у черзі:

long pending = ThreadPool.PendingWorkItemCount;

if (pending > 1000)
{
    Console.WriteLine("WARNING: ThreadPool queue is growing!");
    // Можливі причини:
    // - Thread starvation (всі потоки заблоковані)
    // - Недостатньо потоків (збільшити MinThreads)
    // - Занадто багато задач (throttling)
}

2. AvailableThreads — скільки потоків вільні:

ThreadPool.GetAvailableThreads(out int availableWorker, out int availableIOCP);
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIOCP);

int busyWorker = maxWorker - availableWorker;

if (busyWorker > maxWorker * 0.9)
{
    Console.WriteLine("WARNING: 90% of worker threads are busy!");
}

3. CompletedWorkItemCount — lifetime counter:

long completed = ThreadPool.CompletedWorkItemCount;
// Використовуйте для обчислення throughput

Коли Збільшувати MinThreads

Сценарій 1: Раптові сплески навантаження

// Проблема: Hill Climbing повільно реагує
// Рішення: збільшити MinThreads

// Перед (default MinThreads = 8 на 8-core машині):
// - 1000 задач додаються одразу
// - Перші 8 виконуються, решта чекають
// - Hill Climbing додає потоки по 1-2 кожні 500ms
// - Результат: перші 500 задач чекають 10+ секунд

// Після (MinThreads = 50):
ThreadPool.SetMinThreads(workerThreads: 50, completionPortThreads: 50);
// - 1000 задач додаються одразу
// - Перші 50 виконуються одразу
// - Результат: набагато швидше

Сценарій 2: Блокуючі операції (якщо async неможливий)

// Якщо ви ЗМУШЕНІ використовувати blocking API (legacy код):
ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 100);

// Але краще: переписати на async!

Правило: Збільшуйте MinThreads тільки якщо:

  • Profiling показав thread starvation
  • Async/await неможливий (legacy код)
  • Ви точно знаєте що завжди потрібно багато потоків

Коли Зменшувати MaxThreads

Сценарій: Обмеження ресурсів

// Контейнер з обмеженою пам'яттю (512MB)
// Кожен потік = ~1MB стеку
// MaxThreads = 32767 (default) → потенційно 32GB!

ThreadPool.SetMaxThreads(workerThreads: 100, completionPortThreads: 100);
// Тепер максимум 100 потоків = ~100MB стеків

Правило: Зменшуйте MaxThreads тільки якщо:

  • Обмежена пам'ять (контейнери, embedded)
  • Потрібен rate limiting на рівні потоків

ETW Events для Діагностики

Збір ThreadPool events:

# Через dotnet-trace
dotnet-trace collect \
  --process-id <PID> \
  --providers Microsoft-Windows-DotNETRuntime:0x10000:5

# Через PerfView (Windows)
PerfView.exe /nogui collect \
  /providers=*Microsoft-Windows-DotNETRuntime:0x10000:5

Ключові події:

  • ThreadPoolWorkerThreadStart — новий потік створено
  • ThreadPoolWorkerThreadStop — потік завершено (retirement)
  • ThreadPoolWorkerThreadAdjustmentSample — Hill Climbing рішення
  • ThreadPoolWorkerThreadWait — потік чекає роботи
  • ThreadPoolEnqueueWorkObject — задача додана у чергу

Аналіз у PerfView:

  1. Відкрити .etl файл
  2. Events → Filter: ThreadPool
  3. Шукати аномалії:
    • Багато ThreadStart без ThreadStop → leak
    • Високий PendingWorkItemCount → starvation
    • Часті Adjustment → нестабільне навантаження

Advanced Patterns

Pattern 1: Priority Queue поверх ThreadPool

PriorityThreadPool.cs
using System;
using System.Collections.Generic;
using System.Threading;

public enum Priority { Low, Normal, High }

public class PriorityThreadPool
{
    private readonly SortedDictionary<Priority, Queue<Action>> _queues = new()
    {
        { Priority.High, new Queue<Action>() },
        { Priority.Normal, new Queue<Action>() },
        { Priority.Low, new Queue<Action>() }
    };

    private readonly object _lock = new();
    private readonly ManualResetEventSlim _workAvailable = new(false);
    private volatile bool _shutdown = false;

    public PriorityThreadPool(int workerCount = 4)
    {
        for (int i = 0; i < workerCount; i++)
        {
            ThreadPool.QueueUserWorkItem(_ => WorkerLoop());
        }
    }

    public void Enqueue(Action work, Priority priority = Priority.Normal)
    {
        lock (_lock)
        {
            _queues[priority].Enqueue(work);
            _workAvailable.Set();
        }
    }

    private void WorkerLoop()
    {
        while (!_shutdown)
        {
            Action? work = null;

            lock (_lock)
            {
                // Шукаємо задачу з найвищим пріоритетом
                foreach (var priority in new[] { Priority.High, Priority.Normal, Priority.Low })
                {
                    if (_queues[priority].Count > 0)
                    {
                        work = _queues[priority].Dequeue();
                        break;
                    }
                }
            }

            if (work != null)
            {
                work();
            }
            else
            {
                _workAvailable.Wait(TimeSpan.FromSeconds(1));
                _workAvailable.Reset();
            }
        }
    }

    public void Shutdown() => _shutdown = true;
}

Pattern 2: Rate Limiting з ThreadPool

RateLimitedThreadPool.cs
using System;
using System.Threading;

public class RateLimitedThreadPool
{
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConcurrent;

    public RateLimitedThreadPool(int maxConcurrent)
    {
        _maxConcurrent = maxConcurrent;
        _semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
    }

    public async Task ExecuteAsync(Func<Task> work)
    {
        await _semaphore.WaitAsync();

        try
        {
            await work();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public int AvailableSlots => _semaphore.CurrentCount;
}

// Використання: обмежити до 5 concurrent HTTP requests
var rateLimiter = new RateLimitedThreadPool(maxConcurrent: 5);

var tasks = Enumerable.Range(0, 100).Select(i =>
    rateLimiter.ExecuteAsync(async () =>
    {
        await httpClient.GetStringAsync($"https://api.example.com/item/{i}");
    })
);

await Task.WhenAll(tasks);
// Максимум 5 requests одночасно, решта чекають

Pattern 3: Circuit Breaker

CircuitBreaker.cs
using System;
using System.Threading;

public class CircuitBreaker
{
    private enum State { Closed, Open, HalfOpen }

    private State _state = State.Closed;
    private int _failureCount = 0;
    private DateTime _lastFailureTime;

    private readonly int _failureThreshold;
    private readonly TimeSpan _timeout;
    private readonly object _lock = new();

    public CircuitBreaker(int failureThreshold = 5, TimeSpan? timeout = null)
    {
        _failureThreshold = failureThreshold;
        _timeout = timeout ?? TimeSpan.FromSeconds(60);
    }

    public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
    {
        lock (_lock)
        {
            if (_state == State.Open)
            {
                if (DateTime.UtcNow - _lastFailureTime > _timeout)
                {
                    _state = State.HalfOpen;
                }
                else
                {
                    throw new InvalidOperationException("Circuit breaker is OPEN");
                }
            }
        }

        try
        {
            var result = await operation();

            lock (_lock)
            {
                _failureCount = 0;
                _state = State.Closed;
            }

            return result;
        }
        catch
        {
            lock (_lock)
            {
                _failureCount++;
                _lastFailureTime = DateTime.UtcNow;

                if (_failureCount >= _failureThreshold)
                {
                    _state = State.Open;
                }
            }

            throw;
        }
    }

    public string CurrentState => _state.ToString();
}

Підсумок

Work Stealing

  • Local queues (per-thread) + Global queue
  • LIFO для власника (cache locality)
  • FIFO для крадіїв (мінімізація конфліктів)
  • Масштабується на багатоядерних системах

Hill Climbing

  • Gradient ascent з adaptive step
  • Exponential smoothing для стабільності
  • Wave behavior — нормальна поведінка
  • Повільно реагує на сплески → збільшити MinThreads

IOCP

  • Windows kernel механізм для async I/O
  • Zero CPU для очікування I/O
  • DMA transfer без участі CPU
  • Scalability: O(1) потоків для O(n) з'єднань

Production Tuning

  • Моніторинг: PendingWorkItemCount, AvailableThreads
  • Збільшити MinThreads для сплесків
  • Зменшити MaxThreads для обмеження ресурсів
  • ETW events для глибокої діагностики

Практичні Завдання

Рівень 1: Work Stealing Queue Implementation

Реалізуйте повноцінну Work Stealing Queue:

Вимоги:

  1. LocalPush(T item) — додати задачу (тільки власник)
  2. LocalPop(out T item) — забрати задачу LIFO (тільки власник)
  3. TrySteal(out T item) — вкрасти задачу FIFO (інші потоки)
  4. Підтримка динамічного розширення масиву
  5. Thread-safe для concurrent stealing

Тест:

  • 8 worker threads
  • Кожен додає 1000 задач у свою чергу
  • Всі threads крадуть задачі один у одного
  • Перевірте що всі 8000 задач виконані

Рівень 2: Custom ThreadPool з Hill Climbing

Розширте Custom ThreadPool додавши спрощений Hill Climbing:

Вимоги:

  1. Вимірювання throughput кожні 500ms
  2. Додавання/видалення потоків на основі throughput
  3. Exponential smoothing для згладжування
  4. Логування рішень Hill Climbing

Тест:

  • Запустіть з MinThreads=2, MaxThreads=20
  • Додайте 10000 задач з різною тривалістю
  • Перевірте що кількість потоків динамічно змінюється
  • Виведіть графік: Threads vs Time

Рівень 3: IOCP Echo Server

Створіть високопродуктивний echo server через IOCP:

Вимоги:

  1. Async/await для всіх I/O операцій
  2. Підтримка 10,000+ одночасних з'єднань
  3. Graceful shutdown
  4. Метрики: active connections, bytes/sec, requests/sec

Тест:

  • Запустіть сервер на порту 8080
  • Створіть 10,000 клієнтів що відправляють дані
  • Виміряйте throughput (MB/sec)
  • Порівняйте з blocking версією

Benchmark цілі:

  • IOCP: >100,000 requests/sec, <100MB RAM
  • Blocking: <10,000 requests/sec, >1GB RAM

Корисні Посилання

Документація:

Статті:

Книги:

  • "Concurrent Programming on Windows" (Joe Duffy) — Chapter 7: Thread Pools
  • "Windows Internals" (Russinovich) — Chapter 8: I/O System