Ef Core

Конкурентність — Дедлоки та Queue Processing (Частина 2)

Lock Escalation і дедлоки в EF Core — як вони виникають і як їх уникати. SELECT FOR UPDATE SKIP LOCKED для надійного queue processing. Monitoring конкурентних проблем у production. Concurrency Patterns для реальних систем.

Конкурентність: Дедлоки та Queue Processing

Це продовження статті «Конкурентність та Блокування». Читайте послідовно.


Дедлоки: коли транзакції чекають одна на одну

Дедлок — ситуація коли дві або більше транзакцій чекають одна на одну: A чекає ресурс що тримає B, а B чекає ресурс що тримає A. Жодна не може продовжити — вони заблоковані назавжди.

База даних виявляє дедлоки через wait-for graph (граф очікування). Якщо у графі є цикл — дедлок. СУБД обирає «жертву» (зазвичай ту транзакцію що виконала менше роботи або дешевша у відкаті) і відкочує її з помилкою.

Як виникає класичний дедлок

Транзакція A:  оновлює Order #1, потім хоче Customer #1
Транзакція B:  оновлює Customer #1, потім хоче Order #1

-- Крок 1:
Tx A → LOCK ORDER #1 (EXCLUSIVE)
Tx B → LOCK CUSTOMER #1 (EXCLUSIVE)

-- Крок 2:
Tx A → хоче LOCK CUSTOMER #1 → ЧЕКАЄ (B тримає)
Tx B → хоче LOCK ORDER #1    → ЧЕКАЄ (A тримає)

-- Дедлок! Граф: A → B → A (цикл)
-- БД вибирає жертву, відкочує її транзакцію:
--   Victim: Tx B → SqlException 1205 (Deadlock victim)

У EF Core це проявляється як:

catch (DbUpdateException ex)
    when (ex.InnerException is SqlException sqlEx && sqlEx.Number == 1205)
{
    // Deadlock! Транзакцію відкочено
    throw new RetryableException("Deadlock detected");
}
// PostgreSQL: PostgresException.SqlState == "40P01"

Правило уникнення дедлоків: consistent ordering

Найефективніший спосіб уникнути дедлоків — завжди звертатись до ресурсів в одному і тому ж порядку у всіх транзакціях:

// ❌ НЕБЕЗПЕЧНО: Tx A блокує Order потім Customer, Tx B навпаки
// Service A:
await UpdateOrderAsync(orderId);
await UpdateCustomerAsync(customerId);

// Service B (в іншому запиті, паралельно):
await UpdateCustomerAsync(customerId);
await UpdateOrderAsync(orderId);

// Ризик дедлоку при одночасному виконанні!

// ✅ БЕЗПЕЧНО: завжди один порядок (менший Id першим)
private async Task UpdateOrderAndCustomerAsync(int orderId, int customerId)
{
    // Сортуємо за Id: менший Id блокується першим
    if (orderId < customerId)
    {
        await UpdateOrderAsync(orderId);
        await UpdateCustomerAsync(customerId);
    }
    else
    {
        await UpdateCustomerAsync(customerId);
        await UpdateOrderAsync(orderId);
    }
}

Мінімізація часу утримання блокувань

Чим довша транзакція — тим більше шансів на дедлок. Стратегія: відкладати бізнес-логіку до транзакції, а не всередині неї:

// ❌ ДОВГА транзакція: HTTP call і валідація всередині
await using var tx = await context.Database.BeginTransactionAsync();

var product = await context.Products.FindAsync(productId); // LOCK
var pricing = await externalPricingApi.GetPriceAsync(productId); // повільно!
var customer = await context.Customers.FindAsync(customerId); // LOCK

product.Price = pricing.CurrentPrice;
customer.LastOrderDate = DateTime.UtcNow;
await context.SaveChangesAsync();
await tx.CommitAsync();

// ✅ КОРОТКА транзакція: підготовка ДО транзакції
var pricing = await externalPricingApi.GetPriceAsync(productId); // зовні транзакції
var validatedData = await ValidateBusinessRulesAsync(dto); // зовні транзакції

// Сама транзакція — тільки запис, максимально коротка
await using var tx = await context.Database.BeginTransactionAsync();
var product = await context.Products.FindAsync(productId);
var customer = await context.Customers.FindAsync(customerId);
product.Price = pricing.CurrentPrice;
customer.LastOrderDate = validatedData.OrderDate;
await context.SaveChangesAsync();
await tx.CommitAsync();

