У попередній темі ми розглянули що таке ThreadPool та як його використовувати. Тепер час зануритись глибше і зрозуміти як він працює всередині. Це знання критично важливе для:
Ця тема — для тих хто хоче по-справжньому розуміти багатопотоковість у .NET, а не просто використовувати готові API.
Аналогія: Черги в супермаркеті
Уявіть величезний супермаркет, де є 8 касирів (наші Worker Threads). Якщо всі 1000 покупців стануть у одну гігантську чергу (Global Queue), почнеться хаос: касири будуть постійно сваритися і блокувати одне одного, щоб покликати наступного, а покупці витрачатимуть час на те, щоб зістикувати потоки. Це і є проблема єдиної глобальної черги — Bottleneck (вузьке горлечко).
Уявіть ThreadPool з 8 worker threads. Якщо всі задачі йдуть у одну спільну чергу — виникає bottleneck:
// Наївна реалізація (НЕ так як у .NET):
class SimpleThreadPool
{
private readonly ConcurrentQueue<Action> _globalQueue = new(); // ← Bottleneck!
public void QueueWorkItem(Action work)
{
_globalQueue.Enqueue(work); // Всі потоки конкурують за цю чергу
}
}
Проблеми:
.NET ThreadPool використовує дворівневу систему черг:
┌─────────────────────────────────────────────────────────────┐
│ Global Queue │
│ (спільна для всіх, використовується рідко) │
│ [Task1] [Task2] [Task3] ... │
└─────────────────────────────────────────────────────────────┘
↓
┌───────────────────┼───────────────────┐
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ Local Queue │ │ Local Queue │ │ Local Queue │
│ [T7][T8][T9] │ │ [T4][T5][T6] │ │ [T10][T11] │
└──────────────┘ └──────────────┘ └──────────────┘
↑ LIFO ↑ LIFO ↑ LIFO
│ (власник) │ (власник) │ (власник)
│ │ │
└─────── FIFO ──────┴─────── FIFO ──────┘
(work stealing між потоками)
Ключові принципи:
Це не випадковість — це оптимізація для cache locality:
// Сценарій: рекурсивна задача
void ProcessDirectory(string path)
{
var files = Directory.GetFiles(path);
foreach (var file in files)
{
ProcessFile(file); // Обробка у поточному потоці
}
var subdirs = Directory.GetDirectories(path);
foreach (var dir in subdirs)
{
// Створюємо нову задачу → йде у LOCAL queue поточного потоку
ThreadPool.QueueUserWorkItem(_ => ProcessDirectory(dir));
}
}
Що відбувається (Людською мовою):
Коли ви щойно розібрали пакунок з документами (батьківська задача), ви дістали звідти 10 окремих папок (підзадачі). Найлогічніше — одразу взяти останню папку, яку ви щойно тримали в руках, і почати з нею працювати, бо ви вже пам'ятаєте контекст (дані знаходяться в "швидкій пам'яті" / кеші CPU). Якби ви взяли найстарішу папку з іншого проєкту, вам довелося б згадувати, що це взагалі таке (Cache Miss — вивантаження нових даних з повільної RAM у мікронний кеш процесора).
ProcessDirectory("C:\\")path prefix) ще в пам'яті → максимальна швидкість доступу.Benchmark: LIFO vs FIFO для власника:
// Рекурсивна задача що створює багато підзадач
void RecursiveWork(int depth)
{
if (depth == 0) return;
// Створюємо 4 підзадачі
for (int i = 0; i < 4; i++)
{
ThreadPool.QueueUserWorkItem(_ => RecursiveWork(depth - 1));
}
}
// LIFO (як у .NET): ~500ms для depth=10
// FIFO (гіпотетично): ~800ms для depth=10
// Різниця: 60% швидше завдяки cache locality!
Коли Worker Thread 2 виконав усю свою роботу, він стає "крадієм" і йде на допомогу до Worker Thread 1. Але крадій бере задачу з початку черги (найстарішу):
Причина 1: Мінімізація конфліктів (Щоб не штовхатись)
Уявіть, що касир активно обслуговує людей, які щойно підійшли (LIFO). Якщо "вільний касир" підійде і почне висмикувати клієнтів з-під носа свого колеги, вони почнуть заважати одне одному (Lock Contention). Замість цього, вільний касир тихенько підходить до "хвоста" черги клієнтів (FIFO) і забирає людей звідти. Вони працюють з різних кінців колекції, тому конфлікту і блокувань практично не виникає.
Причина 2: Більші задачі
Спрощена версія (концептуально схожа на .NET):
using System;
using System.Threading;
/// <summary>
/// Спрощена реалізація Work Stealing Queue.
/// Власник бере з кінця (LIFO), інші крадуть з початку (FIFO).
/// </summary>
public class WorkStealingQueue<T>
{
private T[] _array = new T[32];
private int _mask = 31; // Для швидкого modulo через bitwise AND
private volatile int _headIndex = 0; // Для крадіїв (FIFO)
private volatile int _tailIndex = 0; // Для власника (LIFO)
private readonly object _foreignLock = new(); // Тільки для крадіїв
/// <summary>Додати задачу (викликає тільки власник).</summary>
public void LocalPush(T item)
{
int tail = _tailIndex;
// Перевірка чи потрібно розширити масив
if (tail - _headIndex >= _mask)
{
lock (_foreignLock)
{
GrowArray();
}
}
_array[tail & _mask] = item;
_tailIndex = tail + 1; // Publish нового tail
}
/// <summary>Забрати задачу (викликає тільки власник, LIFO).</summary>
public bool LocalPop(out T? item)
{
int tail = _tailIndex - 1;
Interlocked.Exchange(ref _tailIndex, tail); // Atomic decrement
if (_headIndex <= tail)
{
// Черга не порожня
item = _array[tail & _mask];
return true;
}
else
{
// Черга порожня або конфлікт з крадієм
_tailIndex = tail + 1; // Відкат
item = default;
return false;
}
}
/// <summary>Вкрасти задачу (викликають інші потоки, FIFO).</summary>
public bool TrySteal(out T? item)
{
lock (_foreignLock) // Крадії синхронізуються між собою
{
int head = _headIndex;
int tail = _tailIndex;
if (head < tail)
{
// Є задачі для крадіжки
item = _array[head & _mask];
_headIndex = head + 1; // Atomic increment
return true;
}
item = default;
return false;
}
}
private void GrowArray()
{
int newSize = (_array.Length - 1) * 2 + 1; // Подвоїти розмір
var newArray = new T[newSize];
int head = _headIndex;
int tail = _tailIndex;
for (int i = head; i < tail; i++)
{
newArray[i & (newSize - 1)] = _array[i & _mask];
}
_array = newArray;
_mask = newSize - 1;
}
public int Count => Math.Max(0, _tailIndex - _headIndex);
}
Ключові деталі:
LocalPush/LocalPop — без lock для власника (швидко!)TrySteal — з lock для крадіїв (вони конкурують між собою)_headIndex/_tailIndex (видимість між потоками)У темі 09 ми розглянули концепцію Hill Climbing. Тепер — математика та деталі реалізації.
Аналогія: Менеджер Ресторану (Hill Climbing)
Уявіть менеджера великого ресторану. Його мета — щоб кухня видавала якомога більше страв за годину (Throughput). Якщо кухарів (Потоків) замало, замовлення простоюють. Менеджер наймає ще одного кухаря, бачить, що швидкість видачі страв зросла — супер, наймає ще одного. Але в якийсь момент кухарів стає настільки багато, що вони починають штовхатися біля плит, битися посудом і чекати в черзі до духовки (Context Switching, Lock Contention). Продуктивність раптово падає. Менеджер бачить падіння ефективності, розуміє, що перегнув палицю, і починає звільняти кухарів по одному, доки швидкість знову не стане максимальною. Ця гра невпинного балансування "додав — перевірив — відібрав" і є алгоритмом Hill Climbing (Підйом на пагорб).
Мета: Максимізувати throughput (пропускну здатність) — кількість виконаних задач за одиницю часу.
Виклик: Оптимальна кількість потоків залежить від навантаження (меню постійно змінюється):
Hill Climbing у .NET — це варіант gradient ascent (підйом за градієнтом):
Throughput
↑
│ ╱╲ ← Оптимум (шукаємо цю точку)
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│____╱__________╲____________→ Кількість потоків
↑ ↑
Занадто мало Занадто багато
(CPU idle) (context switch overhead)
Псевдокод алгоритму:
class HillClimbingAlgorithm
{
private int _currentThreadCount;
private double _currentThroughput;
private int _direction = +1; // +1 = додавати, -1 = забирати
private double _stepSize = 1.0;
public void Adjust()
{
// 1. Вимірюємо поточний throughput
double newThroughput = MeasureThroughput();
// 2. Обчислюємо зміну (gradient)
double delta = newThroughput - _currentThroughput;
if (delta > 0)
{
// Throughput зріс → продовжуємо у тому ж напрямку
_currentThreadCount += (int)(_direction * _stepSize);
_stepSize *= 1.05; // Збільшуємо крок (прискорюємось)
}
else if (delta < 0)
{
// Throughput впав → змінюємо напрямок
_direction = -_direction;
_stepSize = Math.Max(1.0, _stepSize * 0.9); // Зменшуємо крок
_currentThreadCount += (int)(_direction * _stepSize);
}
else
{
// Throughput не змінився → пробуємо інший напрямок
_direction = -_direction;
_currentThreadCount += _direction;
}
// 3. Обмеження
_currentThreadCount = Math.Clamp(_currentThreadCount, MinThreads, MaxThreads);
// 4. Застосовуємо нову кількість
SetWorkerThreadCount(_currentThreadCount);
// 5. Зберігаємо для наступної ітерації
_currentThroughput = newThroughput;
}
private double MeasureThroughput()
{
// Кількість виконаних задач за останні 500ms
return CompletedWorkItemCount / 0.5; // tasks/second
}
}
Реальний throughput "шумний" — він стрибає через:
CLR використовує exponential smoothing для згладжування:
double _smoothedThroughput = 0;
const double Alpha = 0.3; // Вага нового вимірювання (0-1)
void UpdateThroughput(double measured)
{
// Exponential Moving Average (EMA)
_smoothedThroughput = Alpha * measured + (1 - Alpha) * _smoothedThroughput;
// Використовуємо згладжене значення для рішень
if (_smoothedThroughput > _previousSmoothed)
{
// Throughput зростає...
}
}
Ефект: Алгоритм не реагує на короткочасні стрибки, але швидко адаптується до справжніх змін навантаження.
Якщо ви моніторите ThreadPool.ThreadCount у production, побачите хвилеподібну поведінку:
Threads
↑
12│ ╱╲ ╱╲ ╱╲
10│ ╱ ╲ ╱ ╲ ╱ ╲
8│ ╱ ╲ ╱ ╲ ╱ ╲
6│ ╱ ╲╱ ╲╱ ╲
4│╱________________________╲___→ Time
Чому так відбувається? (Танець навколо оптимуму)
Щоб з впевненістю дізнатися, що ви на самій вершині гори, вам доведеться зробити хоча б один крок донизу.
Це абсолютно нормальна і здорова поведінка. Алгоритм не може магічно зупинитися ідеально в центрі через саму свою природу (і мінливість систем навантаження).
Сценарій 1: Раптовий сплеск навантаження
// Раптово додаємо 10000 задач
for (int i = 0; i < 10000; i++)
{
ThreadPool.QueueUserWorkItem(_ => DoWork());
}
// Hill Climbing додає потоки ПОВІЛЬНО (по 1-2 кожні 500ms)
// Перші 5000 задач чекатимуть у черзі!
Рішення: Збільшити MinThreads якщо знаєте що завжди потрібно багато потоків:
ThreadPool.SetMinThreads(workerThreads: 50, completionPortThreads: 50);
// Тепер ThreadPool одразу має 50 потоків готових до роботи
Сценарій 2: Блокуючі операції
// ❌ ПОГАНО: блокуємо worker threads
for (int i = 0; i < 100; i++)
{
ThreadPool.QueueUserWorkItem(_ =>
{
Thread.Sleep(10_000); // Блокування на 10 секунд
});
}
// Hill Climbing бачить що throughput = 0 (нічого не завершується)
// Він додає потоки, але вони теж блокуються
// Результат: thread starvation
Рішення: Ніколи не блокуйте worker threads — використовуйте async/await.
Тепер створимо власний ThreadPool з нуля щоб зрозуміти всі деталі:
using System;
using System.Collections.Concurrent;
using System.Threading;
/// <summary>
/// Спрощена реалізація ThreadPool для навчання.
/// Підтримує: динамічну кількість потоків, graceful shutdown, статистику.
/// </summary>
public class CustomThreadPool : IDisposable
{
private readonly ConcurrentQueue<Action> _workQueue = new();
private readonly List<Thread> _workers = new();
private readonly ManualResetEventSlim _workAvailable = new(false);
private readonly int _minThreads;
private readonly int _maxThreads;
private int _currentThreads = 0;
private long _completedWorkItems = 0;
private long _totalWorkItems = 0;
private volatile bool _shutdown = false;
public CustomThreadPool(int minThreads = 4, int maxThreads = 100)
{
_minThreads = minThreads;
_maxThreads = maxThreads;
// Створюємо мінімальну кількість потоків одразу
for (int i = 0; i < _minThreads; i++)
{
CreateWorkerThread();
}
}
/// <summary>Додати задачу у чергу.</summary>
public void QueueUserWorkItem(Action work)
{
if (_shutdown)
throw new InvalidOperationException("ThreadPool is shutting down");
Interlocked.Increment(ref _totalWorkItems);
_workQueue.Enqueue(work);
_workAvailable.Set(); // Сигнал: є робота
// Простий injection logic: якщо черга довга і є місце → додати потік
if (_workQueue.Count > _currentThreads * 2 && _currentThreads < _maxThreads)
{
lock (_workers)
{
if (_currentThreads < _maxThreads)
{
CreateWorkerThread();
}
}
}
}
private void CreateWorkerThread()
{
var worker = new Thread(WorkerLoop)
{
IsBackground = true,
Name = $"CustomThreadPool-Worker-{_currentThreads}"
};
_workers.Add(worker);
Interlocked.Increment(ref _currentThreads);
worker.Start();
}
private void WorkerLoop()
{
DateTime lastWorkTime = DateTime.UtcNow;
while (!_shutdown)
{
if (_workQueue.TryDequeue(out var work))
{
try
{
work();
Interlocked.Increment(ref _completedWorkItems);
lastWorkTime = DateTime.UtcNow;
}
catch (Exception ex)
{
Console.Error.WriteLine($"Worker exception: {ex.Message}");
}
}
else
{
// Немає роботи
// Thread retirement: якщо не було роботи 30 секунд і потоків більше ніж min
if (_currentThreads > _minThreads &&
(DateTime.UtcNow - lastWorkTime).TotalSeconds > 30)
{
// Завершуємо цей потік
Interlocked.Decrement(ref _currentThreads);
return;
}
// Чекаємо на сигнал (з timeout для перевірки shutdown)
_workAvailable.Wait(TimeSpan.FromSeconds(1));
_workAvailable.Reset();
}
}
}
/// <summary>Graceful shutdown: чекаємо завершення всіх задач.</summary>
public void Shutdown(TimeSpan timeout)
{
_shutdown = true;
_workAvailable.Set(); // Розбудити всіх
var deadline = DateTime.UtcNow + timeout;
foreach (var worker in _workers)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining > TimeSpan.Zero)
{
worker.Join(remaining);
}
}
}
public void Dispose()
{
Shutdown(TimeSpan.FromSeconds(10));
_workAvailable.Dispose();
}
// Статистика
public int CurrentThreads => _currentThreads;
public int QueueLength => _workQueue.Count;
public long CompletedWorkItems => Interlocked.Read(ref _completedWorkItems);
public long TotalWorkItems => Interlocked.Read(ref _totalWorkItems);
public double CompletionRate => TotalWorkItems > 0
? CompletedWorkItems / (double)TotalWorkItems
: 0;
}
using System;
using System.Diagnostics;
using System.Threading;
var pool = new CustomThreadPool(minThreads: 2, maxThreads: 10);
Console.WriteLine("=== Custom ThreadPool Demo ===\n");
// Моніторинг у фоні
var monitor = new Thread(() =>
{
while (true)
{
Console.WriteLine($"[Monitor] Threads: {pool.CurrentThreads}, " +
$"Queue: {pool.QueueLength}, " +
$"Completed: {pool.CompletedWorkItems}/{pool.TotalWorkItems} " +
$"({pool.CompletionRate:P0})");
Thread.Sleep(500);
}
}) { IsBackground = true };
monitor.Start();
// Тест 1: Раптовий сплеск навантаження
Console.WriteLine("Test 1: Burst of 1000 tasks...");
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1000; i++)
{
int taskId = i;
pool.QueueUserWorkItem(() =>
{
Thread.Sleep(Random.Shared.Next(10, 50)); // Симуляція роботи
});
}
// Чекаємо завершення
while (pool.CompletedWorkItems < 1000)
{
Thread.Sleep(100);
}
sw.Stop();
Console.WriteLine($"Completed in {sw.ElapsedMilliseconds}ms\n");
// Тест 2: Тривале навантаження
Console.WriteLine("Test 2: Sustained load for 10 seconds...");
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
int submitted = 0;
while (!cts.Token.IsCancellationRequested)
{
pool.QueueUserWorkItem(() =>
{
Thread.Sleep(Random.Shared.Next(5, 20));
});
submitted++;
Thread.Sleep(10);
}
Console.WriteLine($"Submitted {submitted} tasks during sustained load\n");
// Cleanup
Thread.Sleep(2000); // Дати час завершити
pool.Dispose();
Console.WriteLine("=== Demo Complete ===");
I/O Completion Port (IOCP) — геніальний механізм Windows kernel (ядра ОС) для надвисокоефективної обробки асинхронних I/O (Input/Output) операцій (читання файлів, робота з мережею). Це не .NET концепція — це нативна магія Windows, яку .NET просто обгортає у зручний інтерфейс.
Аналогія: Замовлення Піци у 10,000 клієнтів
Без IOCP (Синхронно): Ви — оператор піцерії (Thread). Ви прийняли велике замовлення, передали статус на кухню і... стали прямо біля дверей видачі, нічого не роблячи (Потік заблоковано). Щоб обслужити 10,000 людей одночасно, вам знадобиться 10,000 операторів, які просто стоятимуть і тупитимуть. Це катастрофічна витрата оперативної пам'яті (RAM).
З IOCP (Асинхронно): Ви приймаєте замовлення, передаєте на кухню (запит до інтерфейсу мережі/дискового контролера), залишаєте записку з номером (Callback / Task) і одразу йдете обслуговувати іншого клієнта (Потік повертається в Пул унікально вільним). Коли піцу спечено, кухар (залізо/DMA-контролер) сам кладе її на спеціальний стіл видачі (I/O Completion Port). Будь-який з 20 вільних операторів (IOCP Threads), який проходить повз, візьме готову піцу з цього столу і понесе клієнту. Так 10-20 працівників спокійно та блискавично можуть обслужити 10,000 замовлень!
Проблема без IOCP (У коді):
// Наївний підхід: один потік на з'єднання
void HandleClient(Socket client)
{
var thread = new Thread(() =>
{
while (true)
{
byte[] buffer = new byte[1024];
int received = client.Receive(buffer); // ← БЛОКУЄ потік!
if (received == 0) break;
ProcessData(buffer, received);
}
});
thread.Start();
}
// Проблема: 10,000 клієнтів = 10,000 потоків = катастрофа!
**Рішення з IO CP**:
// З IOCP: один потік обслуговує тисячі з'єднань
async Task HandleClientAsync(Socket client)
{
while (true)
{
byte[] buffer = new byte[1024];
int received = await client.ReceiveAsync(buffer); // ← НЕ блокує потік!
if (received == 0) break;
ProcessData(buffer, received);
}
}
// 10,000 клієнтів = ~10-20 потоків (IOCP threads) = ефективно!
Архітектура:
┌─────────────────────────────────────────────────────────┐
│ Application │
│ socket.ReceiveAsync() → повертає Task одразу │
└────────────────────┬────────────────────────────────────┘
│ (не блокує потік)
↓
┌─────────────────────────────────────────────────────────┐
│ Windows Kernel (IOCP) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ I/O Completion Queue │ │
│ │ [Completed I/O 1] [Completed I/O 2] ... │ │
│ └──────────────────────────────────────────────────┘ │
│ ↑ │
│ Network Card ──────┘ (DMA transfer, no CPU) │
└─────────────────────────────────────────────────────────┘
│
↓ (GetQueuedCompletionStatus)
┌─────────────────────────────────────────────────────────┐
│ ThreadPool IOCP Threads │
│ Thread 1: чекає на IOCP → отримує completion → викликає callback
│ Thread 2: чекає на IOCP → отримує completion → викликає callback
└─────────────────────────────────────────────────────────┘
Ключові кроки (Як це працює технічно):
socket.ReceiveAsync(). Це наказ ядру Windows (Kernel) розпочати слухати мережу.Task, і ваш Thread йде виконувати іншу роботу. Він не висить в стані Thread.Sleep або блокування.ThreadPool IOCP Threads) постійно моніторить цю чергу через надзвичайно легку функцію GetQueuedCompletionStatus(). Найбільш вільний потік бере результат з черги і продовжує виконання вашого коду (continuation, що стоїть після слова await).Під капотом .NET використовує ці Windows API:
// Створення IOCP (один на весь процес)
HANDLE iocp = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, // створити новий IOCP
NULL,
0,
0 // кількість concurrent threads (0 = auto)
);
// Асоціювання socket з IOCP
CreateIoCompletionPort(
(HANDLE)socket, // handle до socket
iocp, // існуючий IOCP
(ULONG_PTR)socket, // completion key (ідентифікатор)
0
);
// IOCP Thread loop
while (true)
{
DWORD bytesTransferred;
ULONG_PTR completionKey;
OVERLAPPED* overlapped;
// Блокуючий виклик: чекає поки kernel не поверне завершену I/O операцію
BOOL success = GetQueuedCompletionStatus(
iocp,
&bytesTransferred,
&completionKey,
&overlapped,
INFINITE // timeout
);
if (success)
{
// I/O завершилась успішно → викликати .NET callback
InvokeCallback(overlapped, bytesTransferred);
}
}
Аналогія:
Overlapped— це своєрідний "конверт із зворотньою адресою" на рівні ядра ОС. Коли ви відправляєте запит на читання диска, ви передаєте цей конверт ядру. Ядро відкладає його в пам'ять. Коли диск зчитає масив даних, ядро дістає ваш конверт, укладає туди статус операції (Успішно/Помилка, байтів отримано) і за метаданими звідти розуміє, кому саме в .NET треба просигналізувати про завершення.
Overlapped — це C/C++ структура, що зберігає стан асинхронної I/O операції:
typedef struct _OVERLAPPED {
ULONG_PTR Internal; // Статус операції
ULONG_PTR InternalHigh; // Кількість переданих байтів
DWORD Offset; // Offset у файлі (для файлів)
DWORD OffsetHigh;
HANDLE hEvent; // Event для сигналізації (опційно)
} OVERLAPPED;
Коли ви викликаєте socket.ReceiveAsync():
OVERLAPPED структуруWSARecv() з прапорцем overlappedOVERLAPPED* і повертає ERROR_IO_PENDINGOVERLAPPED* у IOCP queueusing System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
// Сервер що обслуговує багато клієнтів
// ❌ Blocking I/O: один потік на клієнта
class BlockingServer
{
public void Start(int port)
{
var listener = new TcpListener(IPAddress.Any, port);
listener.Start();
while (true)
{
var client = listener.AcceptTcpClient(); // Блокує
// Новий потік для кожного клієнта!
new Thread(() => HandleClient(client)) { IsBackground = true }.Start();
}
}
private void HandleClient(TcpClient client)
{
var stream = client.GetStream();
byte[] buffer = new byte[1024];
while (true)
{
int read = stream.Read(buffer, 0, buffer.Length); // Блокує потік!
if (read == 0) break;
stream.Write(buffer, 0, read); // Echo назад
}
}
}
// ✅ IOCP: async/await
class IOCPServer
{
public async Task StartAsync(int port)
{
var listener = new TcpListener(IPAddress.Any, port);
listener.Start();
while (true)
{
var client = await listener.AcceptTcpClientAsync(); // Async
// Не створюємо потік — просто запускаємо Task
_ = HandleClientAsync(client);
}
}
private async Task HandleClientAsync(TcpClient client)
{
var stream = client.GetStream();
byte[] buffer = new byte[1024];
while (true)
{
int read = await stream.ReadAsync(buffer); // НЕ блокує потік!
if (read == 0) break;
await stream.WriteAsync(buffer.AsMemory(0, read)); // Echo назад
}
}
}
// Результати для 10,000 одночасних клієнтів:
// Blocking: 10,000 потоків, ~10GB RAM, CPU 100% (context switching)
// IOCP: ~20 потоків, ~200MB RAM, CPU 5% (ефективно!)
1. Zero CPU для очікування I/O
2. Scalability
3. DMA (Direct Memory Access)
1. PendingWorkItemCount — кількість задач у черзі:
long pending = ThreadPool.PendingWorkItemCount;
if (pending > 1000)
{
Console.WriteLine("WARNING: ThreadPool queue is growing!");
// Можливі причини:
// - Thread starvation (всі потоки заблоковані)
// - Недостатньо потоків (збільшити MinThreads)
// - Занадто багато задач (throttling)
}
2. AvailableThreads — скільки потоків вільні:
ThreadPool.GetAvailableThreads(out int availableWorker, out int availableIOCP);
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIOCP);
int busyWorker = maxWorker - availableWorker;
if (busyWorker > maxWorker * 0.9)
{
Console.WriteLine("WARNING: 90% of worker threads are busy!");
}
3. CompletedWorkItemCount — lifetime counter:
long completed = ThreadPool.CompletedWorkItemCount;
// Використовуйте для обчислення throughput
Сценарій 1: Раптові сплески навантаження
// Проблема: Hill Climbing повільно реагує
// Рішення: збільшити MinThreads
// Перед (default MinThreads = 8 на 8-core машині):
// - 1000 задач додаються одразу
// - Перші 8 виконуються, решта чекають
// - Hill Climbing додає потоки по 1-2 кожні 500ms
// - Результат: перші 500 задач чекають 10+ секунд
// Після (MinThreads = 50):
ThreadPool.SetMinThreads(workerThreads: 50, completionPortThreads: 50);
// - 1000 задач додаються одразу
// - Перші 50 виконуються одразу
// - Результат: набагато швидше
Сценарій 2: Блокуючі операції (якщо async неможливий)
// Якщо ви ЗМУШЕНІ використовувати blocking API (legacy код):
ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 100);
// Але краще: переписати на async!
Правило: Збільшуйте MinThreads тільки якщо:
Сценарій: Обмеження ресурсів
// Контейнер з обмеженою пам'яттю (512MB)
// Кожен потік = ~1MB стеку
// MaxThreads = 32767 (default) → потенційно 32GB!
ThreadPool.SetMaxThreads(workerThreads: 100, completionPortThreads: 100);
// Тепер максимум 100 потоків = ~100MB стеків
Правило: Зменшуйте MaxThreads тільки якщо:
Збір ThreadPool events:
# Через dotnet-trace
dotnet-trace collect \
--process-id <PID> \
--providers Microsoft-Windows-DotNETRuntime:0x10000:5
# Через PerfView (Windows)
PerfView.exe /nogui collect \
/providers=*Microsoft-Windows-DotNETRuntime:0x10000:5
Ключові події:
ThreadPoolWorkerThreadStart — новий потік створеноThreadPoolWorkerThreadStop — потік завершено (retirement)ThreadPoolWorkerThreadAdjustmentSample — Hill Climbing рішенняThreadPoolWorkerThreadWait — потік чекає роботиThreadPoolEnqueueWorkObject — задача додана у чергуАналіз у PerfView:
.etl файлThreadPoolThreadStart без ThreadStop → leakPendingWorkItemCount → starvationAdjustment → нестабільне навантаженняusing System;
using System.Collections.Generic;
using System.Threading;
public enum Priority { Low, Normal, High }
public class PriorityThreadPool
{
private readonly SortedDictionary<Priority, Queue<Action>> _queues = new()
{
{ Priority.High, new Queue<Action>() },
{ Priority.Normal, new Queue<Action>() },
{ Priority.Low, new Queue<Action>() }
};
private readonly object _lock = new();
private readonly ManualResetEventSlim _workAvailable = new(false);
private volatile bool _shutdown = false;
public PriorityThreadPool(int workerCount = 4)
{
for (int i = 0; i < workerCount; i++)
{
ThreadPool.QueueUserWorkItem(_ => WorkerLoop());
}
}
public void Enqueue(Action work, Priority priority = Priority.Normal)
{
lock (_lock)
{
_queues[priority].Enqueue(work);
_workAvailable.Set();
}
}
private void WorkerLoop()
{
while (!_shutdown)
{
Action? work = null;
lock (_lock)
{
// Шукаємо задачу з найвищим пріоритетом
foreach (var priority in new[] { Priority.High, Priority.Normal, Priority.Low })
{
if (_queues[priority].Count > 0)
{
work = _queues[priority].Dequeue();
break;
}
}
}
if (work != null)
{
work();
}
else
{
_workAvailable.Wait(TimeSpan.FromSeconds(1));
_workAvailable.Reset();
}
}
}
public void Shutdown() => _shutdown = true;
}
using System;
using System.Threading;
public class RateLimitedThreadPool
{
private readonly SemaphoreSlim _semaphore;
private readonly int _maxConcurrent;
public RateLimitedThreadPool(int maxConcurrent)
{
_maxConcurrent = maxConcurrent;
_semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
}
public async Task ExecuteAsync(Func<Task> work)
{
await _semaphore.WaitAsync();
try
{
await work();
}
finally
{
_semaphore.Release();
}
}
public int AvailableSlots => _semaphore.CurrentCount;
}
// Використання: обмежити до 5 concurrent HTTP requests
var rateLimiter = new RateLimitedThreadPool(maxConcurrent: 5);
var tasks = Enumerable.Range(0, 100).Select(i =>
rateLimiter.ExecuteAsync(async () =>
{
await httpClient.GetStringAsync($"https://api.example.com/item/{i}");
})
);
await Task.WhenAll(tasks);
// Максимум 5 requests одночасно, решта чекають
using System;
using System.Threading;
public class CircuitBreaker
{
private enum State { Closed, Open, HalfOpen }
private State _state = State.Closed;
private int _failureCount = 0;
private DateTime _lastFailureTime;
private readonly int _failureThreshold;
private readonly TimeSpan _timeout;
private readonly object _lock = new();
public CircuitBreaker(int failureThreshold = 5, TimeSpan? timeout = null)
{
_failureThreshold = failureThreshold;
_timeout = timeout ?? TimeSpan.FromSeconds(60);
}
public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
{
lock (_lock)
{
if (_state == State.Open)
{
if (DateTime.UtcNow - _lastFailureTime > _timeout)
{
_state = State.HalfOpen;
}
else
{
throw new InvalidOperationException("Circuit breaker is OPEN");
}
}
}
try
{
var result = await operation();
lock (_lock)
{
_failureCount = 0;
_state = State.Closed;
}
return result;
}
catch
{
lock (_lock)
{
_failureCount++;
_lastFailureTime = DateTime.UtcNow;
if (_failureCount >= _failureThreshold)
{
_state = State.Open;
}
}
throw;
}
}
public string CurrentState => _state.ToString();
}
Work Stealing
Hill Climbing
IOCP
Production Tuning
Реалізуйте повноцінну Work Stealing Queue:
Вимоги:
LocalPush(T item) — додати задачу (тільки власник)LocalPop(out T item) — забрати задачу LIFO (тільки власник)TrySteal(out T item) — вкрасти задачу FIFO (інші потоки)Тест:
Розширте Custom ThreadPool додавши спрощений Hill Climbing:
Вимоги:
Тест:
Створіть високопродуктивний echo server через IOCP:
Вимоги:
Тест:
Benchmark цілі:
Документація:
Статті:
Книги:
ThreadPool — Пул Потоків для Ефективного Виконання
Глибокий академічний розбір ThreadPool у .NET — архітектура, Hill Climbing Algorithm, worker threads vs IOCP threads, ExecutionContext, проблеми thread starvation та best practices використання пулу потоків.
Concurrent та Immutable Collections
Thread-safe колекції в .NET — ConcurrentDictionary з striped locking, lock-free ConcurrentQueue/Stack/Bag, BlockingCollection для producer-consumer, Immutable Collections та persistent data structures. Детальний розбір архітектури, benchmarks та практичні сценарії.