Interceptors в EF Core — Connection, Transaction та Materialization (Частина 2)
Interceptors в EF Core: Connection, Transaction та Materialization
Це продовження статті «Interceptors в EF Core». Читайте послідовно.
IDbConnectionInterceptor: управління з'єднаннями
IDbConnectionInterceptor перехоплює події на рівні DbConnection: відкриття, закриття, поведінку connection pool. Це найнижчий рівень interceptor в EF Core — корисний для діагностики проблем з connection pool, реалізації connection tagging і custom open/close логіки.
Коли потрібен Connection Interceptor
У типовому OLTP застосунку з'єднання відкриваються й закриваються тисячі разів на секунду. Connection Pool управляє ними автоматично, але іноді виникають ситуації де потрібен більший контроль:
- Діагностика витоків з'єднань: підозра що з'єднання не повертаються у pool
- SET команди після відкриття: встановлення
SET APPLICATION_NAME,SET ROLE,pg_advisory_lock - Custom connection string per tenant: для multi-tenant з routing по БД
- Timing відкриття: виявлення коли pool вичерпаний і запити чекають з'єднання
public class ConnectionLifetimeInterceptor : DbConnectionInterceptor
{
private readonly ILogger<ConnectionLifetimeInterceptor> _logger;
private readonly ConcurrentDictionary<int, Stopwatch> _connectionTimers = new();
public ConnectionLifetimeInterceptor(ILogger<ConnectionLifetimeInterceptor> logger)
{
_logger = logger;
}
// Перед відкриттям з'єднання
public override InterceptionResult ConnectionOpening(
DbConnection connection,
ConnectionEventData eventData,
InterceptionResult result)
{
_connectionTimers[connection.GetHashCode()] = Stopwatch.StartNew();
return base.ConnectionOpening(connection, eventData, result);
}
// Після відкриття з'єднання
public override void ConnectionOpened(
DbConnection connection,
ConnectionEndEventData eventData)
{
if (_connectionTimers.TryGetValue(connection.GetHashCode(), out var sw))
{
var openTime = sw.ElapsedMilliseconds;
if (openTime > 100) // > 100ms — підозріло довго, pool міг бути вичерпаний
{
_logger.LogWarning(
"Connection took {Duration}ms to open. Connection pool may be exhausted.",
openTime);
}
}
base.ConnectionOpened(connection, eventData);
}
// При закритті з'єднання
public override void ConnectionClosed(
DbConnection connection,
ConnectionEndEventData eventData)
{
if (_connectionTimers.TryRemove(connection.GetHashCode(), out var sw))
{
var lifetime = sw.ElapsedMilliseconds;
_logger.LogDebug(
"Connection closed after {Lifetime}ms. Active connections: {Count}",
lifetime,
_connectionTimers.Count);
}
base.ConnectionClosed(connection, eventData);
}
// При помилці з'єднання
public override void ConnectionFailed(
DbConnection connection,
ConnectionErrorEventData eventData)
{
_logger.LogError(
eventData.Exception,
"Connection failed. Exception type: {Type}. Message: {Message}",
eventData.Exception?.GetType().Name,
eventData.Exception?.Message);
base.ConnectionFailed(connection, eventData);
}
}
SET команди після відкриття з'єднання
Практичний приклад: виконати SQL команди після кожного відкриття з'єднання — SET APPLICATION_NAME для SQL Server або SET ROLE для PostgreSQL Row-Level Security:
public class ConnectionSetupInterceptor : DbConnectionInterceptor
{
private readonly ICurrentUserService _currentUser;
private readonly ITenantService _tenantService;
public ConnectionSetupInterceptor(
ICurrentUserService currentUser,
ITenantService tenantService)
{
_currentUser = currentUser;
_tenantService = tenantService;
}
public override async Task ConnectionOpenedAsync(
DbConnection connection,
ConnectionEndEventData eventData,
CancellationToken cancellationToken = default)
{
// Встановити application name (видно у sys.dm_exec_sessions SQL Server)
await using var cmd = connection.CreateCommand();
cmd.CommandText = $"SET APPLICATION_NAME = 'API.{_currentUser.GetUserId()}'";
await cmd.ExecuteNonQueryAsync(cancellationToken);
// PostgreSQL: Row-Level Security через SET
// await using var roleCmd = connection.CreateCommand();
// roleCmd.CommandText = $"SET app.current_tenant = '{_tenantService.GetCurrentTenantId()}'";
// await roleCmd.ExecuteNonQueryAsync(cancellationToken);
await base.ConnectionOpenedAsync(connection, eventData, cancellationToken);
}
}
IDbTransactionInterceptor: спостереження за транзакціями
IDbTransactionInterceptor перехоплює life cycle транзакцій: Begin, Commit, Rollback. Корисний для:
- Діагностики довгих транзакцій
- Запобігання вкладених транзакцій у конкретних сценаріях
- Логування всіх транзакційних операцій для audit
public class TransactionDiagnosticsInterceptor : DbTransactionInterceptor
{
private readonly ILogger<TransactionDiagnosticsInterceptor> _logger;
private readonly ConcurrentDictionary<Guid, Stopwatch> _txTimers = new();
public TransactionDiagnosticsInterceptor(
ILogger<TransactionDiagnosticsInterceptor> logger)
{
_logger = logger;
}
public override DbTransaction TransactionStarted(
DbConnection connection,
TransactionEndEventData eventData,
DbTransaction result)
{
var txId = eventData.TransactionId;
_txTimers[txId] = Stopwatch.StartNew();
_logger.LogDebug(
"Transaction {TxId} started with IsolationLevel={Level}",
txId,
result.IsolationLevel);
return base.TransactionStarted(connection, eventData, result);
}
public override void TransactionCommitted(
DbConnection connection,
TransactionEndEventData eventData)
{
LogTransactionEnd(eventData.TransactionId, "Committed");
base.TransactionCommitted(connection, eventData);
}
public override void TransactionRolledBack(
DbConnection connection,
TransactionEndEventData eventData)
{
LogTransactionEnd(eventData.TransactionId, "Rolled Back");
base.TransactionRolledBack(connection, eventData);
}
public override Task TransactionFailedAsync(
DbConnection connection,
TransactionErrorEventData eventData,
CancellationToken cancellationToken = default)
{
_logger.LogError(
eventData.Exception,
"Transaction {TxId} failed: {Message}",
eventData.TransactionId,
eventData.Exception.Message);
return base.TransactionFailedAsync(connection, eventData, cancellationToken);
}
private void LogTransactionEnd(Guid txId, string action)
{
if (_txTimers.TryRemove(txId, out var sw))
{
var duration = sw.ElapsedMilliseconds;
if (duration > 5000) // > 5 секунд — довга транзакція
_logger.LogWarning(
"Long transaction {TxId} {Action} after {Duration}ms",
txId, action, duration);
else
_logger.LogDebug(
"Transaction {TxId} {Action} ({Duration}ms)",
txId, action, duration);
}
}
}
IMaterializationInterceptor: кастомна матеріалізація
IMaterializationInterceptor — найрідше використовуваний interceptor. Він дозволяє перехоплювати момент створення C#-об'єкта з DbDataReader рядка. Корисний для:
- Ін'єкції залежностей у entity (хоча використовуйте обережно)
- Кастомного маппінгу значень що приходять з БД
- Обходу стандартного конструктора при матеріалізації
public class MaterializationInterceptor : IMaterializationInterceptor
{
// Перед матеріалізацією: можна замінити результат
public object InitializingInstance(
MaterializationInterceptionData materializationData,
object instance)
{
// instance вже створений (через Activator або конструктор)
// Можна ін'єктувати залежності або змінити об'єкт
if (instance is IHasTimezone tzEntity)
{
// Встановити TimeZone з HttpContext (наприклад)
tzEntity.UserTimeZone = TimeZoneInfo.Local;
}
return instance;
}
// Після матеріалізації: instance заповнений даними з бази
public object InitializedInstance(
MaterializationInterceptionData materializationData,
object instance)
{
// Корисно для: аудит що entity було прочитано, кеш, metrics
if (instance is Product product)
{
// Не чіпаємо значення — лише спостерігаємо
}
return instance;
}
}
Suppress Result: замінити виконання своїм результатом
Найпотужніша і найрідше потрібна можливість interceptors — Suppress (придушення): замість реального виконання SQL, повертаємо свій результат. Корисно для:
- Мок-ування БД у unit тестах без SQLite
- Query caching: перевіряємо кеш перед зверненням до БД
- Circuit Breaker: якщо БД недоступна — повертаємо cached або fallback результат
Query Cache Interceptor
public class QueryCacheInterceptor : DbCommandInterceptor
{
private readonly IMemoryCache _cache;
private readonly ILogger<QueryCacheInterceptor> _logger;
// Тільки ці SQL-шаблони кешуються (визначаємо явно щоб уникнути стале чтення)
private readonly HashSet<string> _cacheablePrefixes = new()
{
"SELECT",
};
public QueryCacheInterceptor(IMemoryCache cache, ILogger<QueryCacheInterceptor> logger)
{
_cache = cache;
_logger = logger;
}
public override InterceptionResult<DbDataReader> ReaderExecuting(
DbCommand command,
CommandEventData eventData,
InterceptionResult<DbDataReader> result)
{
// Спрощена демонстрація — в реальності кешування результатів DbDataReader складне
// Краще кешувати на рівні сервісу, а не тут
return base.ReaderExecuting(command, eventData, result);
}
}
Suppress для mock у тестах
// Interceptor що замінює КОНКРЕТНИЙ запит з mock-даними (для unit tests)
public class MockQueryInterceptor : DbCommandInterceptor
{
private readonly Dictionary<string, object> _mocks = new();
public MockQueryInterceptor WhenSql(string sqlContains, object returnValue)
{
_mocks[sqlContains] = returnValue;
return this;
}
public override InterceptionResult<object> ScalarExecuting(
DbCommand command,
CommandEventData eventData,
InterceptionResult<object> result)
{
foreach (var (sql, value) in _mocks)
{
if (command.CommandText.Contains(sql, StringComparison.OrdinalIgnoreCase))
{
_logger.LogDebug("Mocking scalar query: {Sql}", sql);
// Suppress: повертаємо свій результат, реальний SQL не виконується
return InterceptionResult<object>.SuppressWithResult(value);
}
}
return base.ScalarExecuting(command, eventData, result);
}
}
// Використання у тестах:
var mock = new MockQueryInterceptor()
.WhenSql("SELECT COUNT(*) FROM Products", 42);
var options = new DbContextOptionsBuilder<AppDbContext>()
.UseSqlite("DataSource=:memory:")
.AddInterceptors(mock)
.Options;
using var context = new AppDbContext(options);
var count = await context.Products.CountAsync();
Assert.Equal(42, count); // SQL не виконувався — повернули mock!
Composite Interceptor Pattern
При великій кількості interceptors з різними обов'язками — можна зібрати їх у Composite:
// Composite: один interceptor що делегує N іншим
public class CompositeCommandInterceptor : DbCommandInterceptor
{
private readonly IReadOnlyList<DbCommandInterceptor> _interceptors;
public CompositeCommandInterceptor(IEnumerable<DbCommandInterceptor> interceptors)
{
_interceptors = interceptors.ToList().AsReadOnly();
}
public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(
DbCommand command,
CommandEventData eventData,
InterceptionResult<DbDataReader> result,
CancellationToken cancellationToken = default)
{
foreach (var interceptor in _interceptors)
{
result = await interceptor.ReaderExecutingAsync(
command, eventData, result, cancellationToken);
if (result.HasResult)
return result; // Один з interceptors Suppress-ував — зупиняємось
}
return result;
}
public override async ValueTask<DbDataReader> ReaderExecutedAsync(
DbCommand command,
CommandExecutedEventData eventData,
DbDataReader result,
CancellationToken cancellationToken = default)
{
// Зворотній порядок для "після" методів
foreach (var interceptor in _interceptors.Reverse())
{
result = await interceptor.ReaderExecutedAsync(
command, eventData, result, cancellationToken);
}
return result;
}
}
Повна конфігурація Interceptor Pipeline
// Program.cs — повна реєстрація всього pipeline
builder.Services.AddHttpContextAccessor();
// Infrastructure services
builder.Services.AddScoped<ICurrentUserService, HttpContextCurrentUserService>();
builder.Services.AddScoped<ITenantService, JwtTenantService>();
// Scoped interceptors (залежать від HTTP-контексту)
builder.Services.AddScoped<AuditInterceptor>();
builder.Services.AddScoped<SoftDeleteInterceptor>();
builder.Services.AddScoped<ConnectionSetupInterceptor>();
// Singleton interceptors (без HTTP-залежностей)
builder.Services.AddSingleton(provider => new SlowQueryInterceptor(
provider.GetRequiredService<ILogger<SlowQueryInterceptor>>(),
TimeSpan.FromMilliseconds(
builder.Configuration.GetValue<int>("EfCore:SlowQueryThresholdMs", 500))));
builder.Services.AddSingleton<TransactionDiagnosticsInterceptor>();
// DbContext: збираємо весь pipeline
builder.Services.AddDbContext<AppDbContext>((provider, options) =>
{
options
.UseSqlServer(builder.Configuration.GetConnectionString("Default"))
.AddInterceptors(
// Порядок важливий: аудит першим (щоб бачити зміни до будь-яких модифікацій)
provider.GetRequiredService<AuditInterceptor>(),
provider.GetRequiredService<SoftDeleteInterceptor>(),
// Connection-level
provider.GetRequiredService<ConnectionSetupInterceptor>(),
// Diagnostics (singleton-ові — без Scoped залежностей)
provider.GetRequiredService<SlowQueryInterceptor>(),
provider.GetRequiredService<TransactionDiagnosticsInterceptor>()
);
});
Практичні завдання (Частина 2)
Рівень 1 — Базовий
Завдання 1.1: Connection Lifetime Monitor
Реалізуйте ConnectionLifetimeInterceptor що:
- Вимірює час відкриття кожного з'єднання
- При
> 50ms— warning (pool може бути вичерпаний) - Рахує кількість активних з'єднань через
ConcurrentDictionary - Логує summary кожну хвилину: avg/max lifetime, peak count
- Поверніть метрику через кастомний Health Check
Завдання 1.2: Transaction Long-Running Alert
TransactionDiagnosticsInterceptor:
- При транзакції > 3 секунди —
LogWarningзTransactionId - При > 30 секунд —
LogError+ надіслати alert черезIAlertService - При
Rollback— завжди логувати незалежно від тривалості
Завдання 1.3: Suppress для тестування без БД
Реалізуйте InMemoryQueryInterceptor для unit тестів:
MockQuery<T>(Func<IEnumerable<T>> data)— повертає mock дані для будь-якого SELECT що повертає T- Не потребує жодного кешованого SQL — перехоплює через
Suppress - Тест:
ProductRepository.GetByIdAsync(1)без SQLite чи SQL Server — повертає mock Product
Рівень 2 — Логіка
Завдання 2.1: PostgreSQL Row-Level Security через Connection Interceptor
Реалізуйте multi-tenant RLS через ConnectionSetupInterceptor:
- Після відкриття PostgreSQL з'єднання:
SET app.current_tenant_id = '{tenantId}' - PostgreSQL Policy:
CREATE POLICY tenant_isolation ON products USING (tenant_id = current_setting('app.current_tenant_id')::int) - Тест: два DbContext з різними TenantId — кожен бачить лише свої дані (через RLS, а не Global Query Filter!)
Завдання 2.2: Circuit Breaker Interceptor
Реалізуйте CircuitBreakerInterceptor : DbCommandInterceptor:
- State:
Closed(нормально),Open(блокуємо),HalfOpen(пробний запит) - При 5 помилках підряд →
Open(всі запити →CircuitOpenExceptionбез звернення до БД) - Після
30s→HalfOpen(пробний запит) - Успіх у
HalfOpen→Closed - Suppress у
Open:InterceptionResult<DbDataReader>.SuppressWithResult(EmptyReader())
Рівень 3 — Архітектура
Завдання 3.1: Observability Stack
Зберіть повний Observability stack для EF Core через interceptors:
MetricsInterceptor: Prometheus/OpenTelemetry метрики:efcore_queries_total(counter, labels: db_operation, table)efcore_query_duration_ms(histogram, labels: db_operation)efcore_slow_queries_total(counter)efcore_savechanges_total(counter, labels: result)efcore_connections_active(gauge)
TracingInterceptor: OpenTelemetry trace spans:- Кожен SQL-запит → Span з attributes:
db.statement,db.system,db.operation,db.duration - Parent Span → HTTP request span (distributed tracing)
- Кожен SQL-запит → Span з attributes:
HealthCheckInterceptor: інтеграція з ASP.NET Core Health Checks:- Відповідає на
/healthз результатами останніх 10 queries (avg duration, error rate)
- Відповідає на
Підсумок статті 23
Ця стаття повністю розкрила Interceptors в EF Core:
Частина 1:
- Архітектура pipeline: before → operation → after, зворотній порядок для after.
DbCommandInterceptor: Reader/NonQuery/Scalar. SlowQueryInterceptor, SQL Rewriting, QueryCounterInterceptor для N+1 тестів.SaveChangesInterceptor:SavingChanges(до),SavedChanges(після з Id),SaveChangesFailed. AuditInterceptor замість override у DbContext. DetailedAuditInterceptor зFillGeneratedIds.- Реєстрація: Scoped через DI factory, Singleton без HTTP-залежностей.
Частина 2:
IDbConnectionInterceptor: ConnectionOpened/Closed/Failed. SET команди після відкриття (APPLICATION_NAME, RLS, tenant).IDbTransactionInterceptor: моніторинг тривалості транзакцій, довгі транзакції → Warning/Error.IMaterializationInterceptor: ін'єкція даних у матеріалізований entity.- Suppress Result:
InterceptionResult<T>.SuppressWithResult(value)— замінити виконання SQL своїм результатом. Для Query Caching і Mock у тестах. - Composite Interceptor: делегування N interceptors з правильним порядком і Suppress-propagation.
- Повна конфігурація pipeline: порядок, Scoped vs Singleton, всі рівні.
Наступна стаття — Performance Optimization (стаття 24) — комплексна оптимізація продуктивності EF Core: Connection Pooling, Query Plan Analysis, Database Indexes від EF Core, Benchmarking та профілювання.
Додаткові ресурси
Interceptors в EF Core (Частина 1)
EF Core Interceptors — механізм перехоплення операцій на найнижчому рівні ORM. DbCommandInterceptor для модифікації SQL, SaveChangesInterceptor для аудиту та domain events, IDbConnectionInterceptor для управління з'єднаннями. Архітектура pipeline interceptors.
План вивчення Entity Framework Core — Повний курс