Lock Escalation у SQL Server

SQL Server автоматично «ескалує» (підвищує рівень) блокувань коли їх стає занадто багато. При великій кількості рядкових блокувань (Row Locks) — сервер може підвищити до блокування сторінки (Page Lock), а потім до блокування всієї таблиці (Table Lock). Ескалація зменшує overhead від управління тисячами дрібних замків, але блокує більше даних.

-- Запобігання ескалації для конкретної таблиці:
ALTER TABLE Orders SET (LOCK_ESCALATION = DISABLE);

-- Або для запиту (тільки рядкові замки):
SELECT * FROM Orders WITH (ROWLOCK) WHERE CustomerId = @id

В EF Core через FromSqlRaw:

var orders = await context.Orders
    .FromSqlRaw("SELECT * FROM Orders WITH (ROWLOCK) WHERE CustomerId = {0}", customerId)
    .ToListAsync();

SELECT FOR UPDATE SKIP LOCKED: queue processing

Один з найпотужніших шаблонів для конкурентного queue processing — SELECT FOR UPDATE SKIP LOCKED. Він вирішує задачу яка здавалася б простою: кілька worker-ів беруть завдання з черги не дублюючи і не блокуючи один одного.

Проблема naive queue processing

// ❌ НАЇВНИЙ ПІДХІД: race condition
// Worker 1 і Worker 2 одночасно:
var job = await context.Jobs
    .Where(j => j.Status == "Pending")
    .OrderBy(j => j.CreatedAt)
    .FirstOrDefaultAsync();
// Обидва отримали JOB #1 (той самий рядок!)

job!.Status = "Processing";
await context.SaveChangesAsync();
// Обидва намагаються оновити JOB #1 — один переможе, але обидва виконають завдання!

SKIP LOCKED: правильне рішення

SELECT FOR UPDATE SKIP LOCKED — при читанні блокує рядок і пропускає вже заблоковані рядки. Кожен worker отримує унікальне завдання:

// PostgreSQL: FOR UPDATE SKIP LOCKED
public async Task<Job?> ClaimNextJobAsync(CancellationToken ct)
{
    // Транзакція обов'язкова — блокування тримається до commit
    await using var tx = await context.Database.BeginTransactionAsync();

    // Беремо і блокуємо перший Pending який ніхто ще не бере
    var job = await context.Jobs
        .FromSqlRaw(@"
            SELECT * FROM ""Jobs""
            WHERE ""Status"" = 'Pending'
            ORDER BY ""CreatedAt""
            LIMIT 1
            FOR UPDATE SKIP LOCKED")
        .FirstOrDefaultAsync(ct);

    if (job is null)
    {
        await tx.RollbackAsync(ct);
        return null;
    }

    // Позначаємо як Processing (тепер безпечно — рядок заблоковано для нас)
    job.Status      = "Processing";
    job.StartedAt   = DateTime.UtcNow;
    job.WorkerId    = Environment.MachineName;

    await context.SaveChangesAsync(ct);
    await tx.CommitAsync(ct); // ← Блокування знімається

    return job;
}
// SQL Server: еквівалент через UPDLOCK + READPAST
var job = await context.Jobs
    .FromSqlRaw(@"
        SELECT TOP 1 * FROM [Jobs] WITH (UPDLOCK, READPAST)
        WHERE [Status] = 'Pending'
        ORDER BY [CreatedAt]")
    .FirstOrDefaultAsync(ct);
// READPAST = skip locked rows (аналог SKIP LOCKED)

Повний Queue Worker з SKIP LOCKED

public class JobQueueWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<JobQueueWorker> _logger;
    private readonly int _workerCount = 5; // Паралельні worker-и

    public JobQueueWorker(IServiceScopeFactory scopeFactory, ILogger<JobQueueWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _logger       = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Запускаємо N паралельних worker-ів
        var workers = Enumerable.Range(1, _workerCount)
            .Select(id => RunWorkerLoopAsync(id, stoppingToken));

        await Task.WhenAll(workers);
    }

    private async Task RunWorkerLoopAsync(int workerId, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            using var scope   = _scopeFactory.CreateScope();
            var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();

            var job = await ClaimNextJobAsync(context, ct);

            if (job is null)
            {
                // Нема завдань — чекаємо
                await Task.Delay(TimeSpan.FromSeconds(3), ct);
                continue;
            }

            _logger.LogInformation("Worker {WorkerId} processing Job {JobId}", workerId, job.Id);

            await ProcessJobAsync(job, context, ct);
        }
    }

    private async Task ProcessJobAsync(Job job, AppDbContext context, CancellationToken ct)
    {
        try
        {
            // Виконуємо роботу
            await DoActualWorkAsync(job, ct);

            job.Status      = "Completed";
            job.CompletedAt = DateTime.UtcNow;
            await context.SaveChangesAsync(ct);
        }
        catch (Exception ex)
        {
            job.Status       = "Failed";
            job.ErrorMessage = ex.Message;
            job.RetryCount++;

            // Якщо retry limit не досягнуто — повернути у Pending
            if (job.RetryCount < 3)
                job.Status = "Pending";

            await context.SaveChangesAsync(ct);
            _logger.LogError(ex, "Job {JobId} failed (retry {Count})", job.Id, job.RetryCount);
        }
    }

    private async Task DoActualWorkAsync(Job job, CancellationToken ct)
    {
        // Безпечне десеріалізування payload і виконання логіки
        _logger.LogInformation("Processing job type {Type}", job.Type);
        await Task.Delay(100, ct); // Симуляція роботи
    }
}

