Ef Core

Збереження Даних — Concurrency та Outbox (Частина 2)

Optimistic Concurrency з ConcurrencyToken і RowVersion — виявлення конфліктів при одночасному записі. Pessimistic Locking через SELECT FOR UPDATE. Outbox Pattern для надійного запису разом із зовнішніми сервісами. Unit of Work.

Збереження Даних: Concurrency та Outbox Pattern

Це продовження статті «Збереження Даних та Транзакції». Читайте послідовно.


Проблема конкурентного запису

Уявіть: два менеджери одночасно відкривають картку одного і того ж товару і зменшують ціну. Менеджер A бачить ціну 10000 і встановлює 9000. Менеджер B теж бачить 10000 і встановлює 8500. Хто переможе? Той, чий запит виконається останнім — і зміна першого буде мовчки перезаписана.

Це класична Lost Update проблема: паралельні записи без координації призводять до втрати змін.

В базах даних є два підходи для вирішення:

Optimistic Concurrency — «оптимістичний»: ми вважаємо що конфлікти рідкісні. Читаємо дані без блокування, а при записі перевіряємо, чи хтось вже не змінив їх відтоді. Якщо так — видаємо помилку. Підхід для читання-інтенсивних додатків де конфлікти дійсно рідкісні.

Pessimistic Locking — «песимістичний»: ми вважаємо що конфлікти часті. Блокуємо рядок при читанні — ніхто інший не може його редагувати поки ми не завершимо. Підхід для систем з частими конкурентними записами до тих самих рядків.


Optimistic Concurrency: ConcurrencyToken

[ConcurrencyToken] або IsConcurrencyToken() позначає властивість як маркер конкурентності. EF Core включає її у WHERE умову UPDATE, щоб перевірити що рядок не змінився з моменту читання:

public class Product
{
    public int Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public decimal Price { get; set; }
    public int Stock { get; set; }

    // ConcurrencyToken: будь-яка властивість що унікально ідентифікує версію
    [ConcurrencyToken]
    public Guid Version { get; set; } = Guid.NewGuid();
}
// Конфігурація через Fluent API:
builder.Property(p => p.Version).IsConcurrencyToken();

Коли EF Core генерує UPDATE для Product — він включає перевірку Version:

-- EF Core генерує:
UPDATE Products
SET Name = @name, Price = @price, Version = @newVersion
WHERE Id = @id AND Version = @originalVersion  -- ← перевірка!

-- Якщо між читанням і записом хтось змінив рядок (Version вже інша):
-- 0 рядків оновлено → DbUpdateConcurrencyException

Оновлення Version при кожній зміні

При оновленні — потрібно генерувати нову Version. Це можна зробити через SaveChangesAsync override:

public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
    // Оновити Version для всіх Modified entities з Version
    foreach (var entry in ChangeTracker.Entries<Product>()
                                        .Where(e => e.State == EntityState.Modified))
    {
        entry.Entity.Version = Guid.NewGuid();
    }

    return await base.SaveChangesAsync(cancellationToken);
}

Обробка DbUpdateConcurrencyException

try
{
    product.Price = newPrice;
    await context.SaveChangesAsync();
}
catch (DbUpdateConcurrencyException ex)
{
    // ex.Entries: entity що спричинили конфлікт
    var entry = ex.Entries.Single();

    // Отримати значення з трьох джерел:
    var dbValues      = await entry.GetDatabaseValuesAsync(); // поточне у БД
    var currentValues = entry.CurrentValues;                  // що намагались записати
    var originalValues = entry.OriginalValues;               // що читали спочатку

    if (dbValues is null)
    {
        // Рядок видалено іншою транзакцією
        throw new BusinessException("Продукт був видалений іншим користувачем");
    }

    // Стратегія вирішення конфлікту:
    // A) Client Wins: зберегти поточні значення (перетерти!)
    entry.OriginalValues.SetValues(dbValues); // оновити Original
    // Спробувати збереження ще раз (з новою Version)

    // B) Database Wins: відкинути зміни, взяти з БД
    await entry.ReloadAsync();
    // entry.State → Unchanged, значення з БД

    // C) Merge: показати юзеру обидва варіанти
    throw new ConcurrencyConflictException(
        CurrentValues: currentValues.ToObject(),
        DatabaseValues: dbValues.ToObject());
}

