System Programming Windows

Синхронізація — Mutex, Semaphore та Event-Based Primitives

Kernel-level та advanced синхронізаційні примітиви .NET — Named Mutex для cross-process ізоляції, SemaphoreSlim для обмеження паралелізму, AutoResetEvent та ManualResetEvent для сигналізації між потоками. Теорія, аналогії та детальний розбір кожного примітива.

Синхронізація: Kernel-Level та Advanced Primitives

Ієрархія Синхронізаційних Примітивів

Тема 06 охопила lock та Monitor — основу взаємного виключення всередині одного процесу. Але реальний світ складніший: іноді потрібно обмежити кількість паралельних операцій (не до одного, а до N), іноді — синхронізувати потоки з різних процесів, іноді — реалізувати pattern "один сигналізує — всі прокидаються". Для кожного з цих сценаріїв .NET надає спеціалізований інструмент.

Перш ніж розглядати кожен примітив, важливо зрозуміти принципову різницю між User-mode та Kernel-mode синхронізацією:

User-mode примітиви (lock, Monitor, SpinLock, SemaphoreSlim, ManualResetEventSlim) — реалізовані повністю в просторі користувача. При відсутності конкуренції — це просто операції над пам'яттю (атомарні Interlocked + spinwait). При наявності конкуренції — переходять у ядро через WaitHandle. Дуже швидкі при відсутності колізій (~10-30ns).

Kernel-mode примітиви (Mutex, Semaphore (base), EventWaitHandle) — кожне захоплення і звільнення = системний виклик в ядро ОС. ~200-1000ns overhead навіть без конкуренції. Але: видимі всій системі, підтримують cross-process сценарії, мають власника (Thread Ownership для Mutex).


Mutex: Cross-Process Синхронізація

Що Таке Mutex і Коли Він Потрібен

Теорія та Аналогія: Уявіть, що Mutex — це ключ від спільної кімнати (наприклад, переговорки) у великому офісному центрі. Звичайний lock діє лише в межах вашого кабінету (одного процесу). Але якщо ви хочете синхронізувати доступ між людьми з різних компаній (різних процесів на рівні ОС), вам потрібен Mutex.

Якщо хтось взяв ключ, він стає його власником. Ніхто інший не може повернути цей ключ охоронцю, крім того, хто його взяв. Якщо людина з ключем зникла (процес "впав"), охоронець (ОС) розуміє це і видає спеціальне попередження наступному в черзі, що кімната могла лишитись у "безладі".

System.Threading.Mutexkernel-level примітив взаємного виключення, який працює через системні виклики ОС. На відміну від lock/Monitor, Mutex:

  • Є іменованим — видимий на рівні ОС, що дозволяє синхронізацію між різними процесами.
  • Має Thread Ownership — тільки той потік (і процес), який захопив Mutex, має право його звільнити. Якщо інший потік спробує викликати ReleaseMutex(), виникне помилка.
  • Кидає AbandonedMutexException — механізм безпеки. Якщо процес "впав" (crashed) або потік був примусово завершений, не звільнивши Mutex, ОС автоматично звільняє його і кидає цей exception наступному потоку у черзі (в WaitOne()) як попередження про екстрену ситуацію.

Сценарії застосування:

  1. Single-Instance Application: Гарантія, що на комп'ютері користувача запущено лише один екземпляр вашого застосунку (наприклад, медіаплеєр або інсталятор).
  2. Синхронізація доступу до системних ресурсів: Наприклад, коли кілька різних скриптів/процесів на одній машині намагаються одночасно писати у спільний hardware порт або використовувати спільний memory-mapped файл.

Локальний та Іменований Mutex

MutexTypes.cs
using System.Threading;

// ─── Локальний Mutex (in-process, без імені) ───────────────
// Аналог lock — але з Thread Ownership та AbandonedMutex detection
using var localMutex = new Mutex(
    initiallyOwned: false  // не захоплювати одразу
);

localMutex.WaitOne();  // захоплення (блокує якщо зайнятий)
try
{
    // критична секція
}
finally
{
    localMutex.ReleaseMutex();  // ОБОВ'ЯЗКОВО: тільки власник може звільнити
}

// ─── Іменований Mutex (Named Mutex — System-wide) ─────────
// Ім'я: "Global\MyApp" — видимий всій машині (Global namespace)
// Ім'я: "Local\MyApp" — тільки поточна user session
const string MutexName = "Global\\MyApp_SingleInstance_v2";

using var namedMutex = new Mutex(
    initiallyOwned: false,
    name: MutexName,
    out bool createdNew  // true якщо mutex НЕ існував до нас
);

if (!createdNew)
{
    Console.WriteLine("Вже запущено інший екземпляр!");
    Environment.Exit(1);
}

Console.WriteLine("Ми перші! Продовжуємо роботу.");
// ... головний код застосунку ...
// Mutex автоматично звільняється при dispose або завершенні процесу
Демонстрація Named Mutex
$ dotnet run # Перший екземпляр
Ми перші! Продовжуємо роботу.
Застосунок працює...
$ dotnet run # Спроба запустити другий екземпляр
Вже запущено інший екземпляр!
Process exited with code 1

Single-Instance Application — Повний Патерн

SingleInstanceApp.cs
using System;
using System.Threading;

static class SingleInstanceGuard
{
    private static Mutex? _mutex;
    private const string MutexId = "Global\\{F5E7D9B2-1234-5678-ABCD-MyApplicationId}";
    // GUID у імені: мінімізує collision з іншими програмами