Monitoring конкурентних проблем у production

Виявити і діагностувати concurrency проблеми у production — окремий навик. Розглянемо інструменти.

EF Core: логування повільних запитів і concurrent операцій

// Interceptor що логує запити довше порогу (з повним контекстом)
public class SlowQueryWithContextInterceptor : DbCommandInterceptor
{
    private readonly ILogger _logger;
    private readonly TimeSpan _threshold = TimeSpan.FromMilliseconds(1000);

    public override DbDataReader ReaderExecuted(
        DbCommand command,
        CommandExecutedEventData eventData,
        DbDataReader result)
    {
        if (eventData.Duration >= _threshold)
        {
            _logger.LogWarning(
                "Slow query ({Duration}ms). Possible lock contention.\nSQL:\n{Sql}\nParameters: {Params}",
                eventData.Duration.TotalMilliseconds,
                command.CommandText,
                string.Join(", ", command.Parameters.Cast<DbParameter>()
                    .Select(p => $"{p.ParameterName}={p.Value}")));
        }

        return base.ReaderExecuted(command, eventData, result);
    }
}

SQL Server: sys.dm_exec_requests і wait types

-- Виявити заблоковані запити (sql що чекає блокування)
SELECT
    r.session_id,
    r.blocking_session_id,
    r.wait_type,
    r.wait_time / 1000.0  AS wait_seconds,
    r.status,
    t.text                AS sql_text
FROM sys.dm_exec_requests r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) t
WHERE r.blocking_session_id > 0
ORDER BY r.wait_time DESC;
-- Дедлоки: Extended Events або Trace Flag 1222
-- Увімкнути логування дедлоків:
EXEC sp_configure 'show advanced options', 1; RECONFIGURE;

-- Або через Extended Events (краще):
-- Подія: xml_deadlock_report

PostgreSQL: pg_stat_activity і lock monitoring

-- Активні lock waits у PostgreSQL
SELECT
    a.pid,
    a.query,
    a.state,
    a.wait_event_type,
    a.wait_event,
    a.query_start,
    EXTRACT(EPOCH FROM (NOW() - a.query_start)) AS wait_seconds
FROM pg_stat_activity a
WHERE a.wait_event_type = 'Lock'
ORDER BY wait_seconds DESC;

-- Хто кого блокує:
SELECT
    blocked.pid         AS blocked_pid,
    blocked.query       AS blocked_query,
    blocking.pid        AS blocking_pid,
    blocking.query      AS blocking_query
FROM pg_stat_activity blocked
JOIN pg_stat_activity blocking ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE cardinality(pg_blocking_pids(blocked.pid)) > 0;

Metrics: Concurrency-специфічні метрики

// Prometheus метрики для concurrency моніторингу
public class ConcurrencyMetricsInterceptor : DbCommandInterceptor
{
    private static readonly Counter<int> DeadlockCounter =
        new Meter("EfCore.Concurrency").CreateCounter<int>("deadlocks_total");