Типова стратегія: повідомити користувача

У веб-застосунках найчастіша стратегія — повідомити користувача про конфлікт і попросити повторно відкрити і відредагувати:

// API: включаємо Version у відповідь
[HttpGet("{id}")]
public async Task<IActionResult> GetProduct(int id)
{
    var product = await context.Products.FindAsync(id);
    return Ok(new ProductDto
    {
        Id      = product!.Id,
        Name    = product.Name,
        Price   = product.Price,
        Version = product.Version.ToString() // ← передаємо клієнту
    });
}

// API: перевіряємо Version при PUT
[HttpPut("{id}")]
public async Task<IActionResult> UpdateProduct(int id, UpdateProductDto dto)
{
    var product = await context.Products.FindAsync(id);
    if (product is null) return NotFound();

    // Перевірка: Version з клієнта = Version у БД?
    if (product.Version != Guid.Parse(dto.Version))
        return Conflict("Дані змінились. Будь ласка, оновіть сторінку.");

    product.Name  = dto.Name;
    product.Price = dto.Price;

    try
    {
        await context.SaveChangesAsync();
        return Ok(new { product.Version }); // повертаємо нову Version
    }
    catch (DbUpdateConcurrencyException)
    {
        return Conflict("Одночасний конфлікт. Оновіть і спробуйте знову.");
    }
}

RowVersion: автоматичний конкурентний токен

Замість Guid (що потрібно оновлювати вручну) — RowVersion (SQL Server) або xmin (PostgreSQL) — автоматичний маркер, що оновлюється базою при кожному UPDATE:

public class Order
{
    public int Id { get; set; }
    public string Status { get; set; } = string.Empty;
    public decimal TotalAmount { get; set; }

    // SQL Server: rowversion (timestamp) — автоматично оновлюється БД
    public byte[] RowVersion { get; set; } = null!;
}
// Конфігурація
public class OrderConfiguration : IEntityTypeConfiguration<Order>
{
    public void Configure(EntityTypeBuilder<Order> builder)
    {
        builder.Property(o => o.RowVersion)
               .IsRowVersion()     // ← позначає як RowVersion + ConcurrencyToken
               .IsConcurrencyToken();
    }
}
-- EF Core генерує для SQL Server:
UPDATE Orders
SET Status = @status
WHERE Id = @id AND RowVersion = @originalRowVersion
-- RowVersion оновлюється автоматично сервером при кожному UPDATE

PostgreSQL: xmin системна колонка

PostgreSQL має вбудований механізм через системну колонку xmin — ідентифікатор транзакції що останньою змінила рядок:

// Npgsql: UseXminAsConcurrencyToken
builder.Property<uint>("xmin")
       .HasColumnType("xid")
       .ValueGeneratedOnAddOrUpdate()
       .IsConcurrencyToken();

Або через extension:

// У DbContext OnModelCreating:
modelBuilder.Entity<Order>().UseXminAsConcurrencyToken();

Pessimistic Locking: SELECT FOR UPDATE

Optimistic Concurrency добре для рідких конфліктів. Але у high-contention сценаріях (наприклад, продаж останнього екземпляра товару) — краще заблокувати рядок при читанні:

-- SQL Server: UPDLOCK + ROWLOCK
SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = @id

-- PostgreSQL: FOR UPDATE
SELECT * FROM products WHERE id = @id FOR UPDATE

EF Core не має нативного API для Pessimistic Locking — потрібен Raw SQL або провайдерний extension:

// SQL Server: через FromSqlRaw з hint
var product = await context.Products
    .FromSqlRaw("SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = {0}", productId)
    .FirstOrDefaultAsync();