    /// <summary>
    /// Перевірити чи вже є запущений екземпляр.
    /// Повертає true якщо ми ПЕРШІ → можна продовжити.
    /// </summary>
    public static bool TryAcquire()
    {
        bool createdNew;

        try
        {
            _mutex = new Mutex(initiallyOwned: true, name: MutexId, out createdNew);
        }
        catch (UnauthorizedAccessException)
        {
            // Mutex існує але у іншому security context (наприклад, запущений як Admin)
            Console.Error.WriteLine("Не вдається перевірити single-instance (UnauthorizedAccess)");
            return false;
        }

        if (!createdNew)
        {
            // Mutex вже існує → є інший процес
            _mutex.Dispose();
            _mutex = null;
            return false;
        }

        return true;  // ми – перший екземпляр
    }

    public static void Release()
    {
        if (_mutex is not null)
        {
            try { _mutex.ReleaseMutex(); }
            catch (ApplicationException) { }  // якщо вже не власник
            _mutex.Dispose();
            _mutex = null;
        }
    }
}

// Використання у Program.cs:
if (!SingleInstanceGuard.TryAcquire())
{
    Console.WriteLine("Застосунок вже запущено. Завершення.");
    // WPF/WinForms: активувати вікно існуючого екземпляра через Win32 API
    return 1;
}

try
{
    RunApplication();  // основна логіка
}
finally
{
    SingleInstanceGuard.Release();
}
Практичне застосування Named Mutex:
  • Desktop застосунки (WPF, WinForms) — запобігання множинним екземплярам
  • Background сервіси — гарантія що працює тільки одна копія
  • Інсталятори — блокування повторного запуску під час встановлення
  • Cross-process координація — синхронізація між різними процесами

AbandonedMutexException: Захист від Краш-Завершення

AbandonedMutex.cs
using System;
using System.Threading;

var mutex = new Mutex(false, "TestMutex");

// Симулюємо потік що захоплює mutex і "крашиться"
var crashingThread = new Thread(() =>
{
    mutex.WaitOne();
    Console.WriteLine("Потік захопив mutex і завис/крашнувся!");
    // ← НЕ викликає ReleaseMutex() → mutex "покинутий"
});
crashingThread.Start();
crashingThread.Join();

// ОС позначила mutex як "abandoned" після завершення потоку без ReleaseMutex
Console.WriteLine("Намагаємось захопити abandoned mutex...");

try
{
    mutex.WaitOne();
    // НЕ виконається без catch — кидає AbandonedMutexException
    Console.WriteLine("Захопили mutex (після catch)");
}
catch (AbandonedMutexException ex)
{
    // КРИТИЧНО: AbandonedMutexException означає що критична секція попереднього
    // власника завершилась некоректно — дані можуть бути в неузгодженому стані!
    Console.WriteLine($"Mutex був покинутий: {ex.Message}");
    Console.WriteLine("Увага: стан захищеного ресурсу може бути некоректним!");
    // Mutex все рівно захоплено після catch — можна виконати cleanup
    mutex.ReleaseMutex();
}
AbandonedMutexException — не просто "технічна помилка". Вона означає що попередній власник завершив роботу, не виконавши cleanup у критичній секції. Дані, захищені цим Mutex, МОЖУТЬ бути у некоректному стані. Логуйте такі ситуації і виконуйте перевірку/відновлення стану.

SemaphoreSlim: Обмеження Паралельних Операцій

Концепція та Аналогії

Теорія та Аналогія: Якщо lock або Mutex — це "кімната для одного", то Semaphore — це вишибала на вході у нічний клуб, де діє суворе обмеження: не більше N людей всередині. Або ж — парковка на визначену кількість місць.

Семафор має лічильник вільних місць (наприклад, 3 слоти). Коли потік викликає Wait() ("заїжджає на парковку"), лічильник зменшується. Якщо лічильник стає рівним 0 (всі слоти зайняті), наступний потік, що викликає Wait(), блокується і чекає перед "шлагбаумом". Коли один з потоків завершує роботу і викликає Release() ("виїжджає"), лічильник збільшується, і семафор пускає рівно одного потоку з тих, що чекають у черзі.

Важливий нюанс: Семафор не має Thread Ownership, на відміну від Mutex. Один потік може викликати Wait(), а зовсім інший потік (наприклад, колбек) — викликати Release(). Це робить його ідеальним для асинхронних операцій з async/await.

Сценарії використання:

  1. Обмеження навантаження (Throttling / Контроль Конкурентності): Наприклад, ви хочете завантажити 1000 зображень з мережі, але не більше 5 одночасно, щоб не лягла мережа чи не заблокувало API.
  2. Пул ресурсів (Connection Pattern / Object Pool): Бази даних мають ліміт підключень (Connection Pool). Семафор гарантує, що застосунок не спробує відкрити 150 з'єднань, коли сервер тримає лише 100. Зайві запити спокійно зачекають на звільнення у черзі.

