Це продовження статті «Конкурентність та Блокування». Читайте послідовно.
Дедлок — ситуація коли дві або більше транзакцій чекають одна на одну: A чекає ресурс що тримає B, а B чекає ресурс що тримає A. Жодна не може продовжити — вони заблоковані назавжди.
База даних виявляє дедлоки через wait-for graph (граф очікування). Якщо у графі є цикл — дедлок. СУБД обирає «жертву» (зазвичай ту транзакцію що виконала менше роботи або дешевша у відкаті) і відкочує її з помилкою.
Транзакція A: оновлює Order #1, потім хоче Customer #1
Транзакція B: оновлює Customer #1, потім хоче Order #1
-- Крок 1:
Tx A → LOCK ORDER #1 (EXCLUSIVE)
Tx B → LOCK CUSTOMER #1 (EXCLUSIVE)
-- Крок 2:
Tx A → хоче LOCK CUSTOMER #1 → ЧЕКАЄ (B тримає)
Tx B → хоче LOCK ORDER #1 → ЧЕКАЄ (A тримає)
-- Дедлок! Граф: A → B → A (цикл)
-- БД вибирає жертву, відкочує її транзакцію:
-- Victim: Tx B → SqlException 1205 (Deadlock victim)
У EF Core це проявляється як:
catch (DbUpdateException ex)
when (ex.InnerException is SqlException sqlEx && sqlEx.Number == 1205)
{
// Deadlock! Транзакцію відкочено
throw new RetryableException("Deadlock detected");
}
// PostgreSQL: PostgresException.SqlState == "40P01"
Найефективніший спосіб уникнути дедлоків — завжди звертатись до ресурсів в одному і тому ж порядку у всіх транзакціях:
// ❌ НЕБЕЗПЕЧНО: Tx A блокує Order потім Customer, Tx B навпаки
// Service A:
await UpdateOrderAsync(orderId);
await UpdateCustomerAsync(customerId);
// Service B (в іншому запиті, паралельно):
await UpdateCustomerAsync(customerId);
await UpdateOrderAsync(orderId);
// Ризик дедлоку при одночасному виконанні!
// ✅ БЕЗПЕЧНО: завжди один порядок (менший Id першим)
private async Task UpdateOrderAndCustomerAsync(int orderId, int customerId)
{
// Сортуємо за Id: менший Id блокується першим
if (orderId < customerId)
{
await UpdateOrderAsync(orderId);
await UpdateCustomerAsync(customerId);
}
else
{
await UpdateCustomerAsync(customerId);
await UpdateOrderAsync(orderId);
}
}
Чим довша транзакція — тим більше шансів на дедлок. Стратегія: відкладати бізнес-логіку до транзакції, а не всередині неї:
// ❌ ДОВГА транзакція: HTTP call і валідація всередині
await using var tx = await context.Database.BeginTransactionAsync();
var product = await context.Products.FindAsync(productId); // LOCK
var pricing = await externalPricingApi.GetPriceAsync(productId); // повільно!
var customer = await context.Customers.FindAsync(customerId); // LOCK
product.Price = pricing.CurrentPrice;
customer.LastOrderDate = DateTime.UtcNow;
await context.SaveChangesAsync();
await tx.CommitAsync();
// ✅ КОРОТКА транзакція: підготовка ДО транзакції
var pricing = await externalPricingApi.GetPriceAsync(productId); // зовні транзакції
var validatedData = await ValidateBusinessRulesAsync(dto); // зовні транзакції
// Сама транзакція — тільки запис, максимально коротка
await using var tx = await context.Database.BeginTransactionAsync();
var product = await context.Products.FindAsync(productId);
var customer = await context.Customers.FindAsync(customerId);
product.Price = pricing.CurrentPrice;
customer.LastOrderDate = validatedData.OrderDate;
await context.SaveChangesAsync();
await tx.CommitAsync();
SQL Server автоматично «ескалує» (підвищує рівень) блокувань коли їх стає занадто багато. При великій кількості рядкових блокувань (Row Locks) — сервер може підвищити до блокування сторінки (Page Lock), а потім до блокування всієї таблиці (Table Lock). Ескалація зменшує overhead від управління тисячами дрібних замків, але блокує більше даних.
-- Запобігання ескалації для конкретної таблиці:
ALTER TABLE Orders SET (LOCK_ESCALATION = DISABLE);
-- Або для запиту (тільки рядкові замки):
SELECT * FROM Orders WITH (ROWLOCK) WHERE CustomerId = @id
В EF Core через FromSqlRaw:
var orders = await context.Orders
.FromSqlRaw("SELECT * FROM Orders WITH (ROWLOCK) WHERE CustomerId = {0}", customerId)
.ToListAsync();
Один з найпотужніших шаблонів для конкурентного queue processing — SELECT FOR UPDATE SKIP LOCKED. Він вирішує задачу яка здавалася б простою: кілька worker-ів беруть завдання з черги не дублюючи і не блокуючи один одного.
// ❌ НАЇВНИЙ ПІДХІД: race condition
// Worker 1 і Worker 2 одночасно:
var job = await context.Jobs
.Where(j => j.Status == "Pending")
.OrderBy(j => j.CreatedAt)
.FirstOrDefaultAsync();
// Обидва отримали JOB #1 (той самий рядок!)
job!.Status = "Processing";
await context.SaveChangesAsync();
// Обидва намагаються оновити JOB #1 — один переможе, але обидва виконають завдання!
SELECT FOR UPDATE SKIP LOCKED — при читанні блокує рядок і пропускає вже заблоковані рядки. Кожен worker отримує унікальне завдання:
// PostgreSQL: FOR UPDATE SKIP LOCKED
public async Task<Job?> ClaimNextJobAsync(CancellationToken ct)
{
// Транзакція обов'язкова — блокування тримається до commit
await using var tx = await context.Database.BeginTransactionAsync();
// Беремо і блокуємо перший Pending який ніхто ще не бере
var job = await context.Jobs
.FromSqlRaw(@"
SELECT * FROM ""Jobs""
WHERE ""Status"" = 'Pending'
ORDER BY ""CreatedAt""
LIMIT 1
FOR UPDATE SKIP LOCKED")
.FirstOrDefaultAsync(ct);
if (job is null)
{
await tx.RollbackAsync(ct);
return null;
}
// Позначаємо як Processing (тепер безпечно — рядок заблоковано для нас)
job.Status = "Processing";
job.StartedAt = DateTime.UtcNow;
job.WorkerId = Environment.MachineName;
await context.SaveChangesAsync(ct);
await tx.CommitAsync(ct); // ← Блокування знімається
return job;
}
// SQL Server: еквівалент через UPDLOCK + READPAST
var job = await context.Jobs
.FromSqlRaw(@"
SELECT TOP 1 * FROM [Jobs] WITH (UPDLOCK, READPAST)
WHERE [Status] = 'Pending'
ORDER BY [CreatedAt]")
.FirstOrDefaultAsync(ct);
// READPAST = skip locked rows (аналог SKIP LOCKED)
public class JobQueueWorker : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<JobQueueWorker> _logger;
private readonly int _workerCount = 5; // Паралельні worker-и
public JobQueueWorker(IServiceScopeFactory scopeFactory, ILogger<JobQueueWorker> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Запускаємо N паралельних worker-ів
var workers = Enumerable.Range(1, _workerCount)
.Select(id => RunWorkerLoopAsync(id, stoppingToken));
await Task.WhenAll(workers);
}
private async Task RunWorkerLoopAsync(int workerId, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var job = await ClaimNextJobAsync(context, ct);
if (job is null)
{
// Нема завдань — чекаємо
await Task.Delay(TimeSpan.FromSeconds(3), ct);
continue;
}
_logger.LogInformation("Worker {WorkerId} processing Job {JobId}", workerId, job.Id);
await ProcessJobAsync(job, context, ct);
}
}
private async Task ProcessJobAsync(Job job, AppDbContext context, CancellationToken ct)
{
try
{
// Виконуємо роботу
await DoActualWorkAsync(job, ct);
job.Status = "Completed";
job.CompletedAt = DateTime.UtcNow;
await context.SaveChangesAsync(ct);
}
catch (Exception ex)
{
job.Status = "Failed";
job.ErrorMessage = ex.Message;
job.RetryCount++;
// Якщо retry limit не досягнуто — повернути у Pending
if (job.RetryCount < 3)
job.Status = "Pending";
await context.SaveChangesAsync(ct);
_logger.LogError(ex, "Job {JobId} failed (retry {Count})", job.Id, job.RetryCount);
}
}
private async Task DoActualWorkAsync(Job job, CancellationToken ct)
{
// Безпечне десеріалізування payload і виконання логіки
_logger.LogInformation("Processing job type {Type}", job.Type);
await Task.Delay(100, ct); // Симуляція роботи
}
}
Виявити і діагностувати concurrency проблеми у production — окремий навик. Розглянемо інструменти.
// Interceptor що логує запити довше порогу (з повним контекстом)
public class SlowQueryWithContextInterceptor : DbCommandInterceptor
{
private readonly ILogger _logger;
private readonly TimeSpan _threshold = TimeSpan.FromMilliseconds(1000);
public override DbDataReader ReaderExecuted(
DbCommand command,
CommandExecutedEventData eventData,
DbDataReader result)
{
if (eventData.Duration >= _threshold)
{
_logger.LogWarning(
"Slow query ({Duration}ms). Possible lock contention.\nSQL:\n{Sql}\nParameters: {Params}",
eventData.Duration.TotalMilliseconds,
command.CommandText,
string.Join(", ", command.Parameters.Cast<DbParameter>()
.Select(p => $"{p.ParameterName}={p.Value}")));
}
return base.ReaderExecuted(command, eventData, result);
}
}
-- Виявити заблоковані запити (sql що чекає блокування)
SELECT
r.session_id,
r.blocking_session_id,
r.wait_type,
r.wait_time / 1000.0 AS wait_seconds,
r.status,
t.text AS sql_text
FROM sys.dm_exec_requests r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) t
WHERE r.blocking_session_id > 0
ORDER BY r.wait_time DESC;
-- Дедлоки: Extended Events або Trace Flag 1222
-- Увімкнути логування дедлоків:
EXEC sp_configure 'show advanced options', 1; RECONFIGURE;
-- Або через Extended Events (краще):
-- Подія: xml_deadlock_report
-- Активні lock waits у PostgreSQL
SELECT
a.pid,
a.query,
a.state,
a.wait_event_type,
a.wait_event,
a.query_start,
EXTRACT(EPOCH FROM (NOW() - a.query_start)) AS wait_seconds
FROM pg_stat_activity a
WHERE a.wait_event_type = 'Lock'
ORDER BY wait_seconds DESC;
-- Хто кого блокує:
SELECT
blocked.pid AS blocked_pid,
blocked.query AS blocked_query,
blocking.pid AS blocking_pid,
blocking.query AS blocking_query
FROM pg_stat_activity blocked
JOIN pg_stat_activity blocking ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE cardinality(pg_blocking_pids(blocked.pid)) > 0;
// Prometheus метрики для concurrency моніторингу
public class ConcurrencyMetricsInterceptor : DbCommandInterceptor
{
private static readonly Counter<int> DeadlockCounter =
new Meter("EfCore.Concurrency").CreateCounter<int>("deadlocks_total");
private static readonly Counter<int> LockTimeoutCounter =
new Meter("EfCore.Concurrency").CreateCounter<int>("lock_timeouts_total");
private static readonly Histogram<double> LockWaitHistogram =
new Meter("EfCore.Concurrency").CreateHistogram<double>(
"lock_wait_duration_ms", unit: "ms");
public override void CommandFailed(DbCommand command, CommandErrorEventData eventData)
{
if (eventData.Exception is SqlException sqlEx)
{
switch (sqlEx.Number)
{
case 1205:
DeadlockCounter.Add(1,
new KeyValuePair<string, object?>("table", ExtractTableName(command)));
break;
case 1222:
LockTimeoutCounter.Add(1);
break;
}
}
base.CommandFailed(command, eventData);
}
private static string ExtractTableName(DbCommand command)
{
// Простий парсинг SQL для виявлення таблиці (спрощено)
var match = Regex.Match(command.CommandText, @"FROM\s+\[?(\w+)\]?", RegexOptions.IgnoreCase);
return match.Success ? match.Groups[1].Value : "unknown";
}
}
// Rate limiting через атомарний UPDATE: не більше N дій за T хвилин
public async Task<bool> TryConsumeRateLimitAsync(string clientId, int limit, int windowMinutes)
{
var windowStart = DateTime.UtcNow.AddMinutes(-windowMinutes);
// Атомарний: прочитати і збільшити якщо ліміт не досягнуто
int updated = await context.RateLimitBuckets
.Where(b => b.ClientId == clientId
&& b.WindowStart >= windowStart
&& b.Count < limit)
.ExecuteUpdateAsync(s => s.SetProperty(b => b.Count, b => b.Count + 1));
if (updated > 0)
return true; // Успішно споживано
// Перевірити: window застаріло (треба новий bucket)?
var exists = await context.RateLimitBuckets
.AnyAsync(b => b.ClientId == clientId && b.WindowStart >= windowStart);
if (!exists)
{
// Новий window: вставити bucket
try
{
context.RateLimitBuckets.Add(new RateLimitBucket
{
ClientId = clientId,
WindowStart = DateTime.UtcNow,
Count = 1
});
await context.SaveChangesAsync();
return true;
}
catch (DbUpdateException)
{
// Race condition: хтось інший вже вставив → спробуємо ExecuteUpdate ще раз
return await context.RateLimitBuckets
.Where(b => b.ClientId == clientId
&& b.WindowStart >= windowStart
&& b.Count < limit)
.ExecuteUpdateAsync(s => s.SetProperty(b => b.Count, b => b.Count + 1)) > 0;
}
}
return false; // Ліміт вичерпано
}
// Безпечне оновлення Score без повного завантаження
public async Task AddScoreAsync(int playerId, int pointsToAdd)
{
// Атомарний UPDATE: score += N
await context.PlayerScores
.Where(s => s.PlayerId == playerId)
.ExecuteUpdateAsync(s => s
.SetProperty(ps => ps.TotalScore, ps => ps.TotalScore + pointsToAdd)
.SetProperty(ps => ps.LastUpdated, DateTime.UtcNow)
);
// SQL: UPDATE PlayerScores SET TotalScore = TotalScore + @N, LastUpdated = @now
// WHERE PlayerId = @id
// Атомарно — жодних race conditions при паралельних оновленнях!
}
Завдання 1.1: Deadlock симуляція та retry
Скопіюйте класичний deadlock:
SqlException.Number == 1205 або PostgresException.SqlState == "40P01"Завдання 1.2: SELECT FOR UPDATE SKIP LOCKED queue
Реалізуйте EmailQueue (Id, To, Subject, Body, Status, RetryCount):
EnqueueAsync(email) — додає зі статусом PendingClaimNextAsync() — FOR UPDATE SKIP LOCKED / UPDLOCK + READPAST, повертає Email або nullMarkSentAsync(id) / MarkFailedAsync(id, error)Task.WhenAll — жоден не обробляє один email двічіЗавдання 1.3: Lock monitoring запит
Напишіть ConcurrencyDiagnosticsService:
GetBlockingQueriesAsync() — виконує sys.dm_exec_requests (SQL Server) або pg_stat_activity (PostgreSQL) і повертає список заблокованих запитівGetDeadlockCountAsync(DateTime since) — з таблиці-журналу що ви заповнюєте через interceptorЗавдання 2.1: Consistent Lock Ordering
Для E-commerce Transfer: переміщення товарів між складами (Warehouse A → Warehouse B):
TransferInventoryAsync(int fromWarehouseId, int toWarehouseId, int productId, int qty)Завдання 2.2: Rate Limiting через атомарний UPDATE
Реалізуйте API rate limiter для конкретного userId:
ExecuteUpdateAsync де Count < LimitRateLimitExceededException(RetryAfterSeconds)Завдання 3.1: Distributed Semaphore через БД
Реалізуйте DbSemaphore — обмеження кількості паралельних операцій через БД:
public class DbSemaphore
{
// Максимум N одночасних «слотів» для ресурсу
Task<IAsyncDisposable> AcquireAsync(string resource, int maxConcurrent, TimeSpan timeout);
}
Таблиця SemaphoreSlots (Resource, AcquiredAt, WorkerId, ExpiresAt). При AcquireAsync:
SELECT COUNT(*) ... FOR UPDATE де resource = @resource та ExpiresAt > nowDispose() → DELETE слота. Expired слоти прибирає background job.
Ця стаття повністю розкрила конкурентність у EF Core:
Частина 1 — Феномени та Race Conditions:
ExecuteUpdateAsync з умовою або Optimistic Concurrency retryЧастина 2 — Дедлоки та Моніторинг:
LOCK_ESCALATION = DISABLESELECT FOR UPDATE SKIP LOCKED / UPDLOCK + READPAST: правильний queue processing без дублювання_workerCount паралельних workers, retry logicsys.dm_exec_requests, pg_stat_activity, custom Prometheus метрики через interceptorНаступна стаття — Interceptors та Middleware (стаття 23) — глибоке занурення у EF Core Interceptors: DbCommandInterceptor, SaveChangesInterceptor, IDbConnectionInterceptor, аудит, логування, метрики і динамічна модифікація запитів.
Конкурентність та Блокування (Частина 1)
Глибокий розбір конкурентності в EF Core — Read Phenomena (dirty read, non-repeatable read, phantom read), MVCC у PostgreSQL vs Lock-based у SQL Server, практичні сценарії race conditions та їх вирішення через правильні стратегії блокування.
Міграції в EF Core — Основи (Частина 1)
Що таке міграції і чому вони існують — від хаосу ручних SQL-скриптів до версійованої схеми БД. Анатомія файлу міграції, ModelSnapshot, команди dotnet ef. Альтернативи — DbUp, Flyway, Liquibase. Скасування міграцій.