// Тепер рядок заблокований до кінця транзакції
product!.Stock -= quantity;
await context.SaveChangesAsync();
// COMMIT → блокування знімається
// PostgreSQL: FOR UPDATE через Npgsql
var product = await context.Products
    .FromSqlRaw("SELECT * FROM \"Products\" WHERE \"Id\" = {0} FOR UPDATE", productId)
    .FirstOrDefaultAsync();
Pessimistic Locking і дедлоки: Блокування рядків збільшує ризик дедлоків. Завжди блокуйте ресурси в одному і тому ж порядку. Встановлюйте timeout для блокування (LOCK TIMEOUT). Використовуйте лише коли справді необхідно.

Outbox Pattern: надійний запис із зовнішніми сервісами

Проблема: ви зберегли Order у БД і хочете відіслати подію в RabbitMQ/Kafka. Що якщо збережено, але черга недоступна? Або навпаки — черга отримала подію, але SaveChanges впав?

Класичне рішення — Outbox Pattern: зберігайте і дані, і повідомлення в одній транзакції, а окремий фоновий сервіс доставляє повідомлення з гарантованою доставкою (at-least-once).

Структура Outbox

public class OutboxMessage
{
    public Guid    Id           { get; set; } = Guid.NewGuid();
    public string  Type         { get; set; } = string.Empty; // "OrderPlaced", "StockReserved"
    public string  Payload      { get; set; } = string.Empty; // JSON серіалізована подія
    public string  Status       { get; set; } = "Pending";    // Pending, Sent, Failed
    public DateTime CreatedAt   { get; set; } = DateTime.UtcNow;
    public DateTime? SentAt     { get; set; }
    public int     RetryCount   { get; set; } = 0;
    public string? ErrorMessage { get; set; }
}

Запис в Outbox разом з основними даними

public class OrderService
{
    private readonly AppDbContext _context;