SemaphoreSlim — сучасний user-mode варіант (швидший, пам'ятає про CancellationToken), рекомендований для багатопоточності всередині одного процесу. Semaphore (базовий клас) — kernel object з підтримкою синхронізації між процесами (передається як system-wide object).

Основний API

SemaphoreSlimBasics.cs
using System.Threading;
using System.Threading.Tasks;

// SemaphoreSlim(initialCount: скільки слотів доступно зараз,
//               maxCount:     максимальна кількість слотів)
var semaphore = new SemaphoreSlim(initialCount: 3, maxCount: 3);

// Wait() — "займаємо слот" (зменшуємо лічильник)
// Якщо лічильник = 0 → ЖОРСТКО блокуємо поточний потік ОС (Thread)
// УВАГА: Ніколи не викликайте блокуючий Wait() без timeout у головному потоці UI (WPF/WinForms),
// інакше весь інтерфейс програми "зависне" намертво ("Deadlock" ефект).
semaphore.Wait();                            // нескінченне очікування
bool entered = semaphore.Wait(1000);         // безпечно: чекаємо максимум 1 сек
semaphore.Wait(TimeSpan.FromSeconds(5));     // через TimeSpan

// Release() — "звільняємо слот" (збільшуємо лічильник)
// ЗАВЖДИ має викликатися у блоці finally!
semaphore.Release();                 // звільнити 1 слот
semaphore.Release(releaseCount: 2);  // звільнити 2 слоти одразу (якщо ваш потік їх займав)

// Поточний стан
int available = semaphore.CurrentCount;  // скільки слотів вільних зараз

Connection Pool: Класичний Застосунок

ConnectionPool.cs
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Спрощений Connection Pool через SemaphoreSlim.
/// Обмежує кількість одночасних з'єднань — типовий use case.
/// </summary>
public class ConnectionPool<TConnection> where TConnection : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ConcurrentQueue<TConnection> _connections = new();
    private readonly Func<TConnection> _factory;

    public ConnectionPool(Func<TConnection> connectionFactory, int maxConnections)
    {
        _factory = connectionFactory;
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
    }

    /// <summary>
    /// Отримати з'єднання з пулу. Блокує поточний потік ОС, якщо вільних слотів немає.
    /// </summary>
    public PooledConnection<TConnection> Acquire(int timeoutMs = -1)
    {
        // Синхронне блокування потоку. Якщо timeout = -1, чекає вічно.
        if (!_semaphore.Wait(timeoutMs))
        {
            throw new TimeoutException("Не вдалося отримати з'єднання з пулу (timeout).");
        }

        TConnection connection;
        if (!_connections.TryDequeue(out connection!))
        {
            // Пул не має готових з'єднань → створюємо нове синхронно
            connection = _factory();
        }

        return new PooledConnection<TConnection>(connection, Return);
    }

    private void Return(TConnection connection)
    {
        _connections.Enqueue(connection);  // повертаємо у пул
        _semaphore.Release();              // звільняємо слот для наступного потоку
    }

    public int Available => _semaphore.CurrentCount;
}

/// <summary>
/// RAII wrapper: при виході з блоку using автоматично повертає з'єднання у пул.
/// </summary>
public sealed class PooledConnection<T> : IDisposable where T : IDisposable
{
    public T Connection { get; }
    private readonly Action<T> _returnAction;
    private bool _returned;

    public PooledConnection(T connection, Action<T> returnAction)
    {
        Connection = connection;
        _returnAction = returnAction;
    }

    public void Dispose()
    {
        if (!_returned)
        {
            _returnAction(Connection);
            _returned = true;
        }
    }
}

Rate Limiter: Ще Один Use Case

