Ви вже знаєте як синхронізувати доступ до простих змінних через lock, Interlocked та інші примітиви. Але що робити з колекціями — List<T>, Dictionary<K,V>, Queue<T>? Чи можна просто обгорнути їх у lock і забути про проблеми?
Відповідь: можна, але це неефективно. .NET надає спеціалізовані concurrent collections що оптимізовані для багатопотокового доступу і працюють набагато швидше ніж "колекція + lock".
У цій темі ми розглянемо:
Почнемо з простого експерименту:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
var list = new List<int>();
// 10 потоків одночасно додають по 1000 елементів
Parallel.For(0, 10, i =>
{
for (int j = 0; j < 1000; j++)
{
list.Add(i * 1000 + j); // ← НЕ thread-safe!
}
});
Console.WriteLine($"Expected: 10000, Actual: {list.Count}");
// Типовий результат: 8500-9800 (втрачено елементи!)
// Ще гірше: можлива IndexOutOfRangeException або навіть crash!
Що пішло не так?
List<T>.Add() виконує наступні кроки:
// Спрощений псевдокод List<T>.Add():
public void Add(T item)
{
if (_size == _items.Length) // 1. Перевірка чи потрібно розширити масив
{
EnsureCapacity(_size + 1); // 2. Розширення (створення нового масиву)
}
_items[_size] = item; // 3. Запис елемента
_size++; // 4. Інкремент розміру
}
Race condition сценарій:
Thread 1 Thread 2
────────────────────────────────────────────────────────
Read _size = 100
Read _size = 100
Write _items[100] = "A"
Write _items[100] = "B" ← Перезаписує "A"!
_size = 101
_size = 101 ← Той самий індекс!
Результат: Елемент "A" втрачено, _size некоректний, можлива corruption внутрішнього масиву.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
var dict = new Dictionary<string, int>();
try
{
// 10 потоків одночасно додають елементи
Parallel.For(0, 10, i =>
{
for (int j = 0; j < 1000; j++)
{
string key = $"key-{i}-{j}";
dict[key] = i * 1000 + j; // ← НЕ thread-safe!
}
});
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.GetType().Name}");
Console.WriteLine($"Message: {ex.Message}");
}
// Типовий результат:
// Exception: InvalidOperationException
// Message: Collection was modified; enumeration operation may not execute.
// АБО: IndexOutOfRangeException, NullReferenceException
Чому Dictionary ще небезпечніший?
Dictionary<K,V> використовує hash table з buckets (корзинами). При додаванні елемента:
Rehashing у багатопотоковому коді = катастрофа:
var dict = new Dictionary<string, int>();
var lockObj = new object();
// ✅ Thread-safe, але ПОВІЛЬНО
Parallel.For(0, 10, i =>
{
for (int j = 0; j < 1000; j++)
{
string key = $"key-{i}-{j}";
lock (lockObj) // ← Всі потоки конкурують за ОДИН lock
{
dict[key] = i * 1000 + j;
}
}
});
// Проблеми:
// 1. Lock contention — всі 10 потоків чекають на один lock
// 2. Serialization — фактично виконується послідовно
// 3. Погана scalability — чим більше потоків, тим гірше
Benchmark: Dictionary + lock vs ConcurrentDictionary:
10 потоків × 10,000 операцій кожен:
Dictionary + lock: ~800ms
ConcurrentDictionary: ~150ms (в 5x швидше!)
ConcurrentDictionary використовує striped locking (смугасте блокування) — замість одного lock на всю колекцію, є кілька locks для різних частин:
┌─────────────────────────────────────────────────────┐
│ ConcurrentDictionary<K, V> │
├─────────────────────────────────────────────────────┤
│ Bucket 0-15 │ Lock 0 │ [entries...] │
│ Bucket 16-31 │ Lock 1 │ [entries...] │
│ Bucket 32-47 │ Lock 2 │ [entries...] │
│ Bucket 48-63 │ Lock 3 │ [entries...] │
│ ... │ ... │ ... │
└─────────────────────────────────────────────────────┘
Thread 1 працює з Bucket 5 (Lock 0) ─┐
Thread 2 працює з Bucket 20 (Lock 1) ─┼─ Паралельно!
Thread 3 працює з Bucket 35 (Lock 2) ─┘
Переваги:
Trade-off:
Count) потребують захоплення всіх locksДавайте розберемо як саме ConcurrentDictionary досягає високої продуктивності через striped locking.
Крок 1: Визначення bucket
Коли ви додаєте елемент з ключем, спочатку обчислюється hash code ключа:
int hashCode = key.GetHashCode();
Потім hash code використовується для визначення bucket (корзини) куди потрапить елемент:
int bucketIndex = hashCode % totalBuckets;
Крок 2: Визначення lock
Кожен bucket належить до певного lock segment. Наприклад, якщо є 64 buckets та 4 locks:
Buckets 0-15 → Lock 0
Buckets 16-31 → Lock 1
Buckets 32-47 → Lock 2
Buckets 48-63 → Lock 3
Крок 3: Захоплення lock та модифікація
Тільки потоки що працюють з тим самим lock segment конкурують між собою. Потоки що працюють з різними segments — працюють паралельно.
Приклад сценарію:
Thread 1: Add("apple", 10)
→ hashCode = 12345
→ bucket = 12345 % 64 = 5
→ Lock 0 (buckets 0-15)
→ Захоплює Lock 0, додає елемент
Thread 2: Add("banana", 20)
→ hashCode = 67890
→ bucket = 67890 % 64 = 26
→ Lock 1 (buckets 16-31)
→ Захоплює Lock 1, додає елемент ПАРАЛЕЛЬНО з Thread 1!
Thread 3: Add("avocado", 30)
→ hashCode = 11111
→ bucket = 11111 % 64 = 7
→ Lock 0 (buckets 0-15)
→ ЧЕКАЄ поки Thread 1 звільнить Lock 0
Чому це швидше ніж один lock?
З одним lock на всю колекцію, Thread 2 мав би чекати Thread 1, навіть якщо вони працюють з різними частинами колекції. Striped locking дозволяє їм працювати паралельно, що значно підвищує throughput на багатоядерних системах.
Візуалізація через Mermaid:
Ключовий висновок: Striped locking перетворює "один вузьке місце" (single lock) на "кілька менших вузьких місць" (multiple locks), що дозволяє більшій кількості потоків працювати одночасно.
var dict = new ConcurrentDictionary<string, int>();
// Спроба додати елемент
bool added = dict.TryAdd("key1", 100);
Console.WriteLine($"Added: {added}"); // true
// Повторна спроба — не вдасться (ключ вже є)
added = dict.TryAdd("key1", 200);
Console.WriteLine($"Added: {added}"); // false
Console.WriteLine($"Value: {dict["key1"]}"); // 100 (не змінилось)
Коли використовувати: Коли потрібно додати елемент тільки якщо його ще немає, і ви хочете знати чи операція вдалась.
var dict = new ConcurrentDictionary<string, int>();
dict.TryAdd("key1", 100);
// ✅ Безпечний спосіб читання
if (dict.TryGetValue("key1", out int value))
{
Console.WriteLine($"Found: {value}");
}
else
{
Console.WriteLine("Not found");
}
// ❌ Небезпечний спосіб (може кинути KeyNotFoundException)
// int value = dict["key1"];
Чому TryGetValue краще за indexer:
ContainsKey + indexer)var dict = new ConcurrentDictionary<string, int>();
// Варіант 1: з готовим значенням
int value1 = dict.GetOrAdd("key1", 100);
Console.WriteLine(value1); // 100 (додано)
int value2 = dict.GetOrAdd("key1", 200);
Console.WriteLine(value2); // 100 (вже було, не змінилось)
// Варіант 2: з factory function (викликається тільки якщо ключа немає)
int value3 = dict.GetOrAdd("key2", key =>
{
Console.WriteLine($"Computing value for {key}...");
return ExpensiveComputation(); // Дорога операція
});
// Варіант 3: з factoryArgument (без closure allocation)
int value4 = dict.GetOrAdd("key3",
static (key, arg) => arg * 2, // static lambda = no closure
factoryArgument: 50);
Console.WriteLine(value4); // 100
Важливо: Factory function може бути викликана кілька разів якщо кілька потоків одночасно намагаються додати той самий ключ. Тільки один результат буде збережено, решта відкинуті.
// ⚠️ Factory може викликатись кілька разів!
var dict = new ConcurrentDictionary<string, int>();
Parallel.For(0, 10, i =>
{
dict.GetOrAdd("same-key", key =>
{
Console.WriteLine($"Thread {i} computing...");
return i;
});
});
// Вивід:
// Thread 0 computing...
// Thread 1 computing... ← Обидва обчислюють!
// Thread 2 computing...
// ...
// Але тільки ОДНЕ значення буде збережено
var dict = new ConcurrentDictionary<string, int>();
// Варіант 1: з готовими значеннями
int result1 = dict.AddOrUpdate(
key: "counter",
addValue: 1, // Якщо ключа немає → додати 1
updateValueFactory: (key, oldValue) => oldValue + 1 // Якщо є → інкремент
);
Console.WriteLine(result1); // 1 (додано)
int result2 = dict.AddOrUpdate("counter", 1, (key, oldValue) => oldValue + 1);
Console.WriteLine(result2); // 2 (оновлено)
// Варіант 2: з factoryArgument (без closure)
int result3 = dict.AddOrUpdate(
key: "counter",
addValueFactory: static (key, arg) => arg,
updateValueFactory: static (key, oldValue, arg) => oldValue + arg,
factoryArgument: 10
);
Console.WriteLine(result3); // 12 (2 + 10)
Use case: Thread-safe counter:
var counters = new ConcurrentDictionary<string, int>();
// 1000 потоків інкрементують різні лічильники
Parallel.For(0, 1000, i =>
{
string key = $"counter-{i % 10}"; // 10 різних лічильників
counters.AddOrUpdate(key, 1, (k, v) => v + 1);
});
foreach (var kvp in counters)
{
Console.WriteLine($"{kvp.Key}: {kvp.Value}");
}
// Кожен лічильник має значення 100 (1000 / 10)
var dict = new ConcurrentDictionary<string, int>();
dict.TryAdd("balance", 1000);
// Спроба оновити ТІЛЬКИ якщо поточне значення = 1000
bool updated = dict.TryUpdate(
key: "balance",
newValue: 900,
comparisonValue: 1000 // Очікуване поточне значення
);
Console.WriteLine($"Updated: {updated}"); // true
Console.WriteLine($"Balance: {dict["balance"]}"); // 900
// Повторна спроба — не вдасться (значення вже 900, не 1000)
updated = dict.TryUpdate("balance", 800, 1000);
Console.WriteLine($"Updated: {updated}"); // false
Console.WriteLine($"Balance: {dict["balance"]}"); // 900 (не змінилось)
Use case: Optimistic concurrency:
// Банківська транзакція з retry
bool Withdraw(ConcurrentDictionary<string, decimal> accounts, string accountId, decimal amount)
{
while (true)
{
if (!accounts.TryGetValue(accountId, out decimal currentBalance))
return false; // Рахунок не існує
if (currentBalance < amount)
return false; // Недостатньо коштів
decimal newBalance = currentBalance - amount;
// Спроба оновити (CAS pattern)
if (accounts.TryUpdate(accountId, newBalance, currentBalance))
return true; // Успіх!
// Хтось інший змінив balance між TryGetValue та TryUpdate → retry
}
}
var dict = new ConcurrentDictionary<string, int>();
dict.TryAdd("key1", 100);
// Видалити та отримати значення
if (dict.TryRemove("key1", out int removedValue))
{
Console.WriteLine($"Removed: {removedValue}"); // 100
}
// Повторна спроба — не вдасться
if (!dict.TryRemove("key1", out _))
{
Console.WriteLine("Key not found");
}
using System;
using System.Collections.Concurrent;
using System.Threading;
public class ThreadSafeCache<TKey, TValue> where TKey : notnull
{
private readonly ConcurrentDictionary<TKey, CacheEntry> _cache = new();
private readonly TimeSpan _defaultTtl;
private record CacheEntry(TValue Value, DateTime ExpiresAt);
public ThreadSafeCache(TimeSpan defaultTtl)
{
_defaultTtl = defaultTtl;
}
public bool TryGet(TKey key, out TValue? value)
{
if (_cache.TryGetValue(key, out var entry))
{
if (entry.ExpiresAt > DateTime.UtcNow)
{
value = entry.Value;
return true;
}
// Expired → видаляємо
_cache.TryRemove(key, out _);
}
value = default;
return false;
}
public void Set(TKey key, TValue value, TimeSpan? ttl = null)
{
var expiresAt = DateTime.UtcNow + (ttl ?? _defaultTtl);
var entry = new CacheEntry(value, expiresAt);
_cache[key] = entry; // AddOrUpdate через indexer
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory, TimeSpan? ttl = null)
{
var expiresAt = DateTime.UtcNow + (ttl ?? _defaultTtl);
var entry = _cache.GetOrAdd(key, k =>
{
var value = valueFactory(k);
return new CacheEntry(value, expiresAt);
});
// Перевірка expiration
if (entry.ExpiresAt <= DateTime.UtcNow)
{
// Expired → видаляємо та рекурсивно викликаємо знову
_cache.TryRemove(key, out _);
return GetOrAdd(key, valueFactory, ttl);
}
return entry.Value;
}
public void Clear() => _cache.Clear();
public int Count => _cache.Count;
}
// Використання:
var cache = new ThreadSafeCache<string, string>(TimeSpan.FromSeconds(10));
// 100 потоків одночасно працюють з кешем
Parallel.For(0, 100, i =>
{
string key = $"key-{i % 10}"; // 10 різних ключів
string value = cache.GetOrAdd(key, k =>
{
Console.WriteLine($"[Thread {i}] Computing {k}...");
Thread.Sleep(100); // Симуляція дорогої операції
return $"value-{k}";
});
Console.WriteLine($"[Thread {i}] Got: {value}");
});
// Тільки 10 обчислень (по одному на ключ), решта взяли з кешу
Одне з найпоширеніших застосувань ConcurrentDictionary — підрахунок частоти слів у великих текстових файлах. Це класична задача MapReduce, яку можна ефективно розв'язати паралельно.
Сценарій: У нас є кілька великих текстових файлів, і ми хочемо підрахувати скільки разів кожне слово зустрічається у всіх файлах разом.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Diagnostics;
public class WordCounter
{
private readonly ConcurrentDictionary<string, int> _wordCounts = new();
public void ProcessFile(string filePath)
{
// Читаємо файл та розбиваємо на слова
string content = File.ReadAllText(filePath);
var words = content
.Split(new[] { ' ', '\n', '\r', '\t', '.', ',', '!', '?' },
StringSplitOptions.RemoveEmptyEntries)
.Select(w => w.ToLowerInvariant());
// Підраховуємо кожне слово
foreach (string word in words)
{
// AddOrUpdate: якщо слова немає → додати 1, якщо є → інкремент
_wordCounts.AddOrUpdate(
key: word,
addValue: 1, // Якщо слова немає
updateValueFactory: (key, oldValue) => oldValue + 1 // Якщо є
);
}
}
public void PrintTopWords(int count = 10)
{
var topWords = _wordCounts
.OrderByDescending(kvp => kvp.Value)
.Take(count);
Console.WriteLine($"\nTop {count} words:");
foreach (var kvp in topWords)
{
Console.WriteLine($" {kvp.Key}: {kvp.Value}");
}
}
public int TotalWords => _wordCounts.Values.Sum();
public int UniqueWords => _wordCounts.Count;
}
// Використання:
var counter = new WordCounter();
var files = Directory.GetFiles(@"C:\Logs", "*.txt");
Console.WriteLine($"Processing {files.Length} files...");
var sw = Stopwatch.StartNew();
// ❌ ПОВІЛЬНО: Послідовна обробка
// foreach (var file in files)
// {
// counter.ProcessFile(file);
// }
// ✅ ШВИДКО: Паралельна обробка
Parallel.ForEach(files, file =>
{
counter.ProcessFile(file);
Console.WriteLine($"Processed: {Path.GetFileName(file)}");
});
sw.Stop();
Console.WriteLine($"\nCompleted in {sw.ElapsedMilliseconds}ms");
Console.WriteLine($"Total words: {counter.TotalWords:N0}");
Console.WriteLine($"Unique words: {counter.UniqueWords:N0}");
counter.PrintTopWords(10);
Вивід (приклад):
Processing 50 files...
Processed: log1.txt
Processed: log3.txt
Processed: log2.txt
...
Completed in 1,234ms
Total words: 1,245,678
Unique words: 45,123
Top 10 words:
the: 89,234
and: 67,890
error: 45,123
to: 34,567
in: 29,876
a: 28,345
is: 23,456
of: 21,234
for: 19,876
with: 18,765
Чому це працює ефективно?
Parallel.ForEach розподіляє файли між потокамиАльтернативний підхід з GetOrAdd (менш ефективний):
// ❌ Менш ефективно: два звернення до словника
foreach (string word in words)
{
int currentCount = _wordCounts.GetOrAdd(word, 0);
_wordCounts[word] = currentCount + 1; // Race condition!
}
Проблема: Між GetOrAdd та присвоєнням інший потік може змінити значення, і ми втратимо оновлення. AddOrUpdate робить це атомарно.
Benchmark: ConcurrentDictionary vs Dictionary + lock:
// Benchmark: 50 файлів, ~1MB кожен, 10 потоків
Dictionary + lock: ~3,500ms
ConcurrentDictionary: ~1,200ms (в 3x швидше!)
ConcurrentQueue<T> реалізована як lock-free linked list з окремими head та tail вказівниками:
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│ Node │───▶│ Node │───▶│ Node │───▶│ Node │───▶ null
│ A │ │ B │ │ C │ │ D │
└──────┘ └──────┘ └──────┘ └──────┘
↑ ↑
head tail
(Dequeue тут) (Enqueue тут)
Ключові особливості:
Enqueue додає у tail (кінець)TryDequeue забирає з head (початок)Interlocked.CompareExchangeusing System.Collections.Concurrent;
var queue = new ConcurrentQueue<int>();
// Enqueue — додати елемент (завжди успішно)
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
// TryDequeue — забрати елемент
if (queue.TryDequeue(out int result))
{
Console.WriteLine($"Dequeued: {result}"); // 1 (FIFO)
}
// TryPeek — подивитись без видалення
if (queue.TryPeek(out int peeked))
{
Console.WriteLine($"Peeked: {peeked}"); // 2
}
// Count — приблизна кількість (snapshot)
Console.WriteLine($"Count: {queue.Count}"); // ~2
// IsEmpty — перевірка чи порожня
Console.WriteLine($"IsEmpty: {queue.IsEmpty}"); // false
Важливо: Count та IsEmpty — це snapshot значення. Між викликом IsEmpty та TryDequeue інший потік може змінити чергу.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
var queue = new ConcurrentQueue<string>();
var cts = new CancellationTokenSource();
// Producer: генерує повідомлення
var producer = Task.Run(() =>
{
int messageId = 0;
while (!cts.Token.IsCancellationRequested)
{
string message = $"Message-{messageId++}";
queue.Enqueue(message);
Console.WriteLine($"[Producer] Enqueued: {message}");
Thread.Sleep(Random.Shared.Next(50, 150));
}
});
// Consumer: обробляє повідомлення
var consumer = Task.Run(() =>
{
while (!cts.Token.IsCancellationRequested)
{
if (queue.TryDequeue(out string? message))
{
Console.WriteLine($"[Consumer] Processing: {message}");
Thread.Sleep(Random.Shared.Next(100, 200)); // Симуляція обробки
}
else
{
Thread.Sleep(10); // Черга порожня, трохи почекати
}
}
});
// Працюємо 5 секунд
await Task.Delay(5000);
cts.Cancel();
await Task.WhenAll(producer, consumer);
Console.WriteLine($"Final queue count: {queue.Count}");
ConcurrentStack<T> — це lock-free stack через linked list з одним head вказівником:
head
↓
┌──────┐
│ Node │
│ D │ ← Push/Pop тут (вершина)
└──┬───┘
↓
┌──────┐
│ Node │
│ C │
└──┬───┘
↓
┌──────┐
│ Node │
│ B │
└──┬───┘
↓
┌──────┐
│ Node │
│ A │
└──┬───┘
↓
null
Операції:
Push — додає на вершину (атомарно через CAS)TryPop — забирає з вершини (атомарно через CAS)using System.Collections.Concurrent;
var stack = new ConcurrentStack<int>();
// Push — додати елемент
stack.Push(1);
stack.Push(2);
stack.Push(3);
// TryPop — забрати елемент
if (stack.TryPop(out int result))
{
Console.WriteLine($"Popped: {result}"); // 3 (LIFO)
}
// TryPeek — подивитись без видалення
if (stack.TryPeek(out int peeked))
{
Console.WriteLine($"Peeked: {peeked}"); // 2
}
// PushRange — додати кілька елементів одразу (ефективніше)
int[] items = { 10, 20, 30 };
stack.PushRange(items);
// TryPopRange — забрати кілька елементів одразу
int[] buffer = new int[2];
int popped = stack.TryPopRange(buffer);
Console.WriteLine($"Popped {popped} items: {string.Join(", ", buffer)}");
// Popped 2 items: 30, 20
ConcurrentBag<T> — унікальна колекція оптимізована для сценарію де той самий потік що додає — також забирає:
┌─────────────────────────────────────────────────┐
│ ConcurrentBag<T> │
├─────────────────────────────────────────────────┤
│ Thread 1 Local List: [A, B, C] │
│ Thread 2 Local List: [D, E] │
│ Thread 3 Local List: [F, G, H, I] │
└─────────────────────────────────────────────────┘
Thread 1: Add(X) → йде у Thread 1 Local List (lock-free!)
Thread 1: TryTake() → бере з Thread 1 Local List (lock-free!)
Thread 2: TryTake() → спочатку з Thread 2 Local List,
якщо порожня → "краде" з Thread 1 або 3
Переваги:
Недоліки:
ConcurrentBag<T> використовує дуже розумну оптимізацію: кожен потік має свій локальний список елементів. Це дозволяє уникнути синхронізації у найпоширенішому сценарії — коли потік додає елементи і сам же їх забирає.
Внутрішня структура:
// Спрощена концептуальна модель
class ConcurrentBag<T>
{
// Кожен потік має свій ThreadLocalList
private ThreadLocal<WorkStealingQueue<T>> _locals;
// Глобальний список всіх thread-local черг
private List<WorkStealingQueue<T>> _allQueues;
}
Сценарій 1: Додавання у власному потоці (найшвидший)
// Thread 1 додає елемент
bag.Add("A");
// Що відбувається:
// 1. Перевірка: чи є у Thread 1 локальний список? Якщо ні → створити
// 2. Додати "A" у локальний список Thread 1 (lock-free!)
// 3. Готово — жодної синхронізації!
Сценарій 2: Забирання з власного потоку (швидкий)
// Thread 1 забирає елемент
if (bag.TryTake(out string? item))
{
Console.WriteLine(item); // "A"
}
// Що відбувається:
// 1. Спроба забрати з локального списку Thread 1 (lock-free!)
// 2. Якщо список не порожній → повернути елемент
// 3. Готово — жодної синхронізації!
Сценарій 3: "Крадіжка" з іншого потоку (повільний)
// Thread 2 намагається забрати елемент, але його локальний список порожній
if (bag.TryTake(out string? item))
{
Console.WriteLine(item); // "A" (вкрадено з Thread 1!)
}
// Що відбувається:
// 1. Локальний список Thread 2 порожній
// 2. Перебір всіх інших thread-local списків
// 3. Знайдено непорожній список Thread 1
// 4. Захоплення lock на список Thread 1
// 5. "Крадіжка" елемента з Thread 1
// 6. Звільнення lock
Візуалізація через Mermaid:
Чому це ефективно для Parallel.ForEach?
У типовому сценарії Parallel.ForEach, кожен worker потік:
ConcurrentBagПісля завершення всіх потоків, головний потік забирає всі результати через ToArray() або ToList(). У цьому сценарії:
Add() операції — lock-free (кожен потік додає у свій список)Benchmark: ConcurrentBag vs ConcurrentQueue для Parallel.ForEach:
// Тест: 10,000 ітерацій, 8 потоків, кожен додає 1000 елементів
ConcurrentQueue: ~45ms
ConcurrentBag: ~28ms (в 1.6x швидше!)
✅ Використовуйте ConcurrentBag коли:
Parallel.ForEach + збір результатів)❌ НЕ використовуйте ConcurrentBag коли:
ConcurrentQueue або ConcurrentStackConcurrentQueue або BlockingCollectionConcurrentQueueusing System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
var imageFiles = Directory.GetFiles(@"C:\Images", "*.jpg");
var processedImages = new ConcurrentBag<string>();
Console.WriteLine($"Processing {imageFiles.Length} images...");
Parallel.ForEach(imageFiles, imageFile =>
{
// Симуляція обробки зображення
string fileName = Path.GetFileName(imageFile);
Console.WriteLine($"[Thread {Thread.CurrentThread.ManagedThreadId}] Processing: {fileName}");
Thread.Sleep(Random.Shared.Next(50, 150)); // Симуляція роботи
// Додаємо результат у ConcurrentBag (lock-free для цього потоку!)
processedImages.Add($"processed_{fileName}");
});
Console.WriteLine($"\nProcessed {processedImages.Count} images");
// Забираємо всі результати (тут може бути "крадіжка", але це одноразова операція)
var results = processedImages.ToArray();
foreach (var result in results.Take(5))
{
Console.WriteLine($" - {result}");
}
using System.Collections.Concurrent;
using System.Threading.Tasks;
// ✅ ДОБРЕ: Parallel.ForEach з локальною обробкою
var bag = new ConcurrentBag<string>();
Parallel.ForEach(Directory.GetFiles(@"C:\Logs"), file =>
{
// Кожен потік обробляє свої файли і додає результати
var lines = File.ReadAllLines(file);
foreach (var line in lines)
{
if (line.Contains("ERROR"))
{
bag.Add(line); // Додає у локальний список цього потоку
}
}
});
// Після завершення — забираємо всі результати
var errors = bag.ToArray();
Console.WriteLine($"Found {errors.Length} errors");
BlockingCollection<T> — обгортка над будь-якою IProducerConsumerCollection<T> (зазвичай ConcurrentQueue<T>) що додає:
Add() блокується якщо черга повна, Take() блокується якщо порожняProducer BlockingCollection Consumer
────────────────────────────────────────────────────────────────
Add(item1) ────────▶ [item1]
Add(item2) ────────▶ [item1, item2]
◀──────── Take() = item1
Add(item3) ────────▶ [item2, item3]
Add(item4) ────────▶ [item2, item3, item4]
Add(item5) ────────▶ [item2, item3, item4, item5] ← Черга повна (capacity = 4)
Add(item6) ────────▶ БЛОКУЄТЬСЯ! Чекає поки Consumer забере елемент
◀──────── Take() = item2
Add(item6) ────────▶ [item3, item4, item5, item6] ← Тепер успішно
CompleteAdding() ──▶ [item3, item4, item5, item6] ← Сигнал "більше не буде"
◀──────── Take() = item3
◀──────── Take() = item4
◀──────── Take() = item5
◀──────── Take() = item6
◀──────── Take() → InvalidOperationException
(черга порожня + CompleteAdding)
using System.Collections.Concurrent;
// Створення з обмеженням capacity = 10
var collection = new BlockingCollection<int>(boundedCapacity: 10);
// Add — додати елемент (блокується якщо повна)
collection.Add(1);
collection.Add(2);
// TryAdd — спроба додати з timeout
bool added = collection.TryAdd(3, TimeSpan.FromSeconds(1));
Console.WriteLine($"Added: {added}");
// Take — забрати елемент (блокується якщо порожня)
int item = collection.Take();
Console.WriteLine($"Taken: {item}"); // 1
// TryTake — спроба забрати з timeout
if (collection.TryTake(out int result, TimeSpan.FromSeconds(1)))
{
Console.WriteLine($"Taken: {result}"); // 2
}
// CompleteAdding — сигнал що більше не буде елементів
collection.CompleteAdding();
// IsCompleted — перевірка чи завершено додавання та черга порожня
Console.WriteLine($"IsCompleted: {collection.IsCompleted}");
// IsAddingCompleted — перевірка чи викликано CompleteAdding
Console.WriteLine($"IsAddingCompleted: {collection.IsAddingCompleted}");
GetConsumingEnumerable() — найзручніший спосіб обробляти елементи у consumer потоці. Він автоматично:
CompleteAdding() та черга порожняusing System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
var collection = new BlockingCollection<string>(boundedCapacity: 5);
// Producer: додає повідомлення
var producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
string message = $"Message-{i}";
collection.Add(message);
Console.WriteLine($"[Producer] Added: {message}");
Thread.Sleep(100);
}
collection.CompleteAdding(); // Сигнал що більше не буде
Console.WriteLine("[Producer] Completed adding");
});
// Consumer: обробляє повідомлення
var consumer = Task.Run(() =>
{
// GetConsumingEnumerable блокується поки чекає елементи
// та автоматично завершується після CompleteAdding
foreach (string message in collection.GetConsumingEnumerable())
{
Console.WriteLine($"[Consumer] Processing: {message}");
Thread.Sleep(150); // Симуляція обробки
}
Console.WriteLine("[Consumer] No more items");
});
await Task.WhenAll(producer, consumer);
Console.WriteLine("Done!");
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
var collection = new BlockingCollection<int>(boundedCapacity: 20);
// 3 Producers: генерують числа
var producers = Enumerable.Range(0, 3).Select(producerId =>
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
int value = producerId * 100 + i;
collection.Add(value);
Console.WriteLine($"[Producer {producerId}] Added: {value}");
Thread.Sleep(Random.Shared.Next(50, 150));
}
Console.WriteLine($"[Producer {producerId}] Finished");
})
).ToArray();
// Чекаємо поки всі producers завершать роботу
Task.Run(async () =>
{
await Task.WhenAll(producers);
collection.CompleteAdding();
Console.WriteLine("[Coordinator] All producers finished, CompleteAdding called");
});
// 2 Consumers: обробляють числа
var consumers = Enumerable.Range(0, 2).Select(consumerId =>
Task.Run(() =>
{
foreach (int value in collection.GetConsumingEnumerable())
{
Console.WriteLine($"[Consumer {consumerId}] Processing: {value}");
Thread.Sleep(Random.Shared.Next(100, 200));
}
Console.WriteLine($"[Consumer {consumerId}] No more items");
})
).ToArray();
await Task.WhenAll(consumers);
Console.WriteLine("Pipeline completed!");
Вивід (приклад):
[Producer 0] Added: 0
[Producer 1] Added: 100
[Consumer 0] Processing: 0
[Producer 2] Added: 200
[Consumer 1] Processing: 100
[Producer 0] Added: 1
...
[Producer 2] Finished
[Coordinator] All producers finished, CompleteAdding called
[Consumer 0] Processing: 209
[Consumer 1] No more items
[Consumer 0] No more items
Pipeline completed!
✅ Використовуйте BlockingCollection коли:
❌ НЕ використовуйте BlockingCollection коли:
System.Threading.Channels (тема 15)ConcurrentQueue<T> простішийImmutable collections — колекції що ніколи не змінюються після створення. Будь-яка "модифікація" створює нову колекцію, залишаючи оригінал незмінним.
Чому це thread-safe?
Якщо ніхто не може змінити колекцію, то немає race conditions! Кілька потоків можуть безпечно читати одну і ту саму immutable колекцію без будь-якої синхронізації.
// Concurrent collections: thread-safe через locks/CAS
var concurrent = new ConcurrentDictionary<string, int>();
concurrent["key"] = 1; // Модифікує існуючу колекцію
// Immutable collections: thread-safe через immutability
var immutable = ImmutableDictionary<string, int>.Empty;
var newImmutable = immutable.Add("key", 1); // Створює НОВУ колекцію
// immutable досі порожній!
// newImmutable містить один елемент
Аналогія: Immutable колекція — це як фотографія. Ви не можете змінити фотографію, але можете зробити нову з додатковими елементами.
Immutable collections знаходяться у NuGet пакеті System.Collections.Immutable:
dotnet add package System.Collections.Immutable
using System.Collections.Immutable;
// Створення порожнього списку
var list1 = ImmutableList<int>.Empty;
Console.WriteLine($"list1.Count: {list1.Count}"); // 0
// Add створює НОВУ колекцію
var list2 = list1.Add(10);
Console.WriteLine($"list1.Count: {list1.Count}"); // 0 (не змінився!)
Console.WriteLine($"list2.Count: {list2.Count}"); // 1
// Можна ланцюжком
var list3 = list2.Add(20).Add(30).Add(40);
Console.WriteLine($"list3: [{string.Join(", ", list3)}]"); // [10, 20, 30, 40]
// Remove також створює нову колекцію
var list4 = list3.Remove(20);
Console.WriteLine($"list3: [{string.Join(", ", list3)}]"); // [10, 20, 30, 40] (не змінився)
Console.WriteLine($"list4: [{string.Join(", ", list4)}]"); // [10, 30, 40]
// Індексація (read-only)
Console.WriteLine($"list3[1]: {list3[1]}"); // 20
// SetItem — "змінити" елемент за індексом (створює нову колекцію)
var list5 = list3.SetItem(1, 999);
Console.WriteLine($"list3: [{string.Join(", ", list3)}]"); // [10, 20, 30, 40]
Console.WriteLine($"list5: [{string.Join(", ", list5)}]"); // [10, 999, 30, 40]
Важливо: Кожна операція створює нову колекцію. Це може здаватись неефективним, але під капотом використовується structural sharing — нові та старі версії спільно використовують більшість даних.
Immutable колекції використовують persistent data structures (зокрема AVL trees для списків та Hash Array Mapped Trie для словників).
Давайте детально розберемо як працює structural sharing на прикладі ImmutableList<T>.
Наївний підхід (НЕ так як працює насправді):
// Якби ImmutableList просто копіював весь масив:
var list1 = ImmutableList.Create(1, 2, 3, 4, 5); // [1, 2, 3, 4, 5]
var list2 = list1.Add(6); // Копіювання всіх 5 елементів + додавання 6
// Проблема: O(n) час та O(n) пам'ять для кожної операції!
Реальний підхід: AVL Tree з Structural Sharing
ImmutableList<T> внутрішньо представлений як збалансоване бінарне дерево (AVL tree), де кожен вузол містить елемент та посилання на лівого/правого нащадка.
Оригінальний список: [10, 20, 30]
Root
/ \
[10] Node
/ \
[20] [30]
Після Add(40): [10, 20, 30, 40]
NewRoot ← Новий корінь
/ \
[10] NewNode ← Новий вузол
/ \
[20] NewNode2 ← Новий вузол
/ \
[30] [40]
Спільні вузли: [10], [20], [30] — НЕ копіюються!
Створено тільки: NewRoot, NewNode, NewNode2
Чому це ефективно?
Детальний приклад з кількома версіями:
var v1 = ImmutableList<int>.Empty; // Версія 1: []
var v2 = v1.Add(10); // Версія 2: [10]
var v3 = v2.Add(20); // Версія 3: [10, 20]
var v4 = v3.Add(30); // Версія 4: [10, 20, 30]
// Всі 4 версії існують одночасно!
Console.WriteLine($"v1: [{string.Join(", ", v1)}]"); // []
Console.WriteLine($"v2: [{string.Join(", ", v2)}]"); // [10]
Console.WriteLine($"v3: [{string.Join(", ", v3)}]"); // [10, 20]
Console.WriteLine($"v4: [{string.Join(", ", v4)}]"); // [10, 20, 30]
Візуалізація пам'яті (спрощено):
v1 → null (порожній)
v2 → Node(10)
v3 → Node(20)
↓
Node(10) ← Спільний з v2!
v4 → Node(30)
↓
Node(20) ← Спільний з v3!
↓
Node(10) ← Спільний з v2 та v3!
Пам'ять: Замість 6 елементів (0 + 1 + 2 + 3), зберігаємо тільки 3 унікальні вузли!
Візуалізація через Mermaid:
Що відбувається при Remove?
var list = ImmutableList.Create(10, 20, 30, 40);
var newList = list.Remove(20); // [10, 30, 40]
// Створюється нова структура дерева БЕЗ вузла 20
// Вузли 10, 30, 40 можуть бути спільними (залежно від балансування)
Hash Array Mapped Trie (HAMT) для ImmutableDictionary
ImmutableDictionary<K,V> використовує ще складнішу структуру — HAMT (Hash Array Mapped Trie). Це дерево де:
Hash code: 0x1A2B3C4D
││││││││
││││││└┴─ Рівень 1: індекс 0x4D (77)
││││└┴─── Рівень 2: індекс 0x3C (60)
││└┴───── Рівень 3: індекс 0x2B (43)
└┴─────── Рівень 4: індекс 0x1A (26)
Чому O(log₃₂ n)?
Кожен вузол HAMT має 32 дочірні вузли (5 біт hash code). Для 1 мільйона елементів:
log₃₂(1,000,000) ≈ 4 рівні
Тобто максимум 4 переходи для будь-якої операції!
Складність операцій:
Add, Remove, SetItem: O(log n) (не O(1) як у List<T>, але не O(n)!)[i]: O(log n) (не O(1)!)foreach: O(n) (як звичайний список)Якщо потрібно виконати багато модифікацій підряд, використовуйте Builder щоб уникнути створення проміжних колекцій:
using System.Collections.Immutable;
using System.Diagnostics;
// ❌ ПОВІЛЬНО: кожен Add створює нову колекцію
var sw = Stopwatch.StartNew();
var list = ImmutableList<int>.Empty;
for (int i = 0; i < 10000; i++)
{
list = list.Add(i); // 10,000 нових колекцій!
}
sw.Stop();
Console.WriteLine($"Without builder: {sw.ElapsedMilliseconds}ms"); // ~150ms
// ✅ ШВИДКО: Builder накопичує зміни, потім створює фінальну колекцію
sw.Restart();
var builder = ImmutableList.CreateBuilder<int>();
for (int i = 0; i < 10000; i++)
{
builder.Add(i); // Модифікує mutable builder
}
var finalList = builder.ToImmutable(); // Одна immutable колекція
sw.Stop();
Console.WriteLine($"With builder: {sw.ElapsedMilliseconds}ms"); // ~5ms (в 30x швидше!)
Pattern: Використовуйте Builder для batch операцій, потім конвертуйте у immutable:
var builder = ImmutableList.CreateBuilder<string>();
builder.Add("A");
builder.AddRange(new[] { "B", "C", "D" });
builder.Remove("C");
var immutableList = builder.ToImmutable(); // Фінальна immutable версія
using System.Collections.Immutable;
// Створення порожнього словника
var dict1 = ImmutableDictionary<string, int>.Empty;
// Add створює нову колекцію
var dict2 = dict1.Add("apple", 10);
var dict3 = dict2.Add("banana", 20).Add("cherry", 30);
Console.WriteLine($"dict1.Count: {dict1.Count}"); // 0
Console.WriteLine($"dict3.Count: {dict3.Count}"); // 3
// Індексація (read-only)
Console.WriteLine($"dict3[\"banana\"]: {dict3["banana"]}"); // 20
// TryGetValue
if (dict3.TryGetValue("apple", out int value))
{
Console.WriteLine($"Found: {value}"); // 10
}
// SetItem — "змінити" значення (створює нову колекцію)
var dict4 = dict3.SetItem("banana", 999);
Console.WriteLine($"dict3[\"banana\"]: {dict3["banana"]}"); // 20 (не змінився)
Console.WriteLine($"dict4[\"banana\"]: {dict4["banana"]}"); // 999
// Remove
var dict5 = dict4.Remove("cherry");
Console.WriteLine($"dict4.Count: {dict4.Count}"); // 3
Console.WriteLine($"dict5.Count: {dict5.Count}"); // 2
// SetItems — batch update
var updates = new Dictionary<string, int>
{
["apple"] = 100,
["banana"] = 200
};
var dict6 = dict5.SetItems(updates);
Console.WriteLine($"dict6[\"apple\"]: {dict6["apple"]}"); // 100
using System.Collections.Immutable;
// ImmutableStack — LIFO
var stack1 = ImmutableStack<int>.Empty;
var stack2 = stack1.Push(10).Push(20).Push(30);
var stack3 = stack2.Pop(out int top);
Console.WriteLine($"Top: {top}"); // 30
Console.WriteLine($"stack2.Peek(): {stack2.Peek()}"); // 30 (не змінився)
Console.WriteLine($"stack3.Peek(): {stack3.Peek()}"); // 20
// ImmutableQueue — FIFO
var queue1 = ImmutableQueue<string>.Empty;
var queue2 = queue1.Enqueue("A").Enqueue("B").Enqueue("C");
var queue3 = queue2.Dequeue(out string first);
Console.WriteLine($"First: {first}"); // A
Console.WriteLine($"queue2.Peek(): {queue2.Peek()}"); // A (не змінився)
Console.WriteLine($"queue3.Peek(): {queue3.Peek()}"); // B
Immutable колекції також включають множини (sets) для зберігання унікальних елементів.
using System.Collections.Immutable;
// ImmutableHashSet — неупорядкована множина
var set1 = ImmutableHashSet<string>.Empty;
var set2 = set1.Add("apple").Add("banana").Add("cherry");
var set3 = set2.Add("apple"); // Дублікат — не додається
Console.WriteLine($"set2.Count: {set2.Count}"); // 3
Console.WriteLine($"set3.Count: {set3.Count}"); // 3 (не змінився)
// Contains — перевірка наявності
Console.WriteLine($"Contains 'banana': {set3.Contains("banana")}"); // true
// Union, Intersect, Except — операції над множинами
var setA = ImmutableHashSet.Create("a", "b", "c");
var setB = ImmutableHashSet.Create("b", "c", "d");
var union = setA.Union(setB); // [a, b, c, d]
var intersect = setA.Intersect(setB); // [b, c]
var except = setA.Except(setB); // [a]
Console.WriteLine($"Union: [{string.Join(", ", union)}]");
Console.WriteLine($"Intersect: [{string.Join(", ", intersect)}]");
Console.WriteLine($"Except: [{string.Join(", ", except)}]");
// ImmutableSortedSet — упорядкована множина
var sorted1 = ImmutableSortedSet<int>.Empty;
var sorted2 = sorted1.Add(30).Add(10).Add(20);
Console.WriteLine($"Sorted: [{string.Join(", ", sorted2)}]"); // [10, 20, 30]
Коли використовувати:
ImmutableHashSet<T> — коли потрібна швидка перевірка наявності (O(log n)) та порядок не важливийImmutableSortedSet<T> — коли потрібен відсортований порядок елементівImmutableArray<T> — це struct (не class!) що обгортає звичайний масив. На відміну від інших immutable колекцій, він НЕ використовує structural sharing.
using System.Collections.Immutable;
// Створення з елементів
var array1 = ImmutableArray.Create(1, 2, 3, 4, 5);
// Індексація — O(1) (швидше ніж ImmutableList!)
Console.WriteLine($"array1[2]: {array1[2]}"); // 3
// "Модифікація" створює НОВИЙ масив (копіює всі елементи!)
var array2 = array1.Add(6); // O(n) — копіює весь масив!
// SetItem — також O(n)
var array3 = array1.SetItem(2, 999);
Console.WriteLine($"array1: [{string.Join(", ", array1)}]"); // [1, 2, 3, 4, 5]
Console.WriteLine($"array3: [{string.Join(", ", array3)}]"); // [1, 2, 999, 4, 5]
Коли використовувати ImmutableArray:
Коли НЕ використовувати:
ImmutableList<T> (O(log n) замість O(n))Builder для ImmutableArray:
var builder = ImmutableArray.CreateBuilder<int>();
for (int i = 0; i < 1000; i++)
{
builder.Add(i); // Модифікує mutable builder
}
var array = builder.ToImmutable(); // Одна immutable версія
using System.Collections.Immutable;
public class AppConfiguration
{
// Immutable словник — thread-safe для читання
private ImmutableDictionary<string, string> _settings;
public AppConfiguration()
{
_settings = ImmutableDictionary<string, string>.Empty;
}
// Читання — thread-safe без locks
public string? GetSetting(string key)
{
_settings.TryGetValue(key, out string? value);
return value;
}
// Запис — атомарна заміна через Interlocked
public void SetSetting(string key, string value)
{
ImmutableDictionary<string, string> original, updated;
do
{
original = _settings;
updated = original.SetItem(key, value);
}
while (Interlocked.CompareExchange(ref _settings, updated, original) != original);
}
// Batch update
public void SetSettings(Dictionary<string, string> updates)
{
ImmutableDictionary<string, string> original, updated;
do
{
original = _settings;
updated = original.SetItems(updates);
}
while (Interlocked.CompareExchange(ref _settings, updated, original) != original);
}
public ImmutableDictionary<string, string> GetAllSettings()
{
return _settings; // Безпечно повертати — immutable!
}
}
// Використання:
var config = new AppConfiguration();
// 100 потоків одночасно читають та пишуть
Parallel.For(0, 100, i =>
{
// Запис
config.SetSetting($"key-{i % 10}", $"value-{i}");
// Читання (без locks!)
string? value = config.GetSetting($"key-{i % 10}");
Console.WriteLine($"[Thread {i}] Read: {value}");
});
// Отримати snapshot всіх налаштувань
var snapshot = config.GetAllSettings();
Console.WriteLine($"Total settings: {snapshot.Count}");
Чому це працює без locks для читання?
_settings — це reference на immutable колекцію_settings, ми отримаємо або стару або нову версію (обидві валідні)ConcurrentDictionary
Переваги:
GetOrAdd, AddOrUpdate)Недоліки:
Use cases:
ImmutableDictionary
Переваги:
Недоліки:
Use cases:
| Критерій | Concurrent | Immutable |
|---|---|---|
| Читання | O(1), lock-free | O(log n), абсолютно безпечне |
| Запис | O(1), striped locking | O(log n), CAS loop |
| GC Pressure | Низький | Середній (structural sharing) |
| Ітерація під час модифікації | Snapshot (може бути застарілим) | Завжди консистентний snapshot |
| API складність | Середня | Проста (functional style) |
| Best for | High-throughput mutations | Read-heavy, snapshots |
Давайте порівняємо продуктивність різних підходів до thread-safe колекцій на реальних сценаріях.
Сценарій: 8 потоків одночасно додають по 10,000 елементів у словник.
// Dictionary + lock
var dict = new Dictionary<string, int>();
var lockObj = new object();
Parallel.For(0, 80000, i =>
{
lock (lockObj)
{
dict[$"key-{i}"] = i;
}
});
// ConcurrentDictionary
var concurrent = new ConcurrentDictionary<string, int>();
Parallel.For(0, 80000, i =>
{
concurrent[$"key-{i}"] = i;
});
// ImmutableDictionary
var immutable = ImmutableDictionary<string, int>.Empty;
Parallel.For(0, 80000, i =>
{
ImmutableDictionary<string, int> original, updated;
do
{
original = immutable;
updated = original.SetItem($"key-{i}", i);
}
while (Interlocked.CompareExchange(ref immutable, updated, original) != original);
});
Результати:
┌─────────────────────────┬──────────┬────────────┐
│ Підхід │ Час (ms) │ Відносно │
├─────────────────────────┼──────────┼────────────┤
│ Dictionary + lock │ 3,500 │ 1.0x │
│ ConcurrentDictionary │ 1,200 │ 2.9x швидше│
│ ImmutableDictionary │ 8,900 │ 0.4x │
└─────────────────────────┴──────────┴────────────┘
Висновок: Для частих конкурентних записів ConcurrentDictionary найкращий вибір.
Сценарій: 8 потоків виконують 90% операцій читання та 10% запису.
┌─────────────────────────┬──────────┬────────────┐
│ Підхід │ Час (ms) │ Відносно │
├─────────────────────────┼──────────┼────────────┤
│ Dictionary + lock │ 2,100 │ 1.0x │
│ ConcurrentDictionary │ 850 │ 2.5x швидше│
│ ImmutableDictionary │ 650 │ 3.2x швидше│
└─────────────────────────┴──────────┴────────────┘
Висновок: Для read-heavy сценаріїв ImmutableDictionary найшвидший (читання без locks!).
Сценарій: Один потік додає 100,000 елементів, інший забирає.
┌─────────────────────────┬──────────┬────────────┐
│ Підхід │ Час (ms) │ Відносно │
├─────────────────────────┼──────────┼────────────┤
│ Queue + lock │ 450 │ 1.0x │
│ ConcurrentQueue │ 180 │ 2.5x швидше│
│ BlockingCollection │ 220 │ 2.0x швидше│
└─────────────────────────┴──────────┴────────────┘
Висновок: ConcurrentQueue найшвидша для простого producer-consumer. BlockingCollection трохи повільніша, але надає bounded capacity та блокуючу семантику.
Сценарій: 8 потоків обробляють 10,000 елементів кожен та додають результати.
┌─────────────────────────┬──────────┬────────────┐
│ Підхід │ Час (ms) │ Відносно │
├─────────────────────────┼──────────┼────────────┤
│ List + lock │ 520 │ 1.0x │
│ ConcurrentQueue │ 280 │ 1.9x швидше│
│ ConcurrentBag │ 190 │ 2.7x швидше│
└─────────────────────────┴──────────┴────────────┘
Висновок: ConcurrentBag найкращий для Parallel.ForEach завдяки thread-local optimization.
Сценарій: Пам'ять для зберігання 100,000 елементів.
┌─────────────────────────┬──────────┬────────────┐
│ Колекція │ Пам'ять │ Відносно │
├─────────────────────────┼──────────┼────────────┤
│ Dictionary<K,V> │ 8.2 MB │ 1.0x │
│ ConcurrentDictionary │ 12.5 MB │ 1.5x більше│
│ ImmutableDictionary │ 15.8 MB │ 1.9x більше│
│ List<T> │ 4.1 MB │ 1.0x │
│ ImmutableList<T> │ 8.9 MB │ 2.2x більше│
│ ImmutableArray<T> │ 4.1 MB │ 1.0x │
└─────────────────────────┴──────────┴────────────┘
Висновок: Concurrent та Immutable колекції використовують більше пам'яті через додаткові структури (locks, tree nodes).
ConcurrentDictionary, ConcurrentQueueImmutableDictionary, ImmutableListConcurrentQueue (unbounded) або BlockingCollection (bounded)ConcurrentBagImmutableArrayОсь повний огляд всіх thread-safe колекцій у .NET з їх характеристиками:
| Колекція | Тип | Порядок | Операції | Складність | Use Case |
|---|---|---|---|---|---|
| Concurrent Collections | |||||
ConcurrentDictionary<K,V> | Dictionary | Немає | TryAdd, TryRemove, GetOrAdd, AddOrUpdate | O(1) avg | Часті конкурентні read/write |
ConcurrentQueue<T> | Queue | FIFO | Enqueue, TryDequeue | O(1) | Producer-consumer, unbounded |
ConcurrentStack<T> | Stack | LIFO | Push, TryPop | O(1) | LIFO сценарії |
ConcurrentBag<T> | Bag | Немає | Add, TryTake | O(1) local | Parallel.ForEach агрегація |
BlockingCollection<T> | Queue | FIFO | Add, Take, CompleteAdding | O(1) | Bounded producer-consumer |
| Immutable Collections | |||||
ImmutableList<T> | List | Insertion | Add, Remove, SetItem | O(log n) | Read-heavy, snapshots |
ImmutableDictionary<K,V> | Dictionary | Немає | Add, Remove, SetItem | O(log n) | Read-heavy config, snapshots |
ImmutableHashSet<T> | Set | Немає | Add, Remove, Contains | O(log n) | Унікальні елементи, snapshots |
ImmutableSortedSet<T> | Set | Sorted | Add, Remove, Contains | O(log n) | Відсортовані унікальні елементи |
ImmutableStack<T> | Stack | LIFO | Push, Pop | O(1) | Functional LIFO |
ImmutableQueue<T> | Queue | FIFO | Enqueue, Dequeue | O(1) | Functional FIFO |
ImmutableArray<T> | Array | Index | [i], SetItem | O(1) read, O(n) write | Рідкісні зміни, швидкий доступ |
Давайте детально порівняємо два найпопулярніші thread-safe словники:
ConcurrentDictionary<K,V>:
var dict = new ConcurrentDictionary<string, int>();
// ✅ Швидкий запис (O(1) avg, striped locking)
dict["key"] = 100;
// ✅ Atomic операції
dict.GetOrAdd("key", k => ExpensiveComputation());
dict.AddOrUpdate("key", 1, (k, v) => v + 1);
// ⚠️ Ітерація може бачити застарілі дані
foreach (var kvp in dict) // Snapshot на момент початку
{
// Інші потоки можуть змінювати dict під час ітерації
}
// ❌ Не можна отримати "заморожений" snapshot
var snapshot = dict; // Це той самий об'єкт, не копія!
ImmutableDictionary<K,V>:
var dict = ImmutableDictionary<string, int>.Empty;
// ⚠️ Повільніший запис (O(log n), CAS loop)
ImmutableDictionary<string, int> original, updated;
do
{
original = dict;
updated = original.SetItem("key", 100);
}
while (Interlocked.CompareExchange(ref dict, updated, original) != original);
// ❌ Немає вбудованих atomic операцій (потрібен CAS loop)
// ✅ Ітерація завжди консистентна
foreach (var kvp in dict)
{
// Ітеруємо immutable snapshot, ніхто не може його змінити
}
// ✅ Можна отримати справжній snapshot
var snapshot = dict; // Це immutable snapshot, безпечно передавати
Коли що використовувати:
Використовуйте ConcurrentDictionary
GetOrAdd, AddOrUpdate)Приклад: Кеш з частими оновленнями, лічильники, session storage
Використовуйте ImmutableDictionary
Приклад: Application configuration, feature flags, read-only reference data
Завдання: Реалізуйте thread-safe кеш для URL з підтримкою TTL (time-to-live).
Вимоги:
ConcurrentDictionary<string, CacheEntry>Get(url) повертає значення або null якщо expiredSet(url, content, ttl) додає/оновлює записCleanupExpired() видаляє всі expired записиШаблон:
using System;
using System.Collections.Concurrent;
public class UrlCache
{
private record CacheEntry(string Content, DateTime ExpiresAt);
private readonly ConcurrentDictionary<string, CacheEntry> _cache = new();
public string? Get(string url)
{
// TODO: Реалізуйте
throw new NotImplementedException();
}
public void Set(string url, string content, TimeSpan ttl)
{
// TODO: Реалізуйте
throw new NotImplementedException();
}
public int CleanupExpired()
{
// TODO: Видаліть всі expired записи, поверніть кількість видалених
throw new NotImplementedException();
}
}
// Тест:
var cache = new UrlCache();
Parallel.For(0, 100, i =>
{
string url = $"https://example.com/page-{i % 10}";
cache.Set(url, $"Content-{i}", TimeSpan.FromSeconds(2));
string? content = cache.Get(url);
Console.WriteLine($"[Thread {i}] {url}: {content}");
});
Thread.Sleep(3000); // Чекаємо expiration
int cleaned = cache.CleanupExpired();
Console.WriteLine($"Cleaned {cleaned} expired entries");
Підказка: Використовуйте TryGetValue + перевірка ExpiresAt, TryRemove для cleanup.
Завдання: Реалізуйте систему збору логів з кількох джерел у один файл.
Вимоги:
BlockingCollection<LogEntry> з bounded capacityCompleteAdding()Шаблон:
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
public record LogEntry(DateTime Timestamp, string Source, string Message);
public class LogAggregator : IDisposable
{
private readonly BlockingCollection<LogEntry> _logQueue;
private readonly Task _writerTask;
private readonly string _logFilePath;
public LogAggregator(string logFilePath, int queueCapacity = 100)
{
_logFilePath = logFilePath;
_logQueue = new BlockingCollection<LogEntry>(queueCapacity);
_writerTask = Task.Run(ProcessLogs);
}
public void Log(string source, string message)
{
// TODO: Додайте LogEntry у чергу
throw new NotImplementedException();
}
private void ProcessLogs()
{
// TODO: Обробляйте логи з GetConsumingEnumerable та пишіть у файл
throw new NotImplementedException();
}
public void Dispose()
{
// TODO: CompleteAdding, дочекайтесь завершення _writerTask
throw new NotImplementedException();
}
}
// Тест:
using var aggregator = new LogAggregator("app.log");
// 5 producers генерують логи
var producers = Enumerable.Range(0, 5).Select(id =>
Task.Run(() =>
{
for (int i = 0; i < 20; i++)
{
aggregator.Log($"Source-{id}", $"Message {i}");
Thread.Sleep(Random.Shared.Next(10, 50));
}
})
).ToArray();
await Task.WhenAll(producers);
// Dispose викличе CompleteAdding та дочекається запису всіх логів
Підказка: У ProcessLogs використовуйте GetConsumingEnumerable() та File.AppendAllText() або StreamWriter.
Завдання: Реалізуйте багатопотоковий web crawler що обходить сайт та збирає всі унікальні URL.
Вимоги:
ConcurrentDictionary<string, bool> для відстеження відвіданих URLConcurrentQueue<string> для черги URL що потрібно обробитиШаблон:
using System;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
public class WebCrawler
{
private readonly ConcurrentDictionary<string, bool> _visited = new();
private readonly ConcurrentQueue<(string Url, int Depth)> _queue = new();
private readonly int _maxDepth;
private readonly int _workerCount;
private int _activeWorkers;
public WebCrawler(int maxDepth = 3, int workerCount = 4)
{
_maxDepth = maxDepth;
_workerCount = workerCount;
}
public async Task<List<string>> CrawlAsync(string startUrl)
{
_queue.Enqueue((startUrl, 0));
_activeWorkers = _workerCount;
var workers = Enumerable.Range(0, _workerCount)
.Select(id => Task.Run(() => WorkerLoop(id)))
.ToArray();
await Task.WhenAll(workers);
return _visited.Keys.ToList();
}
private void WorkerLoop(int workerId)
{
while (true)
{
if (_queue.TryDequeue(out var item))
{
var (url, depth) = item;
// TODO: Перевірте чи URL вже відвіданий через TryAdd
// TODO: Якщо depth < _maxDepth, "завантажте" сторінку та додайте нові URL у чергу
// TODO: Симулюйте завантаження через Thread.Sleep(100)
Console.WriteLine($"[Worker {workerId}] Crawled: {url} (depth {depth})");
}
else
{
// Черга порожня — перевіряємо чи всі workers idle
if (Interlocked.Decrement(ref _activeWorkers) == 0)
{
// Всі workers idle та черга порожня → завершуємо
break;
}
// Чекаємо трохи, можливо інший worker додасть URL
Thread.Sleep(50);
Interlocked.Increment(ref _activeWorkers);
}
}
}
private List<string> ExtractLinks(string url)
{
// Симуляція парсингу HTML та витягування посилань
var baseUrl = new Uri(url).GetLeftPart(UriPartial.Authority);
return Enumerable.Range(1, Random.Shared.Next(2, 5))
.Select(i => $"{baseUrl}/page-{Random.Shared.Next(1, 20)}")
.ToList();
}
}
// Тест:
var crawler = new WebCrawler(maxDepth: 2, workerCount: 3);
var visitedUrls = await crawler.CrawlAsync("https://example.com");
Console.WriteLine($"\nTotal URLs crawled: {visitedUrls.Count}");
foreach (var url in visitedUrls.Take(10))
{
Console.WriteLine($" - {url}");
}
Підказка:
_visited.TryAdd(url, true) для перевірки чи URL вже відвіданийTryAdd повертає false — URL вже обробляється іншим потокомУ цій темі ви дізналися про:
Concurrent Collections:
ConcurrentDictionary<K,V> — striped locking для високої throughputConcurrentQueue<T>, ConcurrentStack<T>, ConcurrentBag<T> — lock-free колекціїBlockingCollection<T> — bounded producer-consumer з блокуючими операціямиImmutable Collections:
Ключові принципи:
Наступна тема: TPL, Parallel та PLINQ — Task Parallel Library, data parallelism та PLINQ для паралельної обробки даних.
ThreadPool — Просунуті Сценарії та Внутрішня Будова
Глибокий розбір внутрішньої архітектури ThreadPool — Work Stealing Algorithm, Hill Climbing детально, IOCP механізм, custom ThreadPool реалізація, production tuning та advanced patterns для оптимізації багатопотокових застосунків.
TPL, Task та Композиція — Від Thread до Task
Глибокий академічний розбір Task Parallel Library — еволюція від Thread до Task, Task<T>, композиція через WhenAll/WhenAny, CancellationToken, exception handling та AggregateException. Теорія і практика сучасного паралелізму.