Producer-Consumer — один з найпоширеніших паттернів у багатопоточному програмуванні. Ідея проста: один або кілька потоків виробляють дані (producers), а інші потоки споживають ці дані (consumers). Між ними знаходиться буфер (queue), що дозволяє producers та consumers працювати незалежно, з різною швидкістю.
Класичні приклади:
До появи System.Threading.Channels стандартним рішенням була BlockingCollection<T>:
using System.Collections.Concurrent;
var queue = new BlockingCollection<int>(boundedCapacity: 10);
// Producer — блокує потік при повній черзі
Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
queue.Add(i); // ❌ БЛОКУЄ потік якщо черга повна!
Console.WriteLine($"Produced: {i}");
}
queue.CompleteAdding();
});
// Consumer — блокує потік при порожній черзі
Task.Run(() =>
{
foreach (var item in queue.GetConsumingEnumerable()) // ❌ БЛОКУЄ потік!
{
Thread.Sleep(100); // Імітація обробки
Console.WriteLine($"Consumed: {item}");
}
});
Проблеми BlockingCollection:
Add() та Take() блокують потік при повній/порожній черзі. У async світі це марнування ThreadPool потоків.await queue.AddAsync() — тільки синхронні методи.Channels — це async-native producer/consumer примітив, введений у .NET Core 2.1. Ключові переваги:
WriteAsync(), ReadAsync(), WaitToReadAsync() — жодного блокування потоків.Channel<T> складається з двох частин:
Channel<int> channel = Channel.CreateUnbounded<int>();
ChannelWriter<int> writer = channel.Writer; // Для producers
ChannelReader<int> reader = channel.Reader; // Для consumers
Розділення відповідальностей: Producer отримує тільки Writer, consumer — тільки Reader. Це запобігає помилкам (producer не може читати, consumer не може писати) та покращує читабельність коду.
using System.Threading.Channels;
// Unbounded — необмежена черга (обмежена тільки доступною пам'яттю)
var unbounded = Channel.CreateUnbounded<string>();
// Bounded — обмежена ємність (10 елементів)
var bounded = Channel.CreateBounded<string>(capacity: 10);
// Bounded з опціями (детально нижче)
var boundedWithOptions = Channel.CreateBounded<string>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait, // Що робити при переповненні
SingleReader = false, // Оптимізація для одного reader
SingleWriter = false // Оптимізація для одного writer
});
ValueTask для zero-allocation у випадку, коли запис відбувається синхронно.true якщо успішно, false якщо channel повний (bounded) або закритий. Не блокує потік, не чекає.false якщо channel закритий і більше ніколи не буде готовий до запису.Complete() жодні нові елементи не можуть бути записані. Опціонально приймає exception для сигналізації помилки.ChannelClosedException).true і записує елемент у out параметр якщо успішно. Повертає false якщо channel порожній.false якщо channel закритий і порожній (більше ніколи не буде даних).IAsyncEnumerable<T>, що дозволяє споживати всі елементи через await foreach. Автоматично завершується коли channel закритий і порожній.using System.Threading.Channels;
var channel = Channel.CreateUnbounded<int>();
// Producer — генерує числа
var producerTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
await Task.Delay(100); // Імітація роботи
}
// Сигналізуємо завершення
channel.Writer.Complete();
Console.WriteLine("Producer завершив роботу");
});
// Consumer — обробляє числа
var consumerTask = Task.Run(async () =>
{
// ReadAllAsync повертає IAsyncEnumerable — ідеально для await foreach
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(150); // Імітація обробки
}
Console.WriteLine("Consumer завершив роботу");
});
await Task.WhenAll(producerTask, consumerTask);
await foreach замість циклу з ReadAsync()?ReadAllAsync() автоматично обробляє закриття channel — цикл завершується коли producer викликав Complete() і всі елементи оброблені. Ручний цикл з ReadAsync() вимагає обробки ChannelClosedException.Unbounded channels можуть зростати необмежено, що призводить до OOM якщо producer швидший за consumer. Bounded channels вирішують це через backpressure — автоматичне уповільнення producer при переповненні.
// Bounded channel на 5 елементів
var channel = Channel.CreateBounded<int>(capacity: 5);
// Producer намагається записати 10 елементів
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i); // Блокується після 5-го елемента
Console.WriteLine($"Wrote: {i}");
}
Коли bounded channel повний, поведінка залежить від FullMode:
WriteAsync() чекає доки з'явиться місце. Producer уповільнюється (backpressure). Найбезпечніший варіант — жодні дані не губляться.TryWrite() повертає false, WriteAsync() завершується успішно але елемент не додається. Корисно для real-time даних, де старі дані важливіші (наприклад, sensor readings).TryWrite() повертає false. Аналогічно DropNewest, але семантично чіткіше.using System.Threading.Channels;
// Channel на 3 елементи, відкидає найстаріші при переповненні
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(3)
{
FullMode = BoundedChannelFullMode.DropOldest
});
// Producer — швидко генерує дані
_ = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await channel.Writer.WriteAsync($"Frame_{i}");
Console.WriteLine($"Produced: Frame_{i}");
await Task.Delay(50); // Швидкий producer
}
channel.Writer.Complete();
});
// Consumer — повільно обробляє
await Task.Delay(500); // Затримка перед початком споживання
await foreach (var frame in channel.Reader.ReadAllAsync())
{
Console.WriteLine($" Consumed: {frame}");
}
Результат: Consumer отримав тільки останні 3 frames (8, 9, 10) — найактуальніші дані.
За замовчуванням Channel<T> підтримує множинних readers та writers — це вимагає синхронізації (locks або lock-free алгоритми). Якщо ви гарантуєте, що буде тільки один reader або один writer, можна увімкнути оптимізації:
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
SingleReader = true, // Тільки один consumer
SingleWriter = true, // Тільки один producer
FullMode = BoundedChannelFullMode.Wait
});
// Тепер channel використовує lock-free алгоритми без contention
Benchmark impact:
| Конфігурація | Throughput (ops/sec) | Allocation |
|---|---|---|
| Multiple R/W | 2.5M | 32 B/op |
| Single R/W | 8.1M | 0 B/op |
SingleReader = true, але два потоки викликають ReadAsync() одночасно — поведінка не визначена. Можливі race conditions, corrupted data, або навіть deadlock. Використовуйте ці опції тільки якщо архітектура гарантує single-threaded access.Реальні системи часто мають кілька стадій обробки: Extract → Transform → Load. Кожна стадія — окремий worker, що читає з одного channel і пише в інший.
using System.Threading.Channels;
// Stage 1: Extract — завантаження даних
async Task ExtractAsync(ChannelWriter<string> output, CancellationToken ct)
{
for (int i = 1; i <= 100; i++)
{
await output.WriteAsync($"raw_data_{i}", ct);
await Task.Delay(10, ct); // Імітація I/O
}
output.Complete();
Console.WriteLine("Extract завершено");
}
// Stage 2: Transform — обробка даних
async Task TransformAsync(
ChannelReader<string> input,
ChannelWriter<string> output,
CancellationToken ct)
{
await foreach (var raw in input.ReadAllAsync(ct))
{
// Обробка: uppercase + prefix
var transformed = $"PROCESSED_{raw.ToUpper()}";
await output.WriteAsync(transformed, ct);
}
output.Complete();
Console.WriteLine("Transform завершено");
}
// Stage 3: Load — збереження результатів
async Task LoadAsync(ChannelReader<string> input, CancellationToken ct)
{
var results = new List<string>();
await foreach (var item in input.ReadAllAsync(ct))
{
results.Add(item);
// Імітація запису в БД
await Task.Delay(5, ct);
}
Console.WriteLine($"Load завершено: {results.Count} записів");
}
// Побудова pipeline
var extractToTransform = Channel.CreateBounded<string>(10);
var transformToLoad = Channel.CreateBounded<string>(10);
using var cts = new CancellationTokenSource();
var extractTask = ExtractAsync(extractToTransform.Writer, cts.Token);
var transformTask = TransformAsync(extractToTransform.Reader, transformToLoad.Writer, cts.Token);
var loadTask = LoadAsync(transformToLoad.Reader, cts.Token);
await Task.WhenAll(extractTask, transformTask, loadTask);
Переваги multi-stage pipeline:
Якщо Transform — найповільніша стадія, можна запустити кілька workers:
// Запускаємо 4 паралельних Transform workers
var transformTasks = Enumerable.Range(0, 4)
.Select(id => TransformAsync(
extractToTransform.Reader,
transformToLoad.Writer,
id,
cts.Token))
.ToArray();
async Task TransformAsync(
ChannelReader<string> input,
ChannelWriter<string> output,
int workerId,
CancellationToken ct)
{
await foreach (var raw in input.ReadAllAsync(ct))
{
Console.WriteLine($"Worker {workerId} обробляє: {raw}");
var transformed = $"PROCESSED_{raw.ToUpper()}";
await output.WriteAsync(transformed, ct);
}
Console.WriteLine($"Worker {workerId} завершив роботу");
}
// Чекаємо завершення всіх workers
await Task.WhenAll(transformTasks);
transformToLoad.Writer.Complete(); // Закриваємо output тільки після всіх workers
ChannelReader<T> та ChannelWriter<T> повністю thread-safe. Кілька потоків можуть викликати ReadAsync() або WriteAsync() одночасно без додаткової синхронізації.У ASP.NET Core додатках часто потрібна фонова обробка: відправка email, генерація звітів, обробка зображень. Channels ідеально підходять для цього через інтеграцію з BackgroundService.
Архітектура:
record EmailJob(
string To,
string Subject,
string Body,
DateTime EnqueuedAt
);
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
class EmailBackgroundService : BackgroundService
{
private readonly ChannelReader<EmailJob> _reader;
private readonly ILogger<EmailBackgroundService> _logger;
public EmailBackgroundService(
ChannelReader<EmailJob> reader,
ILogger<EmailBackgroundService> logger)
{
_reader = reader;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Email service запущено");
await foreach (var job in _reader.ReadAllAsync(stoppingToken))
{
try
{
await SendEmailAsync(job, stoppingToken);
_logger.LogInformation("Email відправлено: {To}", job.To);
}
catch (Exception ex)
{
_logger.LogError(ex, "Помилка відправки email: {To}", job.To);
}
}
_logger.LogInformation("Email service зупинено");
}
private async Task SendEmailAsync(EmailJob job, CancellationToken ct)
{
// Імітація відправки email
await Task.Delay(500, ct);
Console.WriteLine($"📧 Email → {job.To}: {job.Subject}");
}
}
using System.Threading.Channels;
var builder = WebApplication.CreateBuilder(args);
// Створюємо bounded channel (обмежуємо чергу до 100 emails)
var emailChannel = Channel.CreateBounded<EmailJob>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait // Backpressure якщо черга повна
});
// Реєструємо Reader та Writer окремо
builder.Services.AddSingleton(emailChannel.Reader);
builder.Services.AddSingleton(emailChannel.Writer);
// Реєструємо BackgroundService
builder.Services.AddHostedService<EmailBackgroundService>();
var app = builder.Build();
using Microsoft.AspNetCore.Mvc;
using System.Threading.Channels;
[ApiController]
[Route("api/[controller]")]
class EmailController : ControllerBase
{
private readonly ChannelWriter<EmailJob> _emailQueue;
public EmailController(ChannelWriter<EmailJob> emailQueue)
{
_emailQueue = emailQueue;
}
[HttpPost("send")]
public async Task<IActionResult> SendEmail([FromBody] EmailRequest request)
{
var job = new EmailJob(
request.To,
request.Subject,
request.Body,
DateTime.UtcNow
);
// Додаємо у чергу (async, не блокує request thread)
var written = await _emailQueue.WriteAsync(job);
return Accepted(new { message = "Email додано у чергу" });
}
}
record EmailRequest(string To, string Subject, string Body);
Переваги цього підходу:
BackgroundService завершує обробку поточних emails.BackgroundService instances для паралельної обробки.Producer генерує 1,000,000 елементів, Consumer обробляє їх. Вимірюємо throughput та allocation.
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Collections.Concurrent;
using System.Threading.Channels;
[MemoryDiagnoser]
public class ProducerConsumerBenchmark
{
private const int ItemCount = 1_000_000;
[Benchmark(Baseline = true)]
public async Task BlockingCollection_Sync()
{
var queue = new BlockingCollection<int>(boundedCapacity: 1000);
var producer = Task.Run(() =>
{
for (int i = 0; i < ItemCount; i++)
queue.Add(i);
queue.CompleteAdding();
});
var consumer = Task.Run(() =>
{
foreach (var item in queue.GetConsumingEnumerable())
{
// Обробка
}
});
await Task.WhenAll(producer, consumer);
}
[Benchmark]
public async Task Channel_Async()
{
var channel = Channel.CreateBounded<int>(1000);
var producer = Task.Run(async () =>
{
for (int i = 0; i < ItemCount; i++)
await channel.Writer.WriteAsync(i);
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
// Обробка
}
});
await Task.WhenAll(producer, consumer);
}
[Benchmark]
public async Task ConcurrentQueue_Manual()
{
var queue = new ConcurrentQueue<int>();
var semaphore = new SemaphoreSlim(0);
bool completed = false;
var producer = Task.Run(async () =>
{
for (int i = 0; i < ItemCount; i++)
{
queue.Enqueue(i);
semaphore.Release();
}
completed = true;
semaphore.Release(); // Wake up consumer
});
var consumer = Task.Run(async () =>
{
while (!completed || !queue.IsEmpty)
{
await semaphore.WaitAsync();
if (queue.TryDequeue(out var item))
{
// Обробка
}
}
});
await Task.WhenAll(producer, consumer);
}
}
BenchmarkRunner.Run<ProducerConsumerBenchmark>();
Висновки:
Побудуємо повноцінний pipeline для обробки зображень з використанням всіх вивчених паттернів.
dotnet new console -n ImagePipeline
cd ImagePipeline
dotnet add package System.Threading.Channels
dotnet add package SixLabors.ImageSharp
record ImageJob(
string InputPath,
string OutputPath,
int TargetWidth,
int TargetHeight
);
record ProcessedImage(
string OutputPath,
long OriginalSize,
long ProcessedSize,
TimeSpan ProcessingTime
);
using System.Threading.Channels;
class FileScanner
{
public async Task ScanAsync(
string inputDir,
string outputDir,
ChannelWriter<ImageJob> output,
CancellationToken ct = default)
{
var imageFiles = Directory.GetFiles(inputDir, "*.jpg")
.Concat(Directory.GetFiles(inputDir, "*.png"))
.ToArray();
Console.WriteLine($"Знайдено {imageFiles.Length} зображень");
foreach (var inputPath in imageFiles)
{
var fileName = Path.GetFileName(inputPath);
var outputPath = Path.Combine(outputDir, $"resized_{fileName}");
var job = new ImageJob(inputPath, outputPath, 800, 600);
await output.WriteAsync(job, ct);
}
output.Complete();
Console.WriteLine("Сканування завершено");
}
}
using SixLabors.ImageSharp;
using SixLabors.ImageSharp.Processing;
using System.Diagnostics;
using System.Threading.Channels;
class ImageProcessor
{
private readonly int _workerId;
public ImageProcessor(int workerId)
{
_workerId = workerId;
}
public async Task ProcessAsync(
ChannelReader<ImageJob> input,
ChannelWriter<ProcessedImage> output,
CancellationToken ct = default)
{
await foreach (var job in input.ReadAllAsync(ct))
{
var sw = Stopwatch.StartNew();
try
{
var originalSize = new FileInfo(job.InputPath).Length;
// Завантаження та обробка зображення
using var image = await Image.LoadAsync(job.InputPath, ct);
image.Mutate(x => x.Resize(job.TargetWidth, job.TargetHeight));
await image.SaveAsJpegAsync(job.OutputPath, ct);
var processedSize = new FileInfo(job.OutputPath).Length;
sw.Stop();
var result = new ProcessedImage(
job.OutputPath,
originalSize,
processedSize,
sw.Elapsed
);
await output.WriteAsync(result, ct);
Console.WriteLine($"Worker {_workerId}: {Path.GetFileName(job.InputPath)} " +
$"({originalSize / 1024}KB → {processedSize / 1024}KB) за {sw.ElapsedMilliseconds}ms");
}
catch (Exception ex)
{
Console.WriteLine($"Worker {_workerId}: Помилка обробки {job.InputPath}: {ex.Message}");
}
}
Console.WriteLine($"Worker {_workerId} завершив роботу");
}
}
using System.Threading.Channels;
class StatisticsCollector
{
public async Task CollectAsync(
ChannelReader<ProcessedImage> input,
CancellationToken ct = default)
{
int totalImages = 0;
long totalOriginalSize = 0;
long totalProcessedSize = 0;
var totalProcessingTime = TimeSpan.Zero;
await foreach (var result in input.ReadAllAsync(ct))
{
totalImages++;
totalOriginalSize += result.OriginalSize;
totalProcessedSize += result.ProcessedSize;
totalProcessingTime += result.ProcessingTime;
}
var compressionRatio = totalOriginalSize > 0
? (1 - (double)totalProcessedSize / totalOriginalSize) * 100
: 0;
Console.WriteLine("\n" + new string('═', 60));
Console.WriteLine("📊 СТАТИСТИКА ОБРОБКИ");
Console.WriteLine(new string('─', 60));
Console.WriteLine($"Оброблено зображень: {totalImages}");
Console.WriteLine($"Оригінальний розмір: {totalOriginalSize / 1024 / 1024:F2} MB");
Console.WriteLine($"Після обробки: {totalProcessedSize / 1024 / 1024:F2} MB");
Console.WriteLine($"Стиснення: {compressionRatio:F1}%");
Console.WriteLine($"Загальний час: {totalProcessingTime.TotalSeconds:F2}s");
Console.WriteLine($"Середній час/зображення: {totalProcessingTime.TotalMilliseconds / totalImages:F0}ms");
Console.WriteLine(new string('═', 60));
}
}
using System.Threading.Channels;
class ImagePipeline
{
private readonly string _inputDir;
private readonly string _outputDir;
private readonly int _workerCount;
public ImagePipeline(string inputDir, string outputDir, int workerCount = 4)
{
_inputDir = inputDir;
_outputDir = outputDir;
_workerCount = workerCount;
Directory.CreateDirectory(_outputDir);
}
public async Task RunAsync(CancellationToken ct = default)
{
// Створюємо channels з backpressure
var scanToProcess = Channel.CreateBounded<ImageJob>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = true // Тільки один scanner
});
var processToStats = Channel.CreateBounded<ProcessedImage>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true // Тільки один stats collector
});
// Stage 1: Scanner
var scanner = new FileScanner();
var scanTask = scanner.ScanAsync(_inputDir, _outputDir, scanToProcess.Writer, ct);
// Stage 2: Processors (паралельні workers)
var processorTasks = Enumerable.Range(0, _workerCount)
.Select(id =>
{
var processor = new ImageProcessor(id);
return processor.ProcessAsync(scanToProcess.Reader, processToStats.Writer, ct);
})
.ToArray();
// Закриваємо processToStats.Writer після завершення всіх processors
_ = Task.WhenAll(processorTasks).ContinueWith(_ => processToStats.Writer.Complete());
// Stage 3: Statistics
var statsCollector = new StatisticsCollector();
var statsTask = statsCollector.CollectAsync(processToStats.Reader, ct);
// Чекаємо завершення всього pipeline
await Task.WhenAll(scanTask, statsTask);
}
}
using System;
using System.Threading;
if (args.Length < 2)
{
Console.WriteLine("Використання: ImagePipeline <input_dir> <output_dir> [worker_count]");
return;
}
string inputDir = args[0];
string outputDir = args[1];
int workerCount = args.Length > 2 ? int.Parse(args[2]) : 4;
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
Console.WriteLine("\n⚠️ Скасування pipeline...");
cts.Cancel();
};
var pipeline = new ImagePipeline(inputDir, outputDir, workerCount);
var sw = System.Diagnostics.Stopwatch.StartNew();
try
{
await pipeline.RunAsync(cts.Token);
sw.Stop();
Console.WriteLine($"\n✅ Pipeline завершено за {sw.Elapsed.TotalSeconds:F2}s");
}
catch (OperationCanceledException)
{
Console.WriteLine("Pipeline скасовано користувачем");
}
catch (Exception ex)
{
Console.WriteLine($"❌ Помилка: {ex.Message}");
}
dotnet run -- ./input_images ./output_images 4
Channel<T> Basics
ChannelWriter<T> для producersChannelReader<T> для consumersReadAllAsync() для await foreachBounded Channels
FullMode: Wait, DropNewest, DropOldest, DropWriteSingleReader/SingleWriter для оптимізаціїMulti-Stage Pipelines
ASP.NET Core Integration
Реалізуйте простий chat server через Channels:
Channel<string> для отримання повідомленьChatRoom тримає список клієнтів/join <name>, /leave, /usersВимоги:
Створіть HTTP client з rate limiting через Channels:
Вимоги:
Побудуйте розподілену систему обробки завдань:
Вимоги:
Async — Просунуті Паттерни
Глибокий розбір просунутих асинхронних паттернів у C# — TaskCompletionSource для перетворення callback-based API, IAsyncEnumerable для streaming даних, ValueTask для оптимізації, та best practices для production-ready коду.
Асинхронна Синхронізація
Повний розбір асинхронної синхронізації в C# — чому lock не працює з await, SemaphoreSlim.WaitAsync() як async mutex, throttling та rate limiting patterns, timeout strategies, та бібліотека Nito.AsyncEx для просунутих сценаріїв.