RateLimiter.cs
/// <summary>
/// Simple rate limiter: максимум N запитів одночасно.
/// Для token-bucket або sliding window — використовуйте Microsoft.AspNetCore.RateLimiting.
/// </summary>
public class ConcurrencyLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private long _totalAcquired = 0;
    private long _totalRejected = 0;

    public ConcurrencyLimiter(int maxConcurrent)
    {
        _semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent);
    }

    /// <summary>Виконати дію якщо є вільний слот, інакше відхилити.</summary>
    public bool TryExecute(Action action)
    {
        // Не чекаємо (timeout 0) — якщо слотів немає, ідемо геть одразу
        bool acquired = _semaphore.Wait(0);
        if (!acquired)
        {
            Interlocked.Increment(ref _totalRejected);
            return false;
        }

        Interlocked.Increment(ref _totalAcquired);
        try
        {
            action();
            return true;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public long TotalAcquired => Interlocked.Read(ref _totalAcquired);
    public long TotalRejected => Interlocked.Read(ref _totalRejected);
}

AutoResetEvent та ManualResetEventSlim

Концептуальна Різниця та Аналогії

Event-based примітиви кардинально відрізняються від lock чи Semaphore. Вони не захищають блоки коду чи ресурси (не є lock'ами), а реалізують сигналізацію між потоками: "дочекайся, поки щось відбудеться". Це засіб зв'язку: один потік повідомляє Set() ("подія сталась!"), а інші стоять у Wait() ("чекаємо на подію").

Ключова відмінність між ними полягає у тому, скільки потоків "пройдуть" після одного виклику сигналу:

AutoResetEventАналогія: "Метро-турнікет" Хтось оплачує проїзд (викликає Set()). Турнікет розблоковується і пропускає рівно одну людину з черги (один WaitOne() успішно проходить). Одразу після цього турнікет автоматично блокується знову (стан повертається у false). Наступний потік знову чекатиме на нову "оплату".

  • Сценарії використання: Патерн Producer-Consumer, де одне завдання (пакунок даних) може бути оброблене лише одним конкретним обробником-потоком, після чого потік має знову заснути. Або для ексклюзивного пробудження фонового воркера.

ManualResetEventSlimАналогія: "Ворота на стадіон" або "Шлагбаум" Ви відкриваєте ворота (Set()). Усі "вболівальники", що стояли у черзі, і всі, хто ще підбігають, разом проходять через них без затримок. Ворота залишаються відкритими постійно (стан true), доки охоронець вручну їх не закриє, викликавши Reset().

  • Сценарії використання: Розсилка повідомлень (Broadcast) кільком потокам одразу. Наприклад: сигнал координатора "система повністю ініціалізована, всі можете починати", сигнал "глобальний kill switch: додаток зупиняється" для фонових процесів, або коли стартовий постріл дає дозвіл усім потокам-гонщикам стартувати одночасно.
ResetEventsBasics.cs
using System.Threading;

// ─── AutoResetEvent ────────────────────────────────────────────
// initialState: true = сигнал вже встановлений (перший WaitOne пройде одразу)
using var autoEvent = new AutoResetEvent(initialState: false);

// Producer:
autoEvent.Set();   // "видаємо квиток" — один потік пройде
autoEvent.Set();   // ще один квиток

// Consumer:
autoEvent.WaitOne();         // бере квиток, state → false
bool got = autoEvent.WaitOne(TimeSpan.FromSeconds(2));  // з timeout
// got = true: сигнал отриманий у межах timeout
// got = false: timeout → ніхто не видав сигнал

// ─── ManualResetEventSlim ──────────────────────────────────────
// Slim: user-mode, швидше, підтримує CancellationToken, async-ready
using var manualEvent = new ManualResetEventSlim(initialState: false);

// Thread A (signal):
manualEvent.Set();   // відкриваємо ворота → ВСІ що чекали або будуть чекати — проходять

// Thread B, C, D (all waiting simultaneously):
manualEvent.Wait();                             // чекати поки Set() не буде викликано
manualEvent.Wait(CancellationToken.None);       // з CancellationToken
manualEvent.Wait(millisecondsTimeout: 5000);   // з timeout
bool ready = manualEvent.IsSet;                 // перевірка без блокування

// Закрити ворота знову:
manualEvent.Reset();  // тепер нові Wait() знову блокуватимуться

// Класичний kernel-mode варіант (підтримує Named + cross-process):
using var kernelEvent = new ManualResetEvent(initialState: false);
// WaitHandle.WaitAll / WaitAny — можна чекати кілька одразу
bool any = WaitHandle.WaitAny(new WaitHandle[] { autoEvent, kernelEvent }, 5000) != WaitHandle.WaitTimeout;

Producer-Consumer з AutoResetEvent

ProducerConsumer_ARE.cs
using System;
using System.Collections.Generic;
using System.Threading;

public class DataPipeline<T>
{
    private readonly Queue<T> _queue = new();
    private readonly object _queueLock = new();
    private readonly AutoResetEvent _dataAvailable = new(initialState: false);
    private volatile bool _completed = false;

    public void Produce(T item)
    {
        lock (_queueLock) { _queue.Enqueue(item); }
        _dataAvailable.Set();  // Сигнал: "є нові дані для обробки"
    }

    public void CompleteAdding()
    {
        _completed = true;
        _dataAvailable.Set();  // Розбудити Consumer щоб він завершив
    }

    public IEnumerable<T> Consume(int maxWaitMs = 5000)
    {
        while (true)
        {
            // Фізично блокуємо потік. Якщо ніхто нічого не послав за 5 сек — виходимо
            bool signaled = _dataAvailable.WaitOne(maxWaitMs);
            if (!signaled) break;

            while (true)
            {
                T item;
                lock (_queueLock)
                {
                    if (_queue.Count == 0) break;
                    item = _queue.Dequeue();
                }
                yield return item;
            }

            if (_completed && IsQueueEmpty()) yield break;
        }
    }

    private bool IsQueueEmpty() { lock (_queueLock) { return _queue.Count == 0; } }
}

// Демонстрація — один Producer, один Consumer на чистих потоках:
var pipeline = new DataPipeline<string>();

// Фоновий потік Consumer
var consumerThread = new Thread(() =>
{
    foreach (var item in pipeline.Consume())
        Console.WriteLine($"[Consumer] обробив: {item}");
});
consumerThread.Start();

// Головний потік (Producer) генерує 10 елементів
for (int i = 1; i <= 10; i++)
{
    pipeline.Produce($"item-{i:D2}");
    Thread.Sleep(200);
}
pipeline.CompleteAdding();

consumerThread.Join();  // Чекаємо завершення роботи фонового потоку
Producer-Consumer з AutoResetEvent
$ dotnet run
[Consumer] обробив: item-01
[Consumer] обробив: item-02
[Consumer] обробив: item-03
[Consumer] обробив: item-04
[Consumer] обробив: item-05
[Consumer] обробив: item-06
[Consumer] обробив: item-07
[Consumer] обробив: item-08
[Consumer] обробив: item-09
[Consumer] обробив: item-10
Pipeline завершено

ManualResetEventSlim: Broadcast Pattern

BroadcastPattern.cs
using System.Threading;
using System.Threading.Tasks;

// Сценарій: кілька потоків чекають поки система "готова до роботи"
// Після готовності — всі запускаються одночасно

class SystemStartup
{
    private readonly ManualResetEventSlim _readyGate = new(initialState: false);
    private volatile string? _connectionString;

    public void Initialize()
    {
        Console.WriteLine("Ініціалізуємо систему...");
        Thread.Sleep(2000);  // симуляція довгої роботи (підняття БД, зчитування config)
        _connectionString = "Server=db;Database=mydb";
        Console.WriteLine("Система готова!");

        // Відкриваємо ворота: ВСІ потоки, що висять у _readyGate.Wait(), розблокуються ОДНОЧАСНО
        _readyGate.Set();
    }

    public void HandleRequest(int requestId)
    {
        // Чекаємо поки система не готова. Потік фізично блокується (спить).
        _readyGate.Wait();

        // Як тільки ворота відкрито — всі потоки пішли працювати
        Console.WriteLine($"[Request {requestId}] Виконуємо запит через {_connectionString}");
        Thread.Sleep(50);  // симуляція роботи
    }
}

var startup = new SystemStartup();

// Створюємо 10 потоків-запитів, які стартують ДО ініціалізації
var workers = Enumerable.Range(1, 10).Select(i => new Thread(() =>
{
    startup.HandleRequest(i);
})).ToList();

workers.ForEach(t => t.Start()); // всі потоки стартують і одразу зависають на Gate.Wait()

// Головний потік ініціалізує систему (триває 2 секунди)
startup.Initialize();

// Чекаємо поки всі потоки-запити відпрацюють
workers.ForEach(t => t.Join());
Console.WriteLine("Всі запити виконані");
ManualResetEventSlim — Broadcast Pattern
$ dotnet run
Ініціалізуємо систему...
[10 запитів чекають на _readyGate.Wait()...]
Система готова!
[Request 1] Виконуємо з Server=db;Database=mydb
[Request 3] Виконуємо з Server=db;Database=mydb
[Request 2] Виконуємо з Server=db;Database=mydb
[Request 5] Виконуємо з Server=db;Database=mydb
[Request 4] Виконуємо з Server=db;Database=mydb
... всі 10 запитів виконуються паралельно ...
Всі запити виконані

CountdownEvent: Чекати Завершення N Операцій

Концепція, Аналогії та Сценарії

Теорія та Аналогія: Уявіть велику сім'ю, яка пакує речі у відпустку. Водій заводить машину, сідає за кермо і каже: "Ми не поїдемо, доки всі 4 пасажири не складуть свої валізи". Кількість очікуваних подій відома заздалегідь (initialCount = 4). Водій блокується і просто чекає (Wait()). Тим часом, кожен пасажир незалежно від інших пакує речі, і коли закінчує, кричить: "Я готовий!" (викликає Signal(), що зменшує лічильник). Водій рушить з місця виключно тоді, коли пролунають усі 4 "сигнали" і внутрішній лічильник впаде до нуля.

CountdownEvent — ідеальний примітив для сценарію "почекати поки N незалежних задач завершаться". За логікою це зворотно пропорційно до Barrier та Semaphore:

  • У Semaphore один координатор обмежує максимальну кількість активних виконавців до N.
  • У Barrier N активних потоків-учасників чекають один одного.
  • У CountdownEvent один пасивний потік-координатор чекає, поки N виконавців закінчать роботу.

Сценарії використання:

  1. Fork/Join (Розгалуження та Злиття): Поділ великого завдання на незалежні фрагменти, де кожен фрагмент рахується своїм потоком чи Task. Координатор чекає завершення всіх суб-завдань, аби потім злити (агрегувати) всі їхні результати в один.
  2. Паралельна ініціалізація підсистем: Застосунок під час запуску піднімає з'єднання з БД, завантажує конфігурацію, та прогріває кеш у кількох паралельних потоках. Main-потік (startup) не дає застосунку вважатись "готовим", поки всі підсистеми незалежно не відзвітують про готовність через Signal().
CountdownEventBasics.cs
using System.Threading;
using System.Threading.Tasks;

// Ініціалізуємо з початковим значенням = кількість задач
var countdown = new CountdownEvent(initialCount: 5);

// 5 "воркерів" — кожен виконує роботу і сигналізує
var workers = Enumerable.Range(1, 5).Select(i => new Thread(() =>
{
    Console.WriteLine($"[Worker {i}] починає роботу");
    Thread.Sleep(Random.Shared.Next(200, 1000));  // різна тривалість
    Console.WriteLine($"[Worker {i}] завершив");

    countdown.Signal();  // ← декрементує лічильник
    // Коли лічильник досягне 0 → Wait() розблокується
})).ToList();

workers.ForEach(t => t.Start()); // Запуск всіх ОС потоків

// Координатор чекає всіх
Console.WriteLine("[Coordinator] Головний потік блокується, чекає всіх воркерів...");
countdown.Wait();  // фізично блокується поки Count > 0
Console.WriteLine("[Coordinator] Всі завершили! Продовжуємо.");

workers.ForEach(t => t.Join()); // Підчищаємо потоки
CountdownEvent — Координація N Задач
$ dotnet run
[Coordinator] Чекаємо всіх воркерів...
[Worker 1] починає роботу
[Worker 2] починає роботу
[Worker 3] починає роботу
[Worker 4] починає роботу
[Worker 5] починає роботу
[Worker 2] завершив (253ms)
[Worker 4] завершив (412ms)
[Worker 1] завершив (687ms)
[Worker 5] завершив (821ms)
[Worker 3] завершив (945ms)
[Coordinator] Всі завершили! Продовжуємо.
CountdownEventBasics.cs

// ─── Розширені операції ──────────────────────────────────────

// AddCount: збільшити лічильник (якщо потрібно додати задачі динамічно)
var dynamic = new CountdownEvent(1);  // починаємо з 1 (не 0! — щоб не розблокуватись одразу)

for (int i = 0; i < 5; i++)
{
    dynamic.AddCount();  // додаємо задачу

    int taskId = i;
    ThreadPool.QueueUserWorkItem(_ =>
    {
        DoWork(taskId);
        dynamic.Signal();  // завершили
    });
}

dynamic.Signal();  // знімаємо початкову 1, щоб дозволити розблокування
dynamic.Wait();

// TryAddCount: безпечний AddCount (повертає false якщо вже 0)
bool added = dynamic.TryAddCount();

Паралельна Ініціалізація Сервісів

ServiceInit.cs
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Ініціалізація кількох незалежних сервісів паралельно.
/// Heads up: всі мають завершитись до початку роботи основного коду.
/// </summary>
public class ServiceInitializer
{
    private readonly CountdownEvent _initGate;
    private readonly List<Exception> _errors = new();
    private readonly object _errorLock = new();

    private record ServiceConfig(string Name, Action Initialize);

    private readonly ServiceConfig[] _services;

    public ServiceInitializer()
    {
        _services = [
            new("Database",   () => Thread.Sleep(800)),
            new("Cache",      () => Thread.Sleep(300)),
            new("MessageBus", () => Thread.Sleep(600)),
            new("Config",     () => Thread.Sleep(150)),
        ];

        _initGate = new CountdownEvent(_services.Length);
    }

    public void InitializeAll()
    {
        var sw = System.Diagnostics.Stopwatch.StartNew();

        Console.WriteLine($"Запускаємо ініціалізацію {_services.Length} сервісів паралельно...");

        // Розкидаємо задачі в ThreadPool замість створення важких ОС Thread
        foreach (var svc in _services)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    Console.WriteLine($"  [{svc.Name}] Ініціалізуємо...");
                    svc.Initialize();  // блокуюча робота
                    Console.WriteLine($"  [{svc.Name}] ✅ Готово");
                }
                catch (Exception ex)
                {
                    lock (_errorLock) { _errors.Add(ex); }
                    Console.Error.WriteLine($"  [{svc.Name}] ❌ {ex.Message}");
                }
                finally
                {
                    _initGate.Signal();  // ОБОВ'ЯЗКОВО сигналізуємо незалежно від результату
                }
            });
        }

        // Чекаємо всіх (головний потік-координатор засинає)
        bool success = _initGate.Wait(TimeSpan.FromSeconds(5)); // Таймаут для безпеки
        sw.Stop();

        if (!success)
        {
            throw new TimeoutException("Один або декілька сервісів зависли і не подали Signal().");
        }

        if (_errors.Count > 0)
        {
            Console.WriteLine($"\n{_errors.Count} сервіси не ініціалізовані за {sw.ElapsedMilliseconds}ms");
            throw new AggregateException("Помилки ініціалізації", _errors);
        }

        Console.WriteLine($"\n✅ Всі сервіси ініціалізовані за {sw.ElapsedMilliseconds}ms");
    }
}
Паралельна Ініціалізація Сервісів
$ dotnet run
Запускаємо ініціалізацію 4 сервісів паралельно...
[Database] Ініціалізуємо...
[Cache] Ініціалізуємо...
[MessageBus] Ініціалізуємо...
[Config] Ініціалізуємо...
[Config] ✅ Готово
[Cache] ✅ Готово
[MessageBus] ✅ Готово
[Database] ✅ Готово
✅ Всі сервіси ініціалізовані за 823ms

