У попередніх темах ми вивчили синхронізацію для багатопоточного коду: lock, Monitor, Mutex, Semaphore. Всі ці примітиви розроблені для синхронного коду — вони блокують потік до отримання доступу до ресурсу.
Проте в асинхронному світі блокування потоків — це антипатерн. Розглянемо три типові проблеми:
Проблема перша: lock не працює з await. Спроба використати lock всередині async методу з await призводить до compiler error:
private readonly object _lock = new();
async Task ProcessAsync()
{
lock (_lock) // ✅ OK
{
await Task.Delay(100); // ❌ CS1996: Cannot await in the body of a lock statement
}
}
Чому заборонено? lock — це thread-affine примітив: він прив'язаний до конкретного потоку. Після await виконання може продовжитись на іншому потоці, що порушує семантику lock (один потік захопив lock, інший звільнив — undefined behavior).
Проблема друга: Throttling async операцій. API дозволяє максимум 10 одночасних HTTP запитів. Як обмежити кількість паралельних async операцій без блокування потоків?
Проблема третя: Timeout для async операцій. Як встановити таймаут для await без блокування потоку на чеканні?
Ця тема надає рішення для всіх трьох проблем: SemaphoreSlim.WaitAsync() для async mutual exclusion, throttling patterns, timeout strategies, та огляд бібліотеки Nito.AsyncEx з готовими async примітивами.
Щоб зрозуміти проблему, розглянемо що компілятор генерує з lock:
// Вихідний код
lock (_lock)
{
// Critical section
}
// Компілятор генерує (спрощено)
bool lockTaken = false;
try
{
Monitor.Enter(_lock, ref lockTaken);
// Critical section
}
finally
{
if (lockTaken)
Monitor.Exit(_lock);
}
Ключовий момент: Monitor.Enter() запам'ятовує ID потоку, що захопив lock. Monitor.Exit() перевіряє, що звільнення відбувається на тому ж потоці. Це thread-affinity.
async Task ProcessAsync()
{
lock (_lock)
{
Console.WriteLine($"Before await: Thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(100); // Suspension point
Console.WriteLine($"After await: Thread {Thread.CurrentThread.ManagedThreadId}");
// Потоки можуть бути різними!
}
}
Проблема: Після await continuation може виконатись на іншому потоці ThreadPool. Якщо Thread 5 захопив lock, а Thread 8 намагається звільнити — Monitor.Exit() викине SynchronizationLockException.
await continuation повертається на той самий потік (через SynchronizationContext), але цей потік вже зайнятий іншою роботою — deadlock. Lock тримається, потік чекає, continuation не виконується.Компілятор C# забороняє await всередині lock саме через ці проблеми:
lock (_lock)
{
await Task.Delay(100); // CS1996: Cannot await in the body of a lock statement
}
Це не обмеження технічне — це захист від race conditions та deadlocks.
SemaphoreSlim — це легковаговий semaphore, що підтримує асинхронне очікування через WaitAsync(). На відміну від lock, він не прив'язаний до потоку — можна захопити на одному потоці, звільнити на іншому.
Ключова ідея: SemaphoreSlim(1, 1) — це async-compatible mutex (mutual exclusion для одного потоку/task).
class AsyncCounter
{
private int _count;
private readonly SemaphoreSlim _lock = new(1, 1); // initialCount: 1, maxCount: 1
public async Task<int> IncrementAsync()
{
// Асинхронне очікування доступу (не блокує потік)
await _lock.WaitAsync();
try
{
// Critical section — тільки один task одночасно
await Task.Delay(10); // ✅ await всередині "lock" — OK!
_count++;
return _count;
}
finally
{
// Звільнення lock — ЗАВЖДИ у finally
_lock.Release();
}
}
}
// Використання — 10 паралельних tasks
var counter = new AsyncCounter();
var tasks = Enumerable.Range(0, 10)
.Select(_ => counter.IncrementAsync())
.ToArray();
await Task.WhenAll(tasks);
Console.WriteLine($"Final count: {await counter.IncrementAsync()}"); // 11
Release() має бути викликаний обов'язково. Інакше lock залишиться захопленим назавжди (deadlock для всіх наступних waiters).Щоб уникнути ручного try/finally, можна створити helper:
class AsyncLock
{
private readonly SemaphoreSlim _semaphore = new(1, 1);
public async Task<IDisposable> LockAsync()
{
await _semaphore.WaitAsync();
return new ReleaseWrapper(_semaphore);
}
private class ReleaseWrapper : IDisposable
{
private readonly SemaphoreSlim _semaphore;
public ReleaseWrapper(SemaphoreSlim semaphore) => _semaphore = semaphore;
public void Dispose() => _semaphore.Release();
}
}
// Використання — елегантний синтаксис
var asyncLock = new AsyncLock();
using (await asyncLock.LockAsync())
{
// Critical section
await Task.Delay(100);
} // Автоматичний Release при виході з using
| Аспект | lock (Monitor) | SemaphoreSlim.WaitAsync() |
|---|---|---|
| Async support | ❌ Не можна await всередині | ✅ Повна підтримка await |
| Thread affinity | ✅ Прив'язаний до потоку | ❌ Thread-agnostic |
| Performance | ⚡ Найшвидший (user-mode) | 🐢 Повільніший (~2x) |
| Cancellation | ❌ Немає | ✅ WaitAsync(CancellationToken) |
| Timeout | ✅ Monitor.TryEnter(timeout) | ✅ WaitAsync(timeout) |
| Use case | Синхронний код | Async код |
await, cancellation support.Throttling — це обмеження кількості одночасних операцій. Rate limiting — обмеження кількості операцій за одиницю часу.
Приклад: API дозволяє максимум 5 одночасних запитів та 100 запитів на хвилину.
SemaphoreSlim(N, N) дозволяє максимум N одночасних операцій:
class ThrottledHttpClient
{
private readonly HttpClient _http = new();
private readonly SemaphoreSlim _throttle = new(5, 5); // Максимум 5 одночасних запитів
public async Task<string> GetAsync(string url, CancellationToken ct = default)
{
// Чекаємо доки з'явиться "слот" для запиту
await _throttle.WaitAsync(ct);
try
{
Console.WriteLine($"Запит до {url} (активних: {5 - _throttle.CurrentCount})");
return await _http.GetStringAsync(url, ct);
}
finally
{
// Звільняємо слот
_throttle.Release();
}
}
}
// Використання — 20 паралельних запитів, але максимум 5 одночасно
var client = new ThrottledHttpClient();
var urls = Enumerable.Range(1, 20)
.Select(i => $"https://jsonplaceholder.typicode.com/posts/{i}")
.ToArray();
var tasks = urls.Select(url => client.GetAsync(url)).ToArray();
var results = await Task.WhenAll(tasks);
Console.WriteLine($"Завантажено {results.Length} сторінок");
Для обмеження кількості операцій за одиницю часу потрібен складніший алгоритм:
class RateLimiter
{
private readonly int _maxRequests;
private readonly TimeSpan _timeWindow;
private readonly Queue<DateTime> _requestTimestamps = new();
private readonly SemaphoreSlim _lock = new(1, 1);
public RateLimiter(int maxRequests, TimeSpan timeWindow)
{
_maxRequests = maxRequests;
_timeWindow = timeWindow;
}
public async Task WaitAsync(CancellationToken ct = default)
{
await _lock.WaitAsync(ct);
try
{
var now = DateTime.UtcNow;
var windowStart = now - _timeWindow;
// Видаляємо старі timestamps (поза вікном)
while (_requestTimestamps.Count > 0 && _requestTimestamps.Peek() < windowStart)
{
_requestTimestamps.Dequeue();
}
// Якщо досягнуто ліміту — чекаємо
if (_requestTimestamps.Count >= _maxRequests)
{
var oldestRequest = _requestTimestamps.Peek();
var delay = oldestRequest + _timeWindow - now;
if (delay > TimeSpan.Zero)
{
Console.WriteLine($"Rate limit досягнуто. Чекаємо {delay.TotalSeconds:F1}s...");
await Task.Delay(delay, ct);
}
// Після затримки — видаляємо найстаріший
_requestTimestamps.Dequeue();
}
// Реєструємо новий запит
_requestTimestamps.Enqueue(now);
}
finally
{
_lock.Release();
}
}
}
// Використання — максимум 10 запитів на 5 секунд
var rateLimiter = new RateLimiter(maxRequests: 10, timeWindow: TimeSpan.FromSeconds(5));
for (int i = 0; i < 25; i++)
{
await rateLimiter.WaitAsync();
Console.WriteLine($"Запит {i + 1} виконано о {DateTime.Now:HH:mm:ss.fff}");
}
Стандартний await не має вбудованого timeout. Якщо операція "зависла" — вона чекатиме нескінченно.
Найпростіший спосіб — змагання між операцією та таймером:
static async Task<T> WithTimeout<T>(Task<T> task, TimeSpan timeout)
{
var delayTask = Task.Delay(timeout);
var completedTask = await Task.WhenAny(task, delayTask);
if (completedTask == delayTask)
{
throw new TimeoutException($"Операція не завершилась за {timeout}");
}
return await task; // Повертаємо результат
}
// Використання
try
{
var result = await WithTimeout(
SlowOperationAsync(),
TimeSpan.FromSeconds(5)
);
Console.WriteLine($"Результат: {result}");
}
catch (TimeoutException ex)
{
Console.WriteLine($"Таймаут: {ex.Message}");
}
async Task<string> SlowOperationAsync()
{
await Task.Delay(10_000); // 10 секунд
return "Completed";
}
taskпродовжує виконуватись у фоні! Він не скасовується автоматично. Це може призвести до витоку ресурсів (відкриті з'єднання, файли).Правильний спосіб — передати CancellationToken з автоматичним timeout:
static async Task<T> WithTimeoutAndCancellation<T>(
Func<CancellationToken, Task<T>> operation,
TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
return await operation(cts.Token);
}
// Використання
try
{
var result = await WithTimeoutAndCancellation(
ct => SlowOperationAsync(ct),
TimeSpan.FromSeconds(5)
);
Console.WriteLine($"Результат: {result}");
}
catch (OperationCanceledException)
{
Console.WriteLine("Операція скасована через таймаут");
}
async Task<string> SlowOperationAsync(CancellationToken ct)
{
// Операція перевіряє cancellation
await Task.Delay(10_000, ct);
return "Completed";
}
Переваги:
CancellationToken)OperationCanceledExceptionКоли потрібно об'єднати timeout та зовнішній cancellation:
async Task<string> FetchWithTimeoutAsync(string url, CancellationToken externalCt)
{
// Створюємо токен з timeout
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Об'єднуємо з зовнішнім токеном
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
externalCt,
timeoutCts.Token
);
// Операція скасується або при timeout, або при зовнішньому cancellation
var response = await _http.GetStringAsync(url, linkedCts.Token);
return response;
}
// Використання
using var userCts = new CancellationTokenSource();
// Користувач може скасувати через Ctrl+C
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
userCts.Cancel();
};
try
{
var data = await FetchWithTimeoutAsync("https://slow-api.com/data", userCts.Token);
Console.WriteLine(data);
}
catch (OperationCanceledException)
{
Console.WriteLine("Скасовано (timeout або користувач)");
}
Комбінація timeout + retry для нестабільних операцій:
static async Task<T> RetryWithTimeoutAsync<T>(
Func<CancellationToken, Task<T>> operation,
int maxRetries = 3,
TimeSpan? initialDelay = null,
CancellationToken ct = default)
{
initialDelay ??= TimeSpan.FromSeconds(1);
var delay = initialDelay.Value;
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
attemptCts.CancelAfter(TimeSpan.FromSeconds(5)); // Timeout на спробу
try
{
return await operation(attemptCts.Token);
}
catch (Exception ex) when (attempt < maxRetries &&
(ex is OperationCanceledException || ex is HttpRequestException))
{
Console.WriteLine($"Спроба {attempt} невдала: {ex.Message}. Повтор через {delay.TotalSeconds}s...");
await Task.Delay(delay, ct);
delay *= 2; // Exponential backoff
}
}
// Остання спроба без catch
using var finalCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
finalCts.CancelAfter(TimeSpan.FromSeconds(5));
return await operation(finalCts.Token);
}
// Використання
var result = await RetryWithTimeoutAsync(
async ct =>
{
var response = await _http.GetAsync("https://flaky-api.com/data", ct);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync(ct);
},
maxRetries: 5,
initialDelay: TimeSpan.FromSeconds(1)
);
Nito.AsyncEx — це community-driven бібліотека, що надає async-compatible версії стандартних синхронізаційних примітивів.
Встановлення:
dotnet add package Nito.AsyncEx
Готова реалізація async lock з правильною семантикою:
using Nito.AsyncEx;
class SharedResource
{
private readonly AsyncLock _lock = new();
private int _value;
public async Task<int> IncrementAsync()
{
// using + await — елегантний синтаксис
using (await _lock.LockAsync())
{
await Task.Delay(10);
return ++_value;
}
}
}
Переваги над ручним SemaphoreSlim:
Async версії event primitives:
using Nito.AsyncEx;
// Auto-reset: один waiter проходить, event скидається
var autoEvent = new AsyncAutoResetEvent(signaled: false);
// Producer
_ = Task.Run(async () =>
{
await Task.Delay(1000);
autoEvent.Set(); // Дозволяє одному waiter пройти
Console.WriteLine("Event set");
});
// Consumer
await autoEvent.WaitAsync();
Console.WriteLine("Event received");
// Manual-reset: всі waiters проходять
var manualEvent = new AsyncManualResetEvent(signaled: false);
// Кілька consumers
var waiters = Enumerable.Range(0, 5)
.Select(async i =>
{
await manualEvent.WaitAsync();
Console.WriteLine($"Waiter {i} пройшов");
})
.ToArray();
await Task.Delay(1000);
manualEvent.Set(); // Всі 5 waiters проходять одночасно
await Task.WhenAll(waiters);
Альтернатива Channel<T> з додатковими можливостями:
using Nito.AsyncEx;
var collection = new AsyncCollection<int>();
// Producer
_ = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await collection.AddAsync(i);
await Task.Delay(100);
}
collection.CompleteAdding();
});
// Consumer
await foreach (var item in collection.GetConsumingAsyncEnumerable())
{
Console.WriteLine($"Consumed: {item}");
}
Правильна реалізація lazy async initialization:
using Nito.AsyncEx;
class ExpensiveService
{
private readonly AsyncLazy<string> _data;
public ExpensiveService()
{
_data = new AsyncLazy<string>(async () =>
{
Console.WriteLine("Ініціалізація...");
await Task.Delay(2000);
return "Expensive data";
});
}
public Task<string> GetDataAsync() => _data.Task;
}
// Використання
var service = new ExpensiveService();
// Перший виклик — запускає ініціалізацію
var data1 = await service.GetDataAsync(); // 2 секунди
// Наступні виклики — повертають кешований результат
var data2 = await service.GetDataAsync(); // Instant
var data3 = await service.GetDataAsync(); // Instant
Async версія ReaderWriterLockSlim:
using Nito.AsyncEx;
class AsyncCache
{
private readonly AsyncReaderWriterLock _lock = new();
private readonly Dictionary<string, string> _cache = new();
public async Task<string?> GetAsync(string key)
{
// Множинні readers можуть читати одночасно
using (await _lock.ReaderLockAsync())
{
await Task.Delay(10); // Імітація читання
return _cache.TryGetValue(key, out var value) ? value : null;
}
}
public async Task SetAsync(string key, string value)
{
// Тільки один writer одночасно
using (await _lock.WriterLockAsync())
{
await Task.Delay(50); // Імітація запису
_cache[key] = value;
}
}
}
| Примітив | Manual (SemaphoreSlim) | Nito.AsyncEx |
|---|---|---|
| Async Lock | ~20 рядків коду | AsyncLock — 1 рядок |
| Cancellation | Ручна обробка | Вбудована підтримка |
| Reentrancy | Немає | Опціональна підтримка |
| Reader/Writer | ~100 рядків | AsyncReaderWriterLock |
| Lazy Init | Складна логіка | AsyncLazy<T> |
| Testing | Потрібні власні тести | Протестовано community |
Побудуємо HTTP client з усіма вивченими паттернами: throttling, rate limiting, retry, timeout, cancellation.
dotnet new console -n ResilientHttpClient
cd ResilientHttpClient
dotnet add package Nito.AsyncEx
using Nito.AsyncEx;
class RateLimiter
{
private readonly int _maxRequests;
private readonly TimeSpan _timeWindow;
private readonly Queue<DateTime> _timestamps = new();
private readonly AsyncLock _lock = new();
public RateLimiter(int maxRequests, TimeSpan timeWindow)
{
_maxRequests = maxRequests;
_timeWindow = timeWindow;
}
public async Task WaitAsync(CancellationToken ct = default)
{
using (await _lock.LockAsync(ct))
{
var now = DateTime.UtcNow;
var windowStart = now - _timeWindow;
while (_timestamps.Count > 0 && _timestamps.Peek() < windowStart)
_timestamps.Dequeue();
if (_timestamps.Count >= _maxRequests)
{
var delay = _timestamps.Peek() + _timeWindow - now;
if (delay > TimeSpan.Zero)
await Task.Delay(delay, ct);
_timestamps.Dequeue();
}
_timestamps.Enqueue(now);
}
}
}
class ResilientHttpClient : IDisposable
{
private readonly HttpClient _http = new();
private readonly SemaphoreSlim _throttle;
private readonly RateLimiter _rateLimiter;
private readonly int _maxRetries;
private readonly TimeSpan _timeout;
public ResilientHttpClient(
int maxConcurrent = 5,
int maxRequestsPerMinute = 60,
int maxRetries = 3,
TimeSpan? timeout = null)
{
_throttle = new SemaphoreSlim(maxConcurrent, maxConcurrent);
_rateLimiter = new RateLimiter(maxRequestsPerMinute, TimeSpan.FromMinutes(1));
_maxRetries = maxRetries;
_timeout = timeout ?? TimeSpan.FromSeconds(30);
}
public async Task<string> GetAsync(string url, CancellationToken ct = default)
{
// Rate limiting
await _rateLimiter.WaitAsync(ct);
// Throttling
await _throttle.WaitAsync(ct);
try
{
return await RetryAsync(async attemptCt =>
{
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, attemptCt);
linkedCts.CancelAfter(_timeout);
var response = await _http.GetAsync(url, linkedCts.Token);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync(linkedCts.Token);
}, ct);
}
finally
{
_throttle.Release();
}
}
private async Task<T> RetryAsync<T>(
Func<CancellationToken, Task<T>> operation,
CancellationToken ct)
{
var delay = TimeSpan.FromSeconds(1);
for (int attempt = 1; attempt <= _maxRetries; attempt++)
{
using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
try
{
return await operation(attemptCts.Token);
}
catch (Exception ex) when (attempt < _maxRetries &&
(ex is HttpRequestException || ex is OperationCanceledException))
{
Console.WriteLine($" Спроба {attempt} невдала. Повтор через {delay.TotalSeconds}s...");
await Task.Delay(delay, ct);
delay *= 2;
}
}
return await operation(ct);
}
public void Dispose()
{
_http.Dispose();
_throttle.Dispose();
}
}
class RequestStats
{
private int _totalRequests;
private int _successfulRequests;
private int _failedRequests;
private readonly List<TimeSpan> _responseTimes = new();
private readonly AsyncLock _lock = new();
public async Task RecordSuccessAsync(TimeSpan responseTime)
{
using (await _lock.LockAsync())
{
_totalRequests++;
_successfulRequests++;
_responseTimes.Add(responseTime);
}
}
public async Task RecordFailureAsync()
{
using (await _lock.LockAsync())
{
_totalRequests++;
_failedRequests++;
}
}
public async Task PrintStatsAsync()
{
using (await _lock.LockAsync())
{
Console.WriteLine("\n" + new string('═', 60));
Console.WriteLine("📊 СТАТИСТИКА ЗАПИТІВ");
Console.WriteLine(new string('─', 60));
Console.WriteLine($"Всього запитів: {_totalRequests}");
Console.WriteLine($"Успішних: {_successfulRequests}");
Console.WriteLine($"Невдалих: {_failedRequests}");
if (_responseTimes.Count > 0)
{
var avg = TimeSpan.FromMilliseconds(_responseTimes.Average(t => t.TotalMilliseconds));
var min = _responseTimes.Min();
var max = _responseTimes.Max();
Console.WriteLine($"Середній час відповіді: {avg.TotalMilliseconds:F0}ms");
Console.WriteLine($"Мін/Макс: {min.TotalMilliseconds:F0}ms / {max.TotalMilliseconds:F0}ms");
}
Console.WriteLine(new string('═', 60));
}
}
}
using System.Diagnostics;
var urls = new[]
{
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
"https://jsonplaceholder.typicode.com/posts/4",
"https://jsonplaceholder.typicode.com/posts/5",
"https://jsonplaceholder.typicode.com/users/1",
"https://jsonplaceholder.typicode.com/users/2",
"https://jsonplaceholder.typicode.com/comments/1",
"https://jsonplaceholder.typicode.com/comments/2",
"https://jsonplaceholder.typicode.com/albums/1"
};
using var client = new ResilientHttpClient(
maxConcurrent: 3,
maxRequestsPerMinute: 20,
maxRetries: 3,
timeout: TimeSpan.FromSeconds(10)
);
var stats = new RequestStats();
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
Console.WriteLine("\n⚠️ Скасування...");
cts.Cancel();
};
Console.WriteLine("🚀 Початок завантаження...\n");
var tasks = urls.Select(async url =>
{
var sw = Stopwatch.StartNew();
try
{
var content = await client.GetAsync(url, cts.Token);
sw.Stop();
await stats.RecordSuccessAsync(sw.Elapsed);
Console.WriteLine($"✅ {url} ({sw.ElapsedMilliseconds}ms)");
}
catch (OperationCanceledException)
{
Console.WriteLine($"⚠️ {url} — скасовано");
}
catch (Exception ex)
{
await stats.RecordFailureAsync();
Console.WriteLine($"❌ {url} — {ex.Message}");
}
}).ToArray();
await Task.WhenAll(tasks);
await stats.PrintStatsAsync();
dotnet run
SemaphoreSlim.WaitAsync()
new SemaphoreSlim(1, 1) для mutual exclusiontry/finally для ReleaseThrottling
SemaphoreSlim(N, N) для N concurrentRate Limiting
Nito.AsyncEx
Реалізуйте async-safe file manager:
AsyncFileManager з методами ReadAsync(), WriteAsync(), AppendAsync()SemaphoreSlim для mutual exclusionТести:
Створіть web scraper з обмеженнями:
Вимоги:
Побудуйте resilient HTTP service wrapper:
Circuit Breaker States:
Вимоги:
stackalloc, Span<T>, та function pointers.System.Threading.Channels — Async Producer-Consumer
Повний розбір System.Threading.Channels — async-native producer/consumer patterns для high-performance pipelines. Bounded та Unbounded channels, BoundedChannelFullMode, multi-stage pipelines, BackgroundService інтеграція та benchmarks проти BlockingCollection.
Unsafe Code та Вказівники
Повний розбір unsafe коду в C# — вказівники, pointer arithmetic, fixed statement для pinning, stackalloc та Span<T>, sizeof, function pointers (C# 9+), та практичні сценарії використання для high-performance коду.