Збереження Даних — Concurrency та Outbox (Частина 2)
Збереження Даних: Concurrency та Outbox Pattern
Це продовження статті «Збереження Даних та Транзакції». Читайте послідовно.
Проблема конкурентного запису
Уявіть: два менеджери одночасно відкривають картку одного і того ж товару і зменшують ціну. Менеджер A бачить ціну 10000 і встановлює 9000. Менеджер B теж бачить 10000 і встановлює 8500. Хто переможе? Той, чий запит виконається останнім — і зміна першого буде мовчки перезаписана.
Це класична Lost Update проблема: паралельні записи без координації призводять до втрати змін.
В базах даних є два підходи для вирішення:
Optimistic Concurrency — «оптимістичний»: ми вважаємо що конфлікти рідкісні. Читаємо дані без блокування, а при записі перевіряємо, чи хтось вже не змінив їх відтоді. Якщо так — видаємо помилку. Підхід для читання-інтенсивних додатків де конфлікти дійсно рідкісні.
Pessimistic Locking — «песимістичний»: ми вважаємо що конфлікти часті. Блокуємо рядок при читанні — ніхто інший не може його редагувати поки ми не завершимо. Підхід для систем з частими конкурентними записами до тих самих рядків.
Optimistic Concurrency: ConcurrencyToken
[ConcurrencyToken] або IsConcurrencyToken() позначає властивість як маркер конкурентності. EF Core включає її у WHERE умову UPDATE, щоб перевірити що рядок не змінився з моменту читання:
public class Product
{
public int Id { get; set; }
public string Name { get; set; } = string.Empty;
public decimal Price { get; set; }
public int Stock { get; set; }
// ConcurrencyToken: будь-яка властивість що унікально ідентифікує версію
[ConcurrencyToken]
public Guid Version { get; set; } = Guid.NewGuid();
}
// Конфігурація через Fluent API:
builder.Property(p => p.Version).IsConcurrencyToken();
Коли EF Core генерує UPDATE для Product — він включає перевірку Version:
-- EF Core генерує:
UPDATE Products
SET Name = @name, Price = @price, Version = @newVersion
WHERE Id = @id AND Version = @originalVersion -- ← перевірка!
-- Якщо між читанням і записом хтось змінив рядок (Version вже інша):
-- 0 рядків оновлено → DbUpdateConcurrencyException
Оновлення Version при кожній зміні
При оновленні — потрібно генерувати нову Version. Це можна зробити через SaveChangesAsync override:
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
// Оновити Version для всіх Modified entities з Version
foreach (var entry in ChangeTracker.Entries<Product>()
.Where(e => e.State == EntityState.Modified))
{
entry.Entity.Version = Guid.NewGuid();
}
return await base.SaveChangesAsync(cancellationToken);
}
Обробка DbUpdateConcurrencyException
try
{
product.Price = newPrice;
await context.SaveChangesAsync();
}
catch (DbUpdateConcurrencyException ex)
{
// ex.Entries: entity що спричинили конфлікт
var entry = ex.Entries.Single();
// Отримати значення з трьох джерел:
var dbValues = await entry.GetDatabaseValuesAsync(); // поточне у БД
var currentValues = entry.CurrentValues; // що намагались записати
var originalValues = entry.OriginalValues; // що читали спочатку
if (dbValues is null)
{
// Рядок видалено іншою транзакцією
throw new BusinessException("Продукт був видалений іншим користувачем");
}
// Стратегія вирішення конфлікту:
// A) Client Wins: зберегти поточні значення (перетерти!)
entry.OriginalValues.SetValues(dbValues); // оновити Original
// Спробувати збереження ще раз (з новою Version)
// B) Database Wins: відкинути зміни, взяти з БД
await entry.ReloadAsync();
// entry.State → Unchanged, значення з БД
// C) Merge: показати юзеру обидва варіанти
throw new ConcurrencyConflictException(
CurrentValues: currentValues.ToObject(),
DatabaseValues: dbValues.ToObject());
}
Типова стратегія: повідомити користувача
У веб-застосунках найчастіша стратегія — повідомити користувача про конфлікт і попросити повторно відкрити і відредагувати:
// API: включаємо Version у відповідь
[HttpGet("{id}")]
public async Task<IActionResult> GetProduct(int id)
{
var product = await context.Products.FindAsync(id);
return Ok(new ProductDto
{
Id = product!.Id,
Name = product.Name,
Price = product.Price,
Version = product.Version.ToString() // ← передаємо клієнту
});
}
// API: перевіряємо Version при PUT
[HttpPut("{id}")]
public async Task<IActionResult> UpdateProduct(int id, UpdateProductDto dto)
{
var product = await context.Products.FindAsync(id);
if (product is null) return NotFound();
// Перевірка: Version з клієнта = Version у БД?
if (product.Version != Guid.Parse(dto.Version))
return Conflict("Дані змінились. Будь ласка, оновіть сторінку.");
product.Name = dto.Name;
product.Price = dto.Price;
try
{
await context.SaveChangesAsync();
return Ok(new { product.Version }); // повертаємо нову Version
}
catch (DbUpdateConcurrencyException)
{
return Conflict("Одночасний конфлікт. Оновіть і спробуйте знову.");
}
}
RowVersion: автоматичний конкурентний токен
Замість Guid (що потрібно оновлювати вручну) — RowVersion (SQL Server) або xmin (PostgreSQL) — автоматичний маркер, що оновлюється базою при кожному UPDATE:
public class Order
{
public int Id { get; set; }
public string Status { get; set; } = string.Empty;
public decimal TotalAmount { get; set; }
// SQL Server: rowversion (timestamp) — автоматично оновлюється БД
public byte[] RowVersion { get; set; } = null!;
}
// Конфігурація
public class OrderConfiguration : IEntityTypeConfiguration<Order>
{
public void Configure(EntityTypeBuilder<Order> builder)
{
builder.Property(o => o.RowVersion)
.IsRowVersion() // ← позначає як RowVersion + ConcurrencyToken
.IsConcurrencyToken();
}
}
-- EF Core генерує для SQL Server:
UPDATE Orders
SET Status = @status
WHERE Id = @id AND RowVersion = @originalRowVersion
-- RowVersion оновлюється автоматично сервером при кожному UPDATE
PostgreSQL: xmin системна колонка
PostgreSQL має вбудований механізм через системну колонку xmin — ідентифікатор транзакції що останньою змінила рядок:
// Npgsql: UseXminAsConcurrencyToken
builder.Property<uint>("xmin")
.HasColumnType("xid")
.ValueGeneratedOnAddOrUpdate()
.IsConcurrencyToken();
Або через extension:
// У DbContext OnModelCreating:
modelBuilder.Entity<Order>().UseXminAsConcurrencyToken();
Pessimistic Locking: SELECT FOR UPDATE
Optimistic Concurrency добре для рідких конфліктів. Але у high-contention сценаріях (наприклад, продаж останнього екземпляра товару) — краще заблокувати рядок при читанні:
-- SQL Server: UPDLOCK + ROWLOCK
SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = @id
-- PostgreSQL: FOR UPDATE
SELECT * FROM products WHERE id = @id FOR UPDATE
EF Core не має нативного API для Pessimistic Locking — потрібен Raw SQL або провайдерний extension:
// SQL Server: через FromSqlRaw з hint
var product = await context.Products
.FromSqlRaw("SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = {0}", productId)
.FirstOrDefaultAsync();
// Тепер рядок заблокований до кінця транзакції
product!.Stock -= quantity;
await context.SaveChangesAsync();
// COMMIT → блокування знімається
// PostgreSQL: FOR UPDATE через Npgsql
var product = await context.Products
.FromSqlRaw("SELECT * FROM \"Products\" WHERE \"Id\" = {0} FOR UPDATE", productId)
.FirstOrDefaultAsync();
LOCK TIMEOUT). Використовуйте лише коли справді необхідно.Outbox Pattern: надійний запис із зовнішніми сервісами
Проблема: ви зберегли Order у БД і хочете відіслати подію в RabbitMQ/Kafka. Що якщо збережено, але черга недоступна? Або навпаки — черга отримала подію, але SaveChanges впав?
Класичне рішення — Outbox Pattern: зберігайте і дані, і повідомлення в одній транзакції, а окремий фоновий сервіс доставляє повідомлення з гарантованою доставкою (at-least-once).
Структура Outbox
public class OutboxMessage
{
public Guid Id { get; set; } = Guid.NewGuid();
public string Type { get; set; } = string.Empty; // "OrderPlaced", "StockReserved"
public string Payload { get; set; } = string.Empty; // JSON серіалізована подія
public string Status { get; set; } = "Pending"; // Pending, Sent, Failed
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? SentAt { get; set; }
public int RetryCount { get; set; } = 0;
public string? ErrorMessage { get; set; }
}
Запис в Outbox разом з основними даними
public class OrderService
{
private readonly AppDbContext _context;
public async Task<Order> PlaceOrderAsync(CreateOrderDto dto)
{
await using var transaction = await _context.Database.BeginTransactionAsync();
try
{
// 1. Бізнес-логіка: створити Order
var order = new Order
{
CustomerId = dto.CustomerId,
Status = "Pending",
TotalAmount = dto.TotalAmount,
PlacedAt = DateTime.UtcNow
};
_context.Orders.Add(order);
// 2. Зберегти подію у Outbox — в тій самій транзакції!
var orderPlacedEvent = new OrderPlacedEvent
{
OrderId = order.Id, // EF Core заповнить Id після SaveChanges
CustomerId = order.CustomerId,
Amount = order.TotalAmount
};
_context.OutboxMessages.Add(new OutboxMessage
{
Type = nameof(OrderPlacedEvent),
Payload = JsonSerializer.Serialize(orderPlacedEvent)
});
// 3. Один SaveChanges: і Order, і OutboxMessage — атомарно
await _context.SaveChangesAsync();
await transaction.CommitAsync();
return order;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
Outbox Processor: фоновий worker
public class OutboxProcessorWorker : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IMessageBus _messageBus;
private readonly ILogger<OutboxProcessorWorker> _logger;
public OutboxProcessorWorker(
IServiceScopeFactory scopeFactory,
IMessageBus messageBus,
ILogger<OutboxProcessorWorker> logger)
{
_scopeFactory = scopeFactory;
_messageBus = messageBus;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await ProcessPendingMessagesAsync(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
private async Task ProcessPendingMessagesAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
// Беремо пакет Pending повідомлень
var messages = await context.OutboxMessages
.Where(m => m.Status == "Pending" && m.RetryCount < 5)
.OrderBy(m => m.CreatedAt)
.Take(50)
.ToListAsync(ct);
foreach (var message in messages)
{
try
{
// Публікуємо у месседж-брокер
await _messageBus.PublishAsync(message.Type, message.Payload, ct);
message.Status = "Sent";
message.SentAt = DateTime.UtcNow;
}
catch (Exception ex)
{
message.RetryCount++;
message.ErrorMessage = ex.Message;
if (message.RetryCount >= 5)
message.Status = "Failed"; // Dead letter
_logger.LogError(ex, "Failed to publish OutboxMessage {Id}", message.Id);
}
}
await context.SaveChangesAsync(ct);
}
}
Idempotency: повторні доставки
Outbox гарантує at-least-once: повідомлення може бути відкрито більше одного разу (якщо worker впав після публікації але до позначення Sent). Консьюмери мають бути ідемпотентними:
// Консьюмер: ідемпотентна обробка
public class OrderPlacedEventConsumer
{
public async Task HandleAsync(OrderPlacedEvent evt, AppDbContext context)
{
// Перевірка: чи вже оброблено?
var alreadyProcessed = await context.ProcessedEvents
.AnyAsync(pe => pe.EventId == evt.EventId);
if (alreadyProcessed)
{
_logger.LogInformation("Event {EventId} already processed, skipping", evt.EventId);
return;
}
// Обробка бізнес-логіки
await DoBusinessLogicAsync(evt);
// Зберегти що обробили
context.ProcessedEvents.Add(new ProcessedEvent { EventId = evt.EventId });
await context.SaveChangesAsync();
}
}
Unit of Work: EF Core DbContext як реалізація
Unit of Work — патерн що групує всі зміни однієї «бізнес-операції» і застосовує їх разом. DbContext в EF Core вже є реалізацією Unit of Work. Але у великих проєктах з Repository Pattern — може знадобитись обгортка:
public interface IUnitOfWork : IDisposable
{
IProductRepository Products { get; }
IOrderRepository Orders { get; }
ICustomerRepository Customers { get; }
Task<int> SaveChangesAsync(CancellationToken ct = default);
Task BeginTransactionAsync();
Task CommitAsync();
Task RollbackAsync();
}
public class UnitOfWork : IUnitOfWork
{
private readonly AppDbContext _context;
private IDbContextTransaction? _transaction;
public UnitOfWork(AppDbContext context,
IProductRepository products,
IOrderRepository orders,
ICustomerRepository customers)
{
_context = context;
Products = products;
Orders = orders;
Customers = customers;
}
public IProductRepository Products { get; }
public IOrderRepository Orders { get; }
public ICustomerRepository Customers { get; }
public Task<int> SaveChangesAsync(CancellationToken ct = default)
=> _context.SaveChangesAsync(ct);
public async Task BeginTransactionAsync()
=> _transaction = await _context.Database.BeginTransactionAsync();
public async Task CommitAsync()
{
if (_transaction is not null)
{
await _transaction.CommitAsync();
await _transaction.DisposeAsync();
_transaction = null;
}
}
public async Task RollbackAsync()
{
if (_transaction is not null)
{
await _transaction.RollbackAsync();
await _transaction.DisposeAsync();
_transaction = null;
}
}
public void Dispose()
{
_transaction?.Dispose();
_context.Dispose();
}
}
Практичні завдання (Частина 2)
Рівень 1 — Базовий
Завдання 1.1: RowVersion для замовлень
Додайте RowVersion до Order (SQL Server) або UseXminAsConcurrencyToken (PostgreSQL). Напишіть тест:
- Два DbContext завантажують Order
- Context A змінює і зберігає (успішно)
- Context B намагається зберегти зі старим RowVersion →
DbUpdateConcurrencyException
Завдання 1.2: Обробка ConcurrencyException у API
Для PUT /api/products/{id} з Version полем у DTO:
- Якщо
Versionіз DTO ≠ поточна у БД → 409 Conflict з тілом{ conflict: true, currentVersion: "..." } - При
DbUpdateConcurrencyException→ 409 Conflict з порадою оновити - Тест: симулюйте одночасне редагування двома клієнтами
Завдання 1.3: Базовий Outbox
Реалізуйте OutboxMessage таблицю і OrderService.PlaceOrderAsync:
- Order + OutboxMessage в одній транзакції
OutboxProcessorWorkerщо кожні 5 секунд читає Pending і позначає Sent- Перевірте: якщо OrderService кинув виняток після
SaveChanges—OutboxMessage вже є у БД. Worker зробить доставку.
Рівень 2 — Логіка
Завдання 2.1: Optimistic Concurrency з автоматичним retry
Реалізуйте ConcurrencyRetryDecorator<T> що обгортає будь-який Update метод і автоматично повторює при DbUpdateConcurrencyException:
- При конфлікті: перезавантажити entity, повторно застосувати зміни, SaveChanges знову
- До 3 спроб, потім кидаємо
- Логуємо кожен retry з деталями конфлікту
Завдання 2.2: Pessimistic Locking для інвентаризації
ReserveStockAsync(int productId, int quantity):
BEGIN TRANSACTIONSELECT ... WITH (UPDLOCK)абоFOR UPDATE— блокуємо рядок- Перевіряємо
Stock >= quantity - Зменшуємо Stock
COMMIT
Напишіть паралельний тест: 10 Thread одночасно намагаються зарезервувати по 1 штуці з 5 доступних. Рівно 5 мають успішно завершитись (решта — InsufficientStockException).
Рівень 3 — Архітектура
Завдання 3.1: Transactional Outbox з idempotency
Повна реалізація:
OutboxMessageзCorrelationId(UUID що передається з клієнта)- Перед вставкою — перевірка
context.OutboxMessages.AnyAsync(m => m.CorrelationId == id)— idempotency ключ OutboxProcessorWorkerз Pessimistic Lock на Outbox рядок під час обробки (SELECT FOR UPDATE SKIP LOCKEDдля PostgreSQL) — щоб кілька worker-ів не обробляли одне повідомленняProcessedEventтаблиця у консьюмері- Інтеграційний тест: worker запускається двічі одночасно, подія публікується рівно один раз
Підсумок статті 21
Ця стаття повністю розкрила збереження даних та concurrency в EF Core:
Частина 1:
- SaveChanges = автоматична транзакція. Детальний flow: DetectChanges → сортування → батчування → Commit/Rollback.
- Явна транзакція:
BeginTransactionAsyncохоплює кілька SaveChanges і Raw SQL. IsolationLevel: чотири рівні, таблиця аномалій.Savepoints: часткове скасування,RollbackToSavepointAsync.DbUpdateException: обробка по SQL Error Number.EnableRetryOnFailure+strategy.ExecuteAsync.
Частина 2:
- Optimistic Concurrency:
ConcurrencyToken(Guid),RowVersion(byte),xmin(PostgreSQL). UPDATE з WHERE Version перевіркою. Три стратегії при конфлікті: Client Wins, Database Wins, Merge. - Pessimistic Locking:
SELECT FOR UPDATE/WITH (UPDLOCK)через Raw SQL. - Outbox Pattern: збереження і повідомлення в одній транзакції → at-least-once delivery. Idempotency у консьюмері через
ProcessedEventтаблицю.SELECT FOR UPDATE SKIP LOCKEDдля паралельних worker-ів. - Unit of Work: DbContext вже є UoW. Власна обгортка для інкапсуляції транзакцій і репозиторіїв.
Наступна стаття — Concurrency Conflicts та лocking стратегії (стаття 22) — поглибить теми конкурентності: MVCC, Read Phenomena, Lock Escalation та практичні архітектурні сценарії.
Додаткові ресурси
Збереження Даних та Транзакції (Частина 1)
SaveChanges і SaveChangesAsync в EF Core — атомарність за замовчуванням, що відбувається під капотом. Явні транзакції через BeginTransactionAsync, IsolationLevel, Savepoints. Обробка DbUpdateException та транзакційна відмова.
Конкурентність та Блокування (Частина 1)
Глибокий розбір конкурентності в EF Core — Read Phenomena (dirty read, non-repeatable read, phantom read), MVCC у PostgreSQL vs Lock-based у SQL Server, практичні сценарії race conditions та їх вирішення через правильні стратегії блокування.