Barrier: Фазова Синхронізація

Що Таке Фазова Синхронізація та Аналогії

Теорія та Аналогія: Уявіть групу туристів (група з N потоків), що йде складним, але цікавим гірським маршрутом. Туристи йдуть з різним темпом, однак на маршруті є ключові чекпоінти-оглядові майданчики (Barrier). Працює залізне правило: ніхто не лишає поточний чекпоінт і не йде далі, доки на цей чекпоінт не прийде останній учасник групи (barrier.SignalAndWait()). Коли приходять всі учасники, гід робить загальне фото (спрацьовує callback postPhaseAction) і вся група синхронно рушає до наступного чекпоінту (наступної фази).

Barrier — це примітив для симетричної комунікації. Усі потоки-учасники виконують схожий алгоритм паралельно, але розділений на логічні кроки (фази). Він гарантує, що жоден з потоків не вирветься вперед (у фазу K+1), доки хоча б один його "колега" ще працює над фазою K.

Сценарії використання:

  1. Ітеративні паралельні обчислення: Комп'ютерні симуляції або матричні обчислення (наприклад, "Гра Життя" Конвея), де стан кожної ділянки поля у наступному поколінні безпосередньо залежить від станів усіх ділянок з попереднього покоління. Ви змушені щоразу чекати розрахунку кроку N для всіх елементів перед переходом до N+1.
  2. Parallel Pipeline (Багатофазовий конвеєр): Команда з N фонових потоків спільно парсить лог-файли. Фаза 1 — читання 10 тисяч рядків з диску (усі читають свої шматки). Фаза 2 — фільтрація у пам'яті. Фаза 3 — запис у зовнішню систему. Щоб оптимізувати пам'ять і не допустити ситуації, коли один занадто швидкий потік вижре всю RAM на першій фазі, потоки дозують роботу та синхронізуються між фазами-кроками.