    public async Task<Order> PlaceOrderAsync(CreateOrderDto dto)
    {
        await using var transaction = await _context.Database.BeginTransactionAsync();

        try
        {
            // 1. Бізнес-логіка: створити Order
            var order = new Order
            {
                CustomerId  = dto.CustomerId,
                Status      = "Pending",
                TotalAmount = dto.TotalAmount,
                PlacedAt    = DateTime.UtcNow
            };
            _context.Orders.Add(order);

            // 2. Зберегти подію у Outbox — в тій самій транзакції!
            var orderPlacedEvent = new OrderPlacedEvent
            {
                OrderId    = order.Id, // EF Core заповнить Id після SaveChanges
                CustomerId = order.CustomerId,
                Amount     = order.TotalAmount
            };

            _context.OutboxMessages.Add(new OutboxMessage
            {
                Type    = nameof(OrderPlacedEvent),
                Payload = JsonSerializer.Serialize(orderPlacedEvent)
            });

            // 3. Один SaveChanges: і Order, і OutboxMessage — атомарно
            await _context.SaveChangesAsync();

            await transaction.CommitAsync();

            return order;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Outbox Processor: фоновий worker

public class OutboxProcessorWorker : BackgroundService
{
    private readonly IServiceScopeFactory  _scopeFactory;
    private readonly IMessageBus           _messageBus;
    private readonly ILogger<OutboxProcessorWorker> _logger;

    public OutboxProcessorWorker(
        IServiceScopeFactory scopeFactory,
        IMessageBus messageBus,
        ILogger<OutboxProcessorWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _messageBus   = messageBus;
        _logger       = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await ProcessPendingMessagesAsync(stoppingToken);
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }

    private async Task ProcessPendingMessagesAsync(CancellationToken ct)
    {
        using var scope   = _scopeFactory.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();

        // Беремо пакет Pending повідомлень
        var messages = await context.OutboxMessages
            .Where(m => m.Status == "Pending" && m.RetryCount < 5)
            .OrderBy(m => m.CreatedAt)
            .Take(50)
            .ToListAsync(ct);

        foreach (var message in messages)
        {
            try
            {
                // Публікуємо у месседж-брокер
                await _messageBus.PublishAsync(message.Type, message.Payload, ct);

                message.Status = "Sent";
                message.SentAt = DateTime.UtcNow;
            }
            catch (Exception ex)
            {
                message.RetryCount++;
                message.ErrorMessage = ex.Message;

                if (message.RetryCount >= 5)
                    message.Status = "Failed"; // Dead letter

                _logger.LogError(ex, "Failed to publish OutboxMessage {Id}", message.Id);
            }
        }

        await context.SaveChangesAsync(ct);
    }
}

Idempotency: повторні доставки

Outbox гарантує at-least-once: повідомлення може бути відкрито більше одного разу (якщо worker впав після публікації але до позначення Sent). Консьюмери мають бути ідемпотентними:

// Консьюмер: ідемпотентна обробка
public class OrderPlacedEventConsumer
{
    public async Task HandleAsync(OrderPlacedEvent evt, AppDbContext context)
    {
        // Перевірка: чи вже оброблено?
        var alreadyProcessed = await context.ProcessedEvents
            .AnyAsync(pe => pe.EventId == evt.EventId);

        if (alreadyProcessed)
        {
            _logger.LogInformation("Event {EventId} already processed, skipping", evt.EventId);
            return;
        }

        // Обробка бізнес-логіки
        await DoBusinessLogicAsync(evt);

        // Зберегти що обробили
        context.ProcessedEvents.Add(new ProcessedEvent { EventId = evt.EventId });
        await context.SaveChangesAsync();
    }
}

Unit of Work: EF Core DbContext як реалізація

Unit of Work — патерн що групує всі зміни однієї «бізнес-операції» і застосовує їх разом. DbContext в EF Core вже є реалізацією Unit of Work. Але у великих проєктах з Repository Pattern — може знадобитись обгортка:

public interface IUnitOfWork : IDisposable
{
    IProductRepository    Products    { get; }
    IOrderRepository      Orders      { get; }
    ICustomerRepository   Customers   { get; }

    Task<int>  SaveChangesAsync(CancellationToken ct = default);
    Task       BeginTransactionAsync();
    Task       CommitAsync();
    Task       RollbackAsync();
}

public class UnitOfWork : IUnitOfWork
{
    private readonly AppDbContext _context;
    private IDbContextTransaction? _transaction;

    public UnitOfWork(AppDbContext context,
                      IProductRepository products,
                      IOrderRepository orders,
                      ICustomerRepository customers)
    {
        _context  = context;
        Products  = products;
        Orders    = orders;
        Customers = customers;
    }

    public IProductRepository  Products  { get; }
    public IOrderRepository    Orders    { get; }
    public ICustomerRepository Customers { get; }

    public Task<int> SaveChangesAsync(CancellationToken ct = default)
        => _context.SaveChangesAsync(ct);

    public async Task BeginTransactionAsync()
        => _transaction = await _context.Database.BeginTransactionAsync();

    public async Task CommitAsync()
    {
        if (_transaction is not null)
        {
            await _transaction.CommitAsync();
            await _transaction.DisposeAsync();
            _transaction = null;
        }
    }

    public async Task RollbackAsync()
    {
        if (_transaction is not null)
        {
            await _transaction.RollbackAsync();
            await _transaction.DisposeAsync();
            _transaction = null;
        }
    }

    public void Dispose()
    {
        _transaction?.Dispose();
        _context.Dispose();
    }
}

Практичні завдання (Частина 2)

Рівень 1 — Базовий

Завдання 1.1: RowVersion для замовлень

Додайте RowVersion до Order (SQL Server) або UseXminAsConcurrencyToken (PostgreSQL). Напишіть тест:

  1. Два DbContext завантажують Order
  2. Context A змінює і зберігає (успішно)
  3. Context B намагається зберегти зі старим RowVersion → DbUpdateConcurrencyException

Завдання 1.2: Обробка ConcurrencyException у API

Для PUT /api/products/{id} з Version полем у DTO:

  1. Якщо Version із DTO ≠ поточна у БД → 409 Conflict з тілом { conflict: true, currentVersion: "..." }
  2. При DbUpdateConcurrencyException → 409 Conflict з порадою оновити
  3. Тест: симулюйте одночасне редагування двома клієнтами

Завдання 1.3: Базовий Outbox

Реалізуйте OutboxMessage таблицю і OrderService.PlaceOrderAsync:

  1. Order + OutboxMessage в одній транзакції
  2. OutboxProcessorWorker що кожні 5 секунд читає Pending і позначає Sent
  3. Перевірте: якщо OrderService кинув виняток після SaveChanges —OutboxMessage вже є у БД. Worker зробить доставку.

Рівень 2 — Логіка

Завдання 2.1: Optimistic Concurrency з автоматичним retry

Реалізуйте ConcurrencyRetryDecorator<T> що обгортає будь-який Update метод і автоматично повторює при DbUpdateConcurrencyException:

  • При конфлікті: перезавантажити entity, повторно застосувати зміни, SaveChanges знову
  • До 3 спроб, потім кидаємо
  • Логуємо кожен retry з деталями конфлікту

Завдання 2.2: Pessimistic Locking для інвентаризації

ReserveStockAsync(int productId, int quantity):

  1. BEGIN TRANSACTION
  2. SELECT ... WITH (UPDLOCK) або FOR UPDATE — блокуємо рядок
  3. Перевіряємо Stock >= quantity
  4. Зменшуємо Stock
  5. COMMIT

Напишіть паралельний тест: 10 Thread одночасно намагаються зарезервувати по 1 штуці з 5 доступних. Рівно 5 мають успішно завершитись (решта — InsufficientStockException).

Рівень 3 — Архітектура

Завдання 3.1: Transactional Outbox з idempotency

Повна реалізація:

  1. OutboxMessage з CorrelationId (UUID що передається з клієнта)
  2. Перед вставкою — перевірка context.OutboxMessages.AnyAsync(m => m.CorrelationId == id) — idempotency ключ
  3. OutboxProcessorWorker з Pessimistic Lock на Outbox рядок під час обробки (SELECT FOR UPDATE SKIP LOCKED для PostgreSQL) — щоб кілька worker-ів не обробляли одне повідомлення
  4. ProcessedEvent таблиця у консьюмері
  5. Інтеграційний тест: worker запускається двічі одночасно, подія публікується рівно один раз

Підсумок статті 21

Ця стаття повністю розкрила збереження даних та concurrency в EF Core:

Частина 1:

  • SaveChanges = автоматична транзакція. Детальний flow: DetectChanges → сортування → батчування → Commit/Rollback.
  • Явна транзакція: BeginTransactionAsync охоплює кілька SaveChanges і Raw SQL.
  • IsolationLevel: чотири рівні, таблиця аномалій.
  • Savepoints: часткове скасування, RollbackToSavepointAsync.
  • DbUpdateException: обробка по SQL Error Number.
  • EnableRetryOnFailure + strategy.ExecuteAsync.

Частина 2:

  • Optimistic Concurrency: ConcurrencyToken (Guid), RowVersion (byte), xmin (PostgreSQL). UPDATE з WHERE Version перевіркою. Три стратегії при конфлікті: Client Wins, Database Wins, Merge.
  • Pessimistic Locking: SELECT FOR UPDATE / WITH (UPDLOCK) через Raw SQL.
  • Outbox Pattern: збереження і повідомлення в одній транзакції → at-least-once delivery. Idempotency у консьюмері через ProcessedEvent таблицю. SELECT FOR UPDATE SKIP LOCKED для паралельних worker-ів.
  • Unit of Work: DbContext вже є UoW. Власна обгортка для інкапсуляції транзакцій і репозиторіїв.

Наступна стаття — Concurrency Conflicts та лocking стратегії (стаття 22) — поглибить теми конкурентності: MVCC, Read Phenomena, Lock Escalation та практичні архітектурні сценарії.


Додаткові ресурси