    private static readonly Counter<int> LockTimeoutCounter =
        new Meter("EfCore.Concurrency").CreateCounter<int>("lock_timeouts_total");

    private static readonly Histogram<double> LockWaitHistogram =
        new Meter("EfCore.Concurrency").CreateHistogram<double>(
            "lock_wait_duration_ms", unit: "ms");

    public override void CommandFailed(DbCommand command, CommandErrorEventData eventData)
    {
        if (eventData.Exception is SqlException sqlEx)
        {
            switch (sqlEx.Number)
            {
                case 1205:
                    DeadlockCounter.Add(1,
                        new KeyValuePair<string, object?>("table", ExtractTableName(command)));
                    break;
                case 1222:
                    LockTimeoutCounter.Add(1);
                    break;
            }
        }

        base.CommandFailed(command, eventData);
    }

    private static string ExtractTableName(DbCommand command)
    {
        // Простий парсинг SQL для виявлення таблиці (спрощено)
        var match = Regex.Match(command.CommandText, @"FROM\s+\[?(\w+)\]?", RegexOptions.IgnoreCase);
        return match.Success ? match.Groups[1].Value : "unknown";
    }
}

Архітектурні паттерни для конкурентних систем

Token Bucket для rate limiting на рівні БД

// Rate limiting через атомарний UPDATE: не більше N дій за T хвилин
public async Task<bool> TryConsumeRateLimitAsync(string clientId, int limit, int windowMinutes)
{
    var windowStart = DateTime.UtcNow.AddMinutes(-windowMinutes);

    // Атомарний: прочитати і збільшити якщо ліміт не досягнуто
    int updated = await context.RateLimitBuckets
        .Where(b => b.ClientId == clientId
                 && b.WindowStart >= windowStart
                 && b.Count < limit)
        .ExecuteUpdateAsync(s => s.SetProperty(b => b.Count, b => b.Count + 1));

    if (updated > 0)
        return true; // Успішно споживано

    // Перевірити: window застаріло (треба новий bucket)?
    var exists = await context.RateLimitBuckets
        .AnyAsync(b => b.ClientId == clientId && b.WindowStart >= windowStart);

    if (!exists)
    {
        // Новий window: вставити bucket
        try
        {
            context.RateLimitBuckets.Add(new RateLimitBucket
            {
                ClientId    = clientId,
                WindowStart = DateTime.UtcNow,
                Count       = 1
            });
            await context.SaveChangesAsync();
            return true;
        }
        catch (DbUpdateException)
        {
            // Race condition: хтось інший вже вставив → спробуємо ExecuteUpdate ще раз
            return await context.RateLimitBuckets
                .Where(b => b.ClientId == clientId
                         && b.WindowStart >= windowStart
                         && b.Count < limit)
                .ExecuteUpdateAsync(s => s.SetProperty(b => b.Count, b => b.Count + 1)) > 0;
        }
    }

    return false; // Ліміт вичерпано
}

Leaderboard з конкурентним оновленням рейтингу

// Безпечне оновлення Score без повного завантаження
public async Task AddScoreAsync(int playerId, int pointsToAdd)
{
    // Атомарний UPDATE: score += N
    await context.PlayerScores
        .Where(s => s.PlayerId == playerId)
        .ExecuteUpdateAsync(s => s
            .SetProperty(ps => ps.TotalScore, ps => ps.TotalScore + pointsToAdd)
            .SetProperty(ps => ps.LastUpdated, DateTime.UtcNow)
        );
    // SQL: UPDATE PlayerScores SET TotalScore = TotalScore + @N, LastUpdated = @now
    //      WHERE PlayerId = @id
    // Атомарно — жодних race conditions при паралельних оновленнях!
}

Практичні завдання (Частина 2)

Рівень 1 — Базовий

Завдання 1.1: Deadlock симуляція та retry

Скопіюйте класичний deadlock:

  1. Transaction A: UPDATE Orders WHERE Id=1, потім UPDATE Customers WHERE Id=1
  2. Transaction B (паралельно): UPDATE Customers WHERE Id=1, потім UPDATE Orders WHERE Id=1
  3. Виявте SqlException.Number == 1205 або PostgresException.SqlState == "40P01"
  4. Додайте retry з exponential backoff через Polly

Завдання 1.2: SELECT FOR UPDATE SKIP LOCKED queue

Реалізуйте EmailQueue (Id, To, Subject, Body, Status, RetryCount):

  1. EnqueueAsync(email) — додає зі статусом Pending
  2. ClaimNextAsync()FOR UPDATE SKIP LOCKED / UPDLOCK + READPAST, повертає Email або null
  3. MarkSentAsync(id) / MarkFailedAsync(id, error)
  4. Запустіть 3 паралельних worker через Task.WhenAll — жоден не обробляє один email двічі

Завдання 1.3: Lock monitoring запит

Напишіть ConcurrencyDiagnosticsService:

  • GetBlockingQueriesAsync() — виконує sys.dm_exec_requests (SQL Server) або pg_stat_activity (PostgreSQL) і повертає список заблокованих запитів
  • GetDeadlockCountAsync(DateTime since) — з таблиці-журналу що ви заповнюєте через interceptor

Рівень 2 — Логіка

Завдання 2.1: Consistent Lock Ordering

Для E-commerce Transfer: переміщення товарів між складами (Warehouse A → Warehouse B):

  1. Визначте правило: завжди блокувати склад з меншим Id першим
  2. Реалізуйте TransferInventoryAsync(int fromWarehouseId, int toWarehouseId, int productId, int qty)
  3. Напишіть стрес-тест: 20 паралельних переміщень між одними і тими ж складами (в обох напрямках) — дедлоків не має бути

Завдання 2.2: Rate Limiting через атомарний UPDATE

Реалізуйте API rate limiter для конкретного userId:

  • Ліміт: 100 запитів на 60 хвилин
  • Атомарний: ExecuteUpdateAsync де Count < Limit
  • При не-існуючому bucket: INSERT або ON CONFLICT DO UPDATE (PostgreSQL)
  • При досягненні ліміту: кидаємо RateLimitExceededException(RetryAfterSeconds)

Рівень 3 — Архітектура

Завдання 3.1: Distributed Semaphore через БД

Реалізуйте DbSemaphore — обмеження кількості паралельних операцій через БД:

public class DbSemaphore
{
    // Максимум N одночасних «слотів» для ресурсу
    Task<IAsyncDisposable> AcquireAsync(string resource, int maxConcurrent, TimeSpan timeout);
}

Таблиця SemaphoreSlots (Resource, AcquiredAt, WorkerId, ExpiresAt). При AcquireAsync:

  1. SELECT COUNT(*) ... FOR UPDATE де resource = @resource та ExpiresAt > now
  2. Якщо count < maxConcurrent → INSERT новий слот
  3. Інакше → чекати або timeout

Dispose() → DELETE слота. Expired слоти прибирає background job.


Підсумок статті 22

Ця стаття повністю розкрила конкурентність у EF Core:

Частина 1 — Феномени та Race Conditions:

  • Read Phenomena: Dirty, Non-Repeatable, Phantom Read, Write Skew — з реальними бізнес-наслідками
  • MVCC (PostgreSQL) vs Lock-based (SQL Server): філософії ізоляції, RCSI як компроміс
  • Check-Then-Act: Unique Index + DbUpdateException — єдиний надійний захист
  • Double Spending: атомарний ExecuteUpdateAsync з умовою або Optimistic Concurrency retry
  • Idempotency Key: архітектурний захист від network retry і подвійних запитів

Частина 2 — Дедлоки та Моніторинг:

  • Дедлоки: wait-for graph, SQL Error 1205 / SQLSTATE 40P01, Consistent Lock Ordering як профілактика
  • Мінімізація часу утримання блокувань — підготовка поза транзакцією
  • Lock Escalation у SQL Server: Row → Page → Table, LOCK_ESCALATION = DISABLE
  • SELECT FOR UPDATE SKIP LOCKED / UPDLOCK + READPAST: правильний queue processing без дублювання
  • Конкурентний Queue Worker: _workerCount паралельних workers, retry logic
  • Monitoring: sys.dm_exec_requests, pg_stat_activity, custom Prometheus метрики через interceptor
  • Архітектурні паттерни: Rate Limiting, Leaderboard Score через атомарний UPDATE

Наступна стаття — Interceptors та Middleware (стаття 23) — глибоке занурення у EF Core Interceptors: DbCommandInterceptor, SaveChangesInterceptor, IDbConnectionInterceptor, аудит, логування, метрики і динамічна модифікація запитів.


Додаткові ресурси