BarrierBasics.cs
using System;
using System.Threading;

// Barrier для 4 учасників з post-phase callback
using var barrier = new Barrier(
    participantCount: 4,
    postPhaseAction: b =>
    {
        // Викликається ОДНИМ потоком після кожної фази, до старту наступної
        // b.CurrentPhaseNumber: номер щойно завершеної фази (0, 1, 2, ...)
        Console.WriteLine($"\n✅ Фаза {b.CurrentPhaseNumber} завершена всіма {b.ParticipantCount} учасниками. → Наступна фаза {b.CurrentPhaseNumber + 1}");
    }
);

var threads = Enumerable.Range(0, 4).Select(i => new Thread(() =>
{
    Console.WriteLine($"[T{i}] Фаза 0: завантаження даних");
    Thread.Sleep(Random.Shared.Next(100, 500));     // різна тривалість

    barrier.SignalAndWait();  // ← T{i} завершив фазу 0, чекає решту
    // ↑ Всі 4 потоки мають викликати SignalAndWait() перш ніж хоча б один продовжить!

    Console.WriteLine($"[T{i}] Фаза 1: обробка даних");
    Thread.Sleep(Random.Shared.Next(100, 400));

    barrier.SignalAndWait();  // ← Синхронізація після фази 1

    Console.WriteLine($"[T{i}] Фаза 2: запис результатів");
    Thread.Sleep(Random.Shared.Next(50, 200));

    barrier.SignalAndWait();  // ← Фінальна синхронізація

    Console.WriteLine($"[T{i}] Завершив весь pipeline");
}) { Name = $"Worker-{i}" }).ToList();

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
Barrier — Фазова Синхронізація
$ dotnet run
[T0] Фаза 0: завантаження даних
[T1] Фаза 0: завантаження даних
[T2] Фаза 0: завантаження даних
[T3] Фаза 0: завантаження даних
✅ Фаза 0 завершена всіма 4 учасниками. → Наступна фаза 1
[T0] Фаза 1: обробка даних
[T1] Фаза 1: обробка даних
[T2] Фаза 1: обробка даних
[T3] Фаза 1: обробка даних
✅ Фаза 1 завершена всіма 4 учасниками. → Наступна фаза 2
[T0] Фаза 2: запис результатів
[T1] Фаза 2: запис результатів
[T2] Фаза 2: запис результатів
[T3] Фаза 2: запис результатів
✅ Фаза 2 завершена всіма 4 учасниками. → Наступна фаза 3
[T0] Завершив весь pipeline
[T1] Завершив весь pipeline
[T2] Завершив весь pipeline
[T3] Завершив весь pipeline

Динамічне Додавання Учасників

BarrierDynamic.cs
using System.Threading;
using System.Threading.Tasks;

// Бар'єр з можливістю зміни кількості учасників
var barrier = new Barrier(participants: 3);

// Стартові учасники
var workers = Enumerable.Range(0, 3).Select(i => new Thread(() =>
{
    barrier.SignalAndWait();  // Фаза 0
    barrier.SignalAndWait();  // Фаза 1
})).ToList();

workers.ForEach(t => t.Start());

// Пізній учасник: приєднується після старту
Thread.Sleep(50);  // трохи чекаємо

barrier.AddParticipant();  // додати учасника динамічно

var lateWorker = new Thread(() =>
{
    // Учасник може приєднатись до фази 0 якщо вона ще не завершена
    // або одразу до наступної
    barrier.SignalAndWait();
    barrier.SignalAndWait();
    barrier.RemoveParticipant();  // відписуємось після роботи
});
lateWorker.Start();

workers.ForEach(t => t.Join());
lateWorker.Join();

ReaderWriterLockSlim: Оптимізація Для Read-Heavy Доступу

Проблема Яку Вирішують (Теорія та Аналогії)

Аналогія: Уявіть читальну залу бібліотеки з одним унікальним довідником. Звичайний lock змушує відвідувачів заходити в кімнату суворо по одному: прочитав рядок, вийшов, зайшов наступний. Це абсолютно неефективно, оскільки багато людей можуть просто мовчки читати книгу одночасно! Свистати всіх читачів на вихід і вішати "ексклюзивний замок" на двері кімнати слід лише тоді, коли приходить редактор, щоб змінити чи дописати текст олівцем в цю саму книгу (Write Mode). Поки редактор зайнятий, кімната повністю закрита. Коли ж він вийде — двері знову відкриваються навстіж для всіх мовчазних читачів-паралельників.

Класичний lock чи Mutex завжди працюють як "кімната для одного" і не розрізняють читання чи запис. Якщо один потік просто отримує стан змінних, інший потік-читач змушений чекати, створюючи величезне штучне "горлечко пляшки" (bottleneck). Для структур, де читань трапляється в десятки чи сотні разів більше ніж записів, використання звичайного блокування шкодить перформансу.

ReaderWriterLockSlim розв'язує цю проблему, розбиваючи доступ на ролі:

  • Read Lock (EnterReadLock): Безліч потоків-читачів можуть захоплювати його одночасно. Забезпечує високу паралельність. Єдина умова — цей лок не дадуть, якщо зараз хтось записує.
  • Write Lock (EnterWriteLock): Ексклюзивний режим. Дозволений рівно ОДНОМУ потоку, і лиш тоді, коли немає ЖОДНОГО активного читача. Нові читачі і письменники стоятимуть у черзі, чекаючи на звільнення.
  • Upgradeable Read Lock (EnterUpgradeableReadLock): Хитрий перехідний режим для класичного патерну "Отримай, а якщо немає — то Створи і Запиши" (наприклад, метод GetOrAdd у кеші). Цей режим "дихає" разом з іншими читачами, але єдиний з потоків має залізне право ексклюзивно "ескалюватися" до Write Lock без ризику deadlock-ів.

Важливо: Не ліпіть ReaderWriterLockSlim всюди замість lock! Менеджмент лічильників читачів/письменників має власний невеликий overhead. Його використання цілком виправдане тільки для Read-Heavy сценаріїв, наприклад, In-Memory Кеш, Глобальна конфігурація, Dictionaries довідників валют — там, де дані оновлюються, умовно, раз на годину (запис), а зчитуються тисячами запитів на секунду.

RWLockBasics.cs
using System.Threading;

var rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
// LockRecursionPolicy.NoRecursion: безпечніший варіант (кидає при рекурсії)
// LockRecursionPolicy.SupportsRecursion: дозволяє рекурсивне захоплення (повільніше)

// ─── Read Lock: кілька потоків одночасно ────────────────────
rwLock.EnterReadLock();
try
{
    // Читаємо дані — паралельно з іншими читачами ✓
    var value = SharedDictionary["key"];
}
finally
{
    rwLock.ExitReadLock();
}

// ─── Write Lock: виключний доступ ───────────────────────────
rwLock.EnterWriteLock();
try
{
    // Пишемо — нуль читачів/письменників |
    SharedDictionary["key"] = "new_value";
}
finally
{
    rwLock.ExitWriteLock();
}

// ─── Upgradeable Read Lock ────────────────────────────────────
// Читаємо, і якщо потрібно — ескалуємо до Write без звільнення lock
rwLock.EnterUpgradeableReadLock();
try
{
    if (!SharedDictionary.ContainsKey("key"))  // читаємо умову
    {
        rwLock.EnterWriteLock();   // ескалація: якщо потрібно писати
        try
        {
            SharedDictionary["key"] = ComputeValue();
        }
        finally { rwLock.ExitWriteLock(); }
    }
}
finally
{
    rwLock.ExitUpgradeableReadLock();
}

Thread-Safe Cache з ReaderWriterLockSlim

