Це продовження статті «Збереження Даних та Транзакції». Читайте послідовно.
Уявіть: два менеджери одночасно відкривають картку одного і того ж товару і зменшують ціну. Менеджер A бачить ціну 10000 і встановлює 9000. Менеджер B теж бачить 10000 і встановлює 8500. Хто переможе? Той, чий запит виконається останнім — і зміна першого буде мовчки перезаписана.
Це класична Lost Update проблема: паралельні записи без координації призводять до втрати змін.
В базах даних є два підходи для вирішення:
Optimistic Concurrency — «оптимістичний»: ми вважаємо що конфлікти рідкісні. Читаємо дані без блокування, а при записі перевіряємо, чи хтось вже не змінив їх відтоді. Якщо так — видаємо помилку. Підхід для читання-інтенсивних додатків де конфлікти дійсно рідкісні.
Pessimistic Locking — «песимістичний»: ми вважаємо що конфлікти часті. Блокуємо рядок при читанні — ніхто інший не може його редагувати поки ми не завершимо. Підхід для систем з частими конкурентними записами до тих самих рядків.
[ConcurrencyToken] або IsConcurrencyToken() позначає властивість як маркер конкурентності. EF Core включає її у WHERE умову UPDATE, щоб перевірити що рядок не змінився з моменту читання:
public class Product
{
public int Id { get; set; }
public string Name { get; set; } = string.Empty;
public decimal Price { get; set; }
public int Stock { get; set; }
// ConcurrencyToken: будь-яка властивість що унікально ідентифікує версію
[ConcurrencyToken]
public Guid Version { get; set; } = Guid.NewGuid();
}
// Конфігурація через Fluent API:
builder.Property(p => p.Version).IsConcurrencyToken();
Коли EF Core генерує UPDATE для Product — він включає перевірку Version:
-- EF Core генерує:
UPDATE Products
SET Name = @name, Price = @price, Version = @newVersion
WHERE Id = @id AND Version = @originalVersion -- ← перевірка!
-- Якщо між читанням і записом хтось змінив рядок (Version вже інша):
-- 0 рядків оновлено → DbUpdateConcurrencyException
При оновленні — потрібно генерувати нову Version. Це можна зробити через SaveChangesAsync override:
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
// Оновити Version для всіх Modified entities з Version
foreach (var entry in ChangeTracker.Entries<Product>()
.Where(e => e.State == EntityState.Modified))
{
entry.Entity.Version = Guid.NewGuid();
}
return await base.SaveChangesAsync(cancellationToken);
}
try
{
product.Price = newPrice;
await context.SaveChangesAsync();
}
catch (DbUpdateConcurrencyException ex)
{
// ex.Entries: entity що спричинили конфлікт
var entry = ex.Entries.Single();
// Отримати значення з трьох джерел:
var dbValues = await entry.GetDatabaseValuesAsync(); // поточне у БД
var currentValues = entry.CurrentValues; // що намагались записати
var originalValues = entry.OriginalValues; // що читали спочатку
if (dbValues is null)
{
// Рядок видалено іншою транзакцією
throw new BusinessException("Продукт був видалений іншим користувачем");
}
// Стратегія вирішення конфлікту:
// A) Client Wins: зберегти поточні значення (перетерти!)
entry.OriginalValues.SetValues(dbValues); // оновити Original
// Спробувати збереження ще раз (з новою Version)
// B) Database Wins: відкинути зміни, взяти з БД
await entry.ReloadAsync();
// entry.State → Unchanged, значення з БД
// C) Merge: показати юзеру обидва варіанти
throw new ConcurrencyConflictException(
CurrentValues: currentValues.ToObject(),
DatabaseValues: dbValues.ToObject());
}
У веб-застосунках найчастіша стратегія — повідомити користувача про конфлікт і попросити повторно відкрити і відредагувати:
// API: включаємо Version у відповідь
[HttpGet("{id}")]
public async Task<IActionResult> GetProduct(int id)
{
var product = await context.Products.FindAsync(id);
return Ok(new ProductDto
{
Id = product!.Id,
Name = product.Name,
Price = product.Price,
Version = product.Version.ToString() // ← передаємо клієнту
});
}
// API: перевіряємо Version при PUT
[HttpPut("{id}")]
public async Task<IActionResult> UpdateProduct(int id, UpdateProductDto dto)
{
var product = await context.Products.FindAsync(id);
if (product is null) return NotFound();
// Перевірка: Version з клієнта = Version у БД?
if (product.Version != Guid.Parse(dto.Version))
return Conflict("Дані змінились. Будь ласка, оновіть сторінку.");
product.Name = dto.Name;
product.Price = dto.Price;
try
{
await context.SaveChangesAsync();
return Ok(new { product.Version }); // повертаємо нову Version
}
catch (DbUpdateConcurrencyException)
{
return Conflict("Одночасний конфлікт. Оновіть і спробуйте знову.");
}
}
Замість Guid (що потрібно оновлювати вручну) — RowVersion (SQL Server) або xmin (PostgreSQL) — автоматичний маркер, що оновлюється базою при кожному UPDATE:
public class Order
{
public int Id { get; set; }
public string Status { get; set; } = string.Empty;
public decimal TotalAmount { get; set; }
// SQL Server: rowversion (timestamp) — автоматично оновлюється БД
public byte[] RowVersion { get; set; } = null!;
}
// Конфігурація
public class OrderConfiguration : IEntityTypeConfiguration<Order>
{
public void Configure(EntityTypeBuilder<Order> builder)
{
builder.Property(o => o.RowVersion)
.IsRowVersion() // ← позначає як RowVersion + ConcurrencyToken
.IsConcurrencyToken();
}
}
-- EF Core генерує для SQL Server:
UPDATE Orders
SET Status = @status
WHERE Id = @id AND RowVersion = @originalRowVersion
-- RowVersion оновлюється автоматично сервером при кожному UPDATE
PostgreSQL має вбудований механізм через системну колонку xmin — ідентифікатор транзакції що останньою змінила рядок:
// Npgsql: UseXminAsConcurrencyToken
builder.Property<uint>("xmin")
.HasColumnType("xid")
.ValueGeneratedOnAddOrUpdate()
.IsConcurrencyToken();
Або через extension:
// У DbContext OnModelCreating:
modelBuilder.Entity<Order>().UseXminAsConcurrencyToken();
Optimistic Concurrency добре для рідких конфліктів. Але у high-contention сценаріях (наприклад, продаж останнього екземпляра товару) — краще заблокувати рядок при читанні:
-- SQL Server: UPDLOCK + ROWLOCK
SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = @id
-- PostgreSQL: FOR UPDATE
SELECT * FROM products WHERE id = @id FOR UPDATE
EF Core не має нативного API для Pessimistic Locking — потрібен Raw SQL або провайдерний extension:
// SQL Server: через FromSqlRaw з hint
var product = await context.Products
.FromSqlRaw("SELECT * FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = {0}", productId)
.FirstOrDefaultAsync();
// Тепер рядок заблокований до кінця транзакції
product!.Stock -= quantity;
await context.SaveChangesAsync();
// COMMIT → блокування знімається
// PostgreSQL: FOR UPDATE через Npgsql
var product = await context.Products
.FromSqlRaw("SELECT * FROM \"Products\" WHERE \"Id\" = {0} FOR UPDATE", productId)
.FirstOrDefaultAsync();
LOCK TIMEOUT). Використовуйте лише коли справді необхідно.Проблема: ви зберегли Order у БД і хочете відіслати подію в RabbitMQ/Kafka. Що якщо збережено, але черга недоступна? Або навпаки — черга отримала подію, але SaveChanges впав?
Класичне рішення — Outbox Pattern: зберігайте і дані, і повідомлення в одній транзакції, а окремий фоновий сервіс доставляє повідомлення з гарантованою доставкою (at-least-once).
public class OutboxMessage
{
public Guid Id { get; set; } = Guid.NewGuid();
public string Type { get; set; } = string.Empty; // "OrderPlaced", "StockReserved"
public string Payload { get; set; } = string.Empty; // JSON серіалізована подія
public string Status { get; set; } = "Pending"; // Pending, Sent, Failed
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? SentAt { get; set; }
public int RetryCount { get; set; } = 0;
public string? ErrorMessage { get; set; }
}
public class OrderService
{
private readonly AppDbContext _context;
public async Task<Order> PlaceOrderAsync(CreateOrderDto dto)
{
await using var transaction = await _context.Database.BeginTransactionAsync();
try
{
// 1. Бізнес-логіка: створити Order
var order = new Order
{
CustomerId = dto.CustomerId,
Status = "Pending",
TotalAmount = dto.TotalAmount,
PlacedAt = DateTime.UtcNow
};
_context.Orders.Add(order);
// 2. Зберегти подію у Outbox — в тій самій транзакції!
var orderPlacedEvent = new OrderPlacedEvent
{
OrderId = order.Id, // EF Core заповнить Id після SaveChanges
CustomerId = order.CustomerId,
Amount = order.TotalAmount
};
_context.OutboxMessages.Add(new OutboxMessage
{
Type = nameof(OrderPlacedEvent),
Payload = JsonSerializer.Serialize(orderPlacedEvent)
});
// 3. Один SaveChanges: і Order, і OutboxMessage — атомарно
await _context.SaveChangesAsync();
await transaction.CommitAsync();
return order;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
public class OutboxProcessorWorker : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IMessageBus _messageBus;
private readonly ILogger<OutboxProcessorWorker> _logger;
public OutboxProcessorWorker(
IServiceScopeFactory scopeFactory,
IMessageBus messageBus,
ILogger<OutboxProcessorWorker> logger)
{
_scopeFactory = scopeFactory;
_messageBus = messageBus;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await ProcessPendingMessagesAsync(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
private async Task ProcessPendingMessagesAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
// Беремо пакет Pending повідомлень
var messages = await context.OutboxMessages
.Where(m => m.Status == "Pending" && m.RetryCount < 5)
.OrderBy(m => m.CreatedAt)
.Take(50)
.ToListAsync(ct);
foreach (var message in messages)
{
try
{
// Публікуємо у месседж-брокер
await _messageBus.PublishAsync(message.Type, message.Payload, ct);
message.Status = "Sent";
message.SentAt = DateTime.UtcNow;
}
catch (Exception ex)
{
message.RetryCount++;
message.ErrorMessage = ex.Message;
if (message.RetryCount >= 5)
message.Status = "Failed"; // Dead letter
_logger.LogError(ex, "Failed to publish OutboxMessage {Id}", message.Id);
}
}
await context.SaveChangesAsync(ct);
}
}
Outbox гарантує at-least-once: повідомлення може бути відкрито більше одного разу (якщо worker впав після публікації але до позначення Sent). Консьюмери мають бути ідемпотентними:
// Консьюмер: ідемпотентна обробка
public class OrderPlacedEventConsumer
{
public async Task HandleAsync(OrderPlacedEvent evt, AppDbContext context)
{
// Перевірка: чи вже оброблено?
var alreadyProcessed = await context.ProcessedEvents
.AnyAsync(pe => pe.EventId == evt.EventId);
if (alreadyProcessed)
{
_logger.LogInformation("Event {EventId} already processed, skipping", evt.EventId);
return;
}
// Обробка бізнес-логіки
await DoBusinessLogicAsync(evt);
// Зберегти що обробили
context.ProcessedEvents.Add(new ProcessedEvent { EventId = evt.EventId });
await context.SaveChangesAsync();
}
}
Unit of Work — патерн що групує всі зміни однієї «бізнес-операції» і застосовує їх разом. DbContext в EF Core вже є реалізацією Unit of Work. Але у великих проєктах з Repository Pattern — може знадобитись обгортка:
public interface IUnitOfWork : IDisposable
{
IProductRepository Products { get; }
IOrderRepository Orders { get; }
ICustomerRepository Customers { get; }
Task<int> SaveChangesAsync(CancellationToken ct = default);
Task BeginTransactionAsync();
Task CommitAsync();
Task RollbackAsync();
}
public class UnitOfWork : IUnitOfWork
{
private readonly AppDbContext _context;
private IDbContextTransaction? _transaction;
public UnitOfWork(AppDbContext context,
IProductRepository products,
IOrderRepository orders,
ICustomerRepository customers)
{
_context = context;
Products = products;
Orders = orders;
Customers = customers;
}
public IProductRepository Products { get; }
public IOrderRepository Orders { get; }
public ICustomerRepository Customers { get; }
public Task<int> SaveChangesAsync(CancellationToken ct = default)
=> _context.SaveChangesAsync(ct);
public async Task BeginTransactionAsync()
=> _transaction = await _context.Database.BeginTransactionAsync();
public async Task CommitAsync()
{
if (_transaction is not null)
{
await _transaction.CommitAsync();
await _transaction.DisposeAsync();
_transaction = null;
}
}
public async Task RollbackAsync()
{
if (_transaction is not null)
{
await _transaction.RollbackAsync();
await _transaction.DisposeAsync();
_transaction = null;
}
}
public void Dispose()
{
_transaction?.Dispose();
_context.Dispose();
}
}
Завдання 1.1: RowVersion для замовлень
Додайте RowVersion до Order (SQL Server) або UseXminAsConcurrencyToken (PostgreSQL). Напишіть тест:
DbUpdateConcurrencyExceptionЗавдання 1.2: Обробка ConcurrencyException у API
Для PUT /api/products/{id} з Version полем у DTO:
Version із DTO ≠ поточна у БД → 409 Conflict з тілом { conflict: true, currentVersion: "..." }DbUpdateConcurrencyException → 409 Conflict з порадою оновитиЗавдання 1.3: Базовий Outbox
Реалізуйте OutboxMessage таблицю і OrderService.PlaceOrderAsync:
OutboxProcessorWorker що кожні 5 секунд читає Pending і позначає SentSaveChanges —OutboxMessage вже є у БД. Worker зробить доставку.Завдання 2.1: Optimistic Concurrency з автоматичним retry
Реалізуйте ConcurrencyRetryDecorator<T> що обгортає будь-який Update метод і автоматично повторює при DbUpdateConcurrencyException:
Завдання 2.2: Pessimistic Locking для інвентаризації
ReserveStockAsync(int productId, int quantity):
BEGIN TRANSACTIONSELECT ... WITH (UPDLOCK) або FOR UPDATE — блокуємо рядокStock >= quantityCOMMITНапишіть паралельний тест: 10 Thread одночасно намагаються зарезервувати по 1 штуці з 5 доступних. Рівно 5 мають успішно завершитись (решта — InsufficientStockException).
Завдання 3.1: Transactional Outbox з idempotency
Повна реалізація:
OutboxMessage з CorrelationId (UUID що передається з клієнта)context.OutboxMessages.AnyAsync(m => m.CorrelationId == id) — idempotency ключOutboxProcessorWorker з Pessimistic Lock на Outbox рядок під час обробки (SELECT FOR UPDATE SKIP LOCKED для PostgreSQL) — щоб кілька worker-ів не обробляли одне повідомленняProcessedEvent таблиця у консьюмеріЦя стаття повністю розкрила збереження даних та concurrency в EF Core:
Частина 1:
BeginTransactionAsync охоплює кілька SaveChanges і Raw SQL.IsolationLevel: чотири рівні, таблиця аномалій.Savepoints: часткове скасування, RollbackToSavepointAsync.DbUpdateException: обробка по SQL Error Number.EnableRetryOnFailure + strategy.ExecuteAsync.Частина 2:
ConcurrencyToken (Guid), RowVersion (byte), xmin (PostgreSQL). UPDATE з WHERE Version перевіркою. Три стратегії при конфлікті: Client Wins, Database Wins, Merge.SELECT FOR UPDATE / WITH (UPDLOCK) через Raw SQL.ProcessedEvent таблицю. SELECT FOR UPDATE SKIP LOCKED для паралельних worker-ів.Наступна стаття — Concurrency Conflicts та лocking стратегії (стаття 22) — поглибить теми конкурентності: MVCC, Read Phenomena, Lock Escalation та практичні архітектурні сценарії.
Збереження Даних та Транзакції (Частина 1)
SaveChanges і SaveChangesAsync в EF Core — атомарність за замовчуванням, що відбувається під капотом. Явні транзакції через BeginTransactionAsync, IsolationLevel, Savepoints. Обробка DbUpdateException та транзакційна відмова.
Конкурентність та Блокування (Частина 1)
Глибокий розбір конкурентності в EF Core — Read Phenomena (dirty read, non-repeatable read, phantom read), MVCC у PostgreSQL vs Lock-based у SQL Server, практичні сценарії race conditions та їх вирішення через правильні стратегії блокування.