ThreadSafeCache.cs
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Thread-safe in-memory кеш з TTL (Time-To-Live).
/// Оптимізований для read-heavy сценарію (багато читань, рідкий запис).
/// </summary>
public class ThreadSafeCache<TKey, TValue> : IDisposable where TKey : notnull
{
    private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
    private readonly Dictionary<TKey, CacheEntry> _store = new();
    private readonly TimeSpan _defaultTtl;

    private record CacheEntry(TValue Value, DateTime ExpiresAt);

    public ThreadSafeCache(TimeSpan defaultTtl)
    {
        _defaultTtl = defaultTtl;
    }

    public bool TryGet(TKey key, out TValue? value)
    {
        _lock.EnterReadLock();
        try
        {
            if (_store.TryGetValue(key, out var entry) && entry.ExpiresAt > DateTime.UtcNow)
            {
                value = entry.Value;
                return true;
            }
            value = default;
            return false;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    public void Set(TKey key, TValue value, TimeSpan? ttl = null)
    {
        var expires = DateTime.UtcNow + (ttl ?? _defaultTtl);

        _lock.EnterWriteLock();
        try
        {
            _store[key] = new CacheEntry(value, expires);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// GetOrAdd: якщо є — повернути, якщо ні — обчислити та зберегти.
    /// Спочатку ReadLock, при потребі — ескалація до WriteLock.
    /// </summary>
    public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory, TimeSpan? ttl = null)
    {
        // Спроба 1: швидке читання
        _lock.EnterReadLock();
        try
        {
            if (_store.TryGetValue(key, out var existing) && existing.ExpiresAt > DateTime.UtcNow)
                return existing.Value;
        }
        finally
        {
            _lock.ExitReadLock();
        }

        // Обчислюємо значення ЗА МЕЖАМИ lock (може бути дорога операція)
        var computed = valueFactory(key);
        var expires = DateTime.UtcNow + (ttl ?? _defaultTtl);

        // Записуємо тільки якщо ще нема (хтось інший міг записати між нашими lock-ами)
        _lock.EnterWriteLock();
        try
        {
            if (!_store.TryGetValue(key, out var race) || race.ExpiresAt <= DateTime.UtcNow)
                _store[key] = new CacheEntry(computed, expires);
            else
                computed = race.Value;  // хтось випередив → використовуємо їхнє значення
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        return computed;
    }

    public void Invalidate(TKey key)
    {
        _lock.EnterWriteLock();
        try { _store.Remove(key); }
        finally { _lock.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _store.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }

    public void Dispose() => _lock.Dispose();
}

Підсумок

Mutex

  • Kernel object: cross-process, Named (Global\Name)
  • Thread Ownership: тільки власник може Release
  • AbandonedMutexException — сигнал про crash попереднього власника
  • Single-instance app — найчастіший Use Case

SemaphoreSlim

  • Лічильниковий: N слотів одночасно (N>1, на відміну від lock)
  • Блокує потоки через Wait() (вчить контролювати Deadlock на практиці)
  • Connection Pool, Rate Limiter, API throttling
  • Slim = user-mode (fast), base = kernel (cross-process)

ARE / MRE(Slim)

  • ARE: турнікет — один Set() = один потік проходить
  • MRE: ворота — один Set() = ВСІ чекаючі проходять
  • ManualResetEventSlim: підтримує CancellationToken
  • Broadcast pattern для "система готова до роботи"

CountdownEvent / Barrier

  • CountdownEvent: N→0 через Signal(), Wait() блокується
  • Barrier: група потоків синхронізується між фазами
  • Post-phase action у Barrier: один потік виконує між фазами
  • Parallel data pipeline — головний use case Barrier

Практичні Завдання

Рівень 1: Thread-Safe LRU Cache

Реалізуйте LRUCache<TKey,TValue> на основі ReaderWriterLockSlim:

  1. Конструктор: LRUCache(int capacity) — максимальна кількість елементів
  2. TryGet(TKey k, out TValue v) → true/false + переміщення в кінець LRU черги
  3. Put(TKey k, TValue v) → якщо перевищує capacity — видалити найстаріший
  4. Тест: 4 читачі та 1 письменник паралельно × 10 секунд — без помилок
Підказка: Використовуйте LinkedList<T> для LRU черги + Dictionary<TKey, LinkedListNode<T>> для O(1) lookup. При TryGet — переміщуйте node в кінець списку під EnterUpgradeableReadLock.

Рівень 2: Parallel Pipeline з Barrier

Реалізуйте паралельний pipeline обробки даних:

  1. Генеруйте масив 1000 рядків тексту
  2. Фаза 1 (4 потоки): Parse рядки у структури
  3. Barrier синхронізація з console log яка фаза завершена
  4. Фаза 2 (4 потоки): Filter (залишити тільки довші 30 символів)
  5. Barrier синхронізація
  6. Фаза 3 (1 потік via callback): Sort результатів і Write у файл
Підказка: Використовуйте Barrier.postPhaseAction для логування між фазами. Розділіть масив на 4 частини через Partitioner.Create() або вручну через індекси.

Рівень 3: Worker Coordination

Реалізуйте систему "Task Farm":

  1. CountdownEvent відстежує N в-польоті задач
  2. SemaphoreSlim(maxWorkers) обмежує паралелізм
  3. ManualResetEventSlim — gate що блокує подачу нових задач при throttle
  4. Main thread чекає CountdownEvent.Wait() після подачі всіх задач
  5. Тест: Submit 100 задач з maxWorkers=5 → переконатись що max 5 виконується одночасно
Підказка: Структура: _semaphore.Wait()_countdown.AddCount()new Thread(() => { try { work(); } finally { _countdown.Signal(); _semaphore.Release(); } }).Start(). Використовуйте Interlocked для підрахунку max concurrent.