Ef Core

Interceptors в EF Core — Connection, Transaction та Materialization (Частина 2)

IDbConnectionInterceptor для управління з'єднаннями і connection pooling. IDbTransactionInterceptor для спостереження за транзакціями. IMaterializationInterceptor для кастомної матеріалізації. Suppress Result — замінити виконання своїм результатом. Composite Interceptor Pattern.

Interceptors в EF Core: Connection, Transaction та Materialization

Це продовження статті «Interceptors в EF Core». Читайте послідовно.


IDbConnectionInterceptor: управління з'єднаннями

IDbConnectionInterceptor перехоплює події на рівні DbConnection: відкриття, закриття, поведінку connection pool. Це найнижчий рівень interceptor в EF Core — корисний для діагностики проблем з connection pool, реалізації connection tagging і custom open/close логіки.

Коли потрібен Connection Interceptor

У типовому OLTP застосунку з'єднання відкриваються й закриваються тисячі разів на секунду. Connection Pool управляє ними автоматично, але іноді виникають ситуації де потрібен більший контроль:

  • Діагностика витоків з'єднань: підозра що з'єднання не повертаються у pool
  • SET команди після відкриття: встановлення SET APPLICATION_NAME, SET ROLE, pg_advisory_lock
  • Custom connection string per tenant: для multi-tenant з routing по БД
  • Timing відкриття: виявлення коли pool вичерпаний і запити чекають з'єднання
public class ConnectionLifetimeInterceptor : DbConnectionInterceptor
{
    private readonly ILogger<ConnectionLifetimeInterceptor> _logger;
    private readonly ConcurrentDictionary<int, Stopwatch> _connectionTimers = new();

    public ConnectionLifetimeInterceptor(ILogger<ConnectionLifetimeInterceptor> logger)
    {
        _logger = logger;
    }

    // Перед відкриттям з'єднання
    public override InterceptionResult ConnectionOpening(
        DbConnection connection,
        ConnectionEventData eventData,
        InterceptionResult result)
    {
        _connectionTimers[connection.GetHashCode()] = Stopwatch.StartNew();
        return base.ConnectionOpening(connection, eventData, result);
    }

    // Після відкриття з'єднання
    public override void ConnectionOpened(
        DbConnection connection,
        ConnectionEndEventData eventData)
    {
        if (_connectionTimers.TryGetValue(connection.GetHashCode(), out var sw))
        {
            var openTime = sw.ElapsedMilliseconds;

            if (openTime > 100) // > 100ms — підозріло довго, pool міг бути вичерпаний
            {
                _logger.LogWarning(
                    "Connection took {Duration}ms to open. Connection pool may be exhausted.",
                    openTime);
            }
        }

        base.ConnectionOpened(connection, eventData);
    }

    // При закритті з'єднання
    public override void ConnectionClosed(
        DbConnection connection,
        ConnectionEndEventData eventData)
    {
        if (_connectionTimers.TryRemove(connection.GetHashCode(), out var sw))
        {
            var lifetime = sw.ElapsedMilliseconds;

            _logger.LogDebug(
                "Connection closed after {Lifetime}ms. Active connections: {Count}",
                lifetime,
                _connectionTimers.Count);
        }

        base.ConnectionClosed(connection, eventData);
    }

    // При помилці з'єднання
    public override void ConnectionFailed(
        DbConnection connection,
        ConnectionErrorEventData eventData)
    {
        _logger.LogError(
            eventData.Exception,
            "Connection failed. Exception type: {Type}. Message: {Message}",
            eventData.Exception?.GetType().Name,
            eventData.Exception?.Message);

        base.ConnectionFailed(connection, eventData);
    }
}

SET команди після відкриття з'єднання

Практичний приклад: виконати SQL команди після кожного відкриття з'єднання — SET APPLICATION_NAME для SQL Server або SET ROLE для PostgreSQL Row-Level Security:

public class ConnectionSetupInterceptor : DbConnectionInterceptor
{
    private readonly ICurrentUserService _currentUser;
    private readonly ITenantService _tenantService;

    public ConnectionSetupInterceptor(
        ICurrentUserService currentUser,
        ITenantService tenantService)
    {
        _currentUser   = currentUser;
        _tenantService = tenantService;
    }

    public override async Task ConnectionOpenedAsync(
        DbConnection connection,
        ConnectionEndEventData eventData,
        CancellationToken cancellationToken = default)
    {
        // Встановити application name (видно у sys.dm_exec_sessions SQL Server)
        await using var cmd = connection.CreateCommand();
        cmd.CommandText = $"SET APPLICATION_NAME = 'API.{_currentUser.GetUserId()}'";
        await cmd.ExecuteNonQueryAsync(cancellationToken);

        // PostgreSQL: Row-Level Security через SET
        // await using var roleCmd = connection.CreateCommand();
        // roleCmd.CommandText = $"SET app.current_tenant = '{_tenantService.GetCurrentTenantId()}'";
        // await roleCmd.ExecuteNonQueryAsync(cancellationToken);

        await base.ConnectionOpenedAsync(connection, eventData, cancellationToken);
    }
}

IDbTransactionInterceptor: спостереження за транзакціями

IDbTransactionInterceptor перехоплює life cycle транзакцій: Begin, Commit, Rollback. Корисний для:

  • Діагностики довгих транзакцій
  • Запобігання вкладених транзакцій у конкретних сценаріях
  • Логування всіх транзакційних операцій для audit
public class TransactionDiagnosticsInterceptor : DbTransactionInterceptor
{
    private readonly ILogger<TransactionDiagnosticsInterceptor> _logger;
    private readonly ConcurrentDictionary<Guid, Stopwatch> _txTimers = new();

    public TransactionDiagnosticsInterceptor(
        ILogger<TransactionDiagnosticsInterceptor> logger)
    {
        _logger = logger;
    }

    public override DbTransaction TransactionStarted(
        DbConnection connection,
        TransactionEndEventData eventData,
        DbTransaction result)
    {
        var txId = eventData.TransactionId;
        _txTimers[txId] = Stopwatch.StartNew();

        _logger.LogDebug(
            "Transaction {TxId} started with IsolationLevel={Level}",
            txId,
            result.IsolationLevel);

        return base.TransactionStarted(connection, eventData, result);
    }

    public override void TransactionCommitted(
        DbConnection connection,
        TransactionEndEventData eventData)
    {
        LogTransactionEnd(eventData.TransactionId, "Committed");
        base.TransactionCommitted(connection, eventData);
    }

    public override void TransactionRolledBack(
        DbConnection connection,
        TransactionEndEventData eventData)
    {
        LogTransactionEnd(eventData.TransactionId, "Rolled Back");
        base.TransactionRolledBack(connection, eventData);
    }

    public override Task TransactionFailedAsync(
        DbConnection connection,
        TransactionErrorEventData eventData,
        CancellationToken cancellationToken = default)
    {
        _logger.LogError(
            eventData.Exception,
            "Transaction {TxId} failed: {Message}",
            eventData.TransactionId,
            eventData.Exception.Message);

        return base.TransactionFailedAsync(connection, eventData, cancellationToken);
    }

    private void LogTransactionEnd(Guid txId, string action)
    {
        if (_txTimers.TryRemove(txId, out var sw))
        {
            var duration = sw.ElapsedMilliseconds;

            if (duration > 5000) // > 5 секунд — довга транзакція
                _logger.LogWarning(
                    "Long transaction {TxId} {Action} after {Duration}ms",
                    txId, action, duration);
            else
                _logger.LogDebug(
                    "Transaction {TxId} {Action} ({Duration}ms)",
                    txId, action, duration);
        }
    }
}

IMaterializationInterceptor: кастомна матеріалізація

IMaterializationInterceptor — найрідше використовуваний interceptor. Він дозволяє перехоплювати момент створення C#-об'єкта з DbDataReader рядка. Корисний для:

  • Ін'єкції залежностей у entity (хоча використовуйте обережно)
  • Кастомного маппінгу значень що приходять з БД
  • Обходу стандартного конструктора при матеріалізації
public class MaterializationInterceptor : IMaterializationInterceptor
{
    // Перед матеріалізацією: можна замінити результат
    public object InitializingInstance(
        MaterializationInterceptionData materializationData,
        object instance)
    {
        // instance вже створений (через Activator або конструктор)
        // Можна ін'єктувати залежності або змінити об'єкт

        if (instance is IHasTimezone tzEntity)
        {
            // Встановити TimeZone з HttpContext (наприклад)
            tzEntity.UserTimeZone = TimeZoneInfo.Local;
        }

        return instance;
    }

    // Після матеріалізації: instance заповнений даними з бази
    public object InitializedInstance(
        MaterializationInterceptionData materializationData,
        object instance)
    {
        // Корисно для: аудит що entity було прочитано, кеш, metrics
        if (instance is Product product)
        {
            // Не чіпаємо значення — лише спостерігаємо
        }

        return instance;
    }
}

Suppress Result: замінити виконання своїм результатом

Найпотужніша і найрідше потрібна можливість interceptors — Suppress (придушення): замість реального виконання SQL, повертаємо свій результат. Корисно для:

  • Мок-ування БД у unit тестах без SQLite
  • Query caching: перевіряємо кеш перед зверненням до БД
  • Circuit Breaker: якщо БД недоступна — повертаємо cached або fallback результат

Query Cache Interceptor

public class QueryCacheInterceptor : DbCommandInterceptor
{
    private readonly IMemoryCache _cache;
    private readonly ILogger<QueryCacheInterceptor> _logger;

    // Тільки ці SQL-шаблони кешуються (визначаємо явно щоб уникнути стале чтення)
    private readonly HashSet<string> _cacheablePrefixes = new()
    {
        "SELECT",
    };

    public QueryCacheInterceptor(IMemoryCache cache, ILogger<QueryCacheInterceptor> logger)
    {
        _cache  = cache;
        _logger = logger;
    }

    public override InterceptionResult<DbDataReader> ReaderExecuting(
        DbCommand command,
        CommandEventData eventData,
        InterceptionResult<DbDataReader> result)
    {
        // Спрощена демонстрація — в реальності кешування результатів DbDataReader складне
        // Краще кешувати на рівні сервісу, а не тут
        return base.ReaderExecuting(command, eventData, result);
    }
}

Suppress для mock у тестах

// Interceptor що замінює КОНКРЕТНИЙ запит з mock-даними (для unit tests)
public class MockQueryInterceptor : DbCommandInterceptor
{
    private readonly Dictionary<string, object> _mocks = new();

    public MockQueryInterceptor WhenSql(string sqlContains, object returnValue)
    {
        _mocks[sqlContains] = returnValue;
        return this;
    }

    public override InterceptionResult<object> ScalarExecuting(
        DbCommand command,
        CommandEventData eventData,
        InterceptionResult<object> result)
    {
        foreach (var (sql, value) in _mocks)
        {
            if (command.CommandText.Contains(sql, StringComparison.OrdinalIgnoreCase))
            {
                _logger.LogDebug("Mocking scalar query: {Sql}", sql);
                // Suppress: повертаємо свій результат, реальний SQL не виконується
                return InterceptionResult<object>.SuppressWithResult(value);
            }
        }

        return base.ScalarExecuting(command, eventData, result);
    }
}

// Використання у тестах:
var mock = new MockQueryInterceptor()
    .WhenSql("SELECT COUNT(*) FROM Products", 42);

var options = new DbContextOptionsBuilder<AppDbContext>()
    .UseSqlite("DataSource=:memory:")
    .AddInterceptors(mock)
    .Options;

using var context = new AppDbContext(options);
var count = await context.Products.CountAsync();
Assert.Equal(42, count); // SQL не виконувався — повернули mock!

Composite Interceptor Pattern

При великій кількості interceptors з різними обов'язками — можна зібрати їх у Composite:

// Composite: один interceptor що делегує N іншим
public class CompositeCommandInterceptor : DbCommandInterceptor
{
    private readonly IReadOnlyList<DbCommandInterceptor> _interceptors;

    public CompositeCommandInterceptor(IEnumerable<DbCommandInterceptor> interceptors)
    {
        _interceptors = interceptors.ToList().AsReadOnly();
    }

    public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(
        DbCommand command,
        CommandEventData eventData,
        InterceptionResult<DbDataReader> result,
        CancellationToken cancellationToken = default)
    {
        foreach (var interceptor in _interceptors)
        {
            result = await interceptor.ReaderExecutingAsync(
                command, eventData, result, cancellationToken);

            if (result.HasResult)
                return result; // Один з interceptors Suppress-ував — зупиняємось
        }

        return result;
    }

    public override async ValueTask<DbDataReader> ReaderExecutedAsync(
        DbCommand command,
        CommandExecutedEventData eventData,
        DbDataReader result,
        CancellationToken cancellationToken = default)
    {
        // Зворотній порядок для "після" методів
        foreach (var interceptor in _interceptors.Reverse())
        {
            result = await interceptor.ReaderExecutedAsync(
                command, eventData, result, cancellationToken);
        }

        return result;
    }
}

Повна конфігурація Interceptor Pipeline

// Program.cs — повна реєстрація всього pipeline
builder.Services.AddHttpContextAccessor();

// Infrastructure services
builder.Services.AddScoped<ICurrentUserService, HttpContextCurrentUserService>();
builder.Services.AddScoped<ITenantService, JwtTenantService>();

// Scoped interceptors (залежать від HTTP-контексту)
builder.Services.AddScoped<AuditInterceptor>();
builder.Services.AddScoped<SoftDeleteInterceptor>();
builder.Services.AddScoped<ConnectionSetupInterceptor>();

// Singleton interceptors (без HTTP-залежностей)
builder.Services.AddSingleton(provider => new SlowQueryInterceptor(
    provider.GetRequiredService<ILogger<SlowQueryInterceptor>>(),
    TimeSpan.FromMilliseconds(
        builder.Configuration.GetValue<int>("EfCore:SlowQueryThresholdMs", 500))));

builder.Services.AddSingleton<TransactionDiagnosticsInterceptor>();

// DbContext: збираємо весь pipeline
builder.Services.AddDbContext<AppDbContext>((provider, options) =>
{
    options
        .UseSqlServer(builder.Configuration.GetConnectionString("Default"))
        .AddInterceptors(
            // Порядок важливий: аудит першим (щоб бачити зміни до будь-яких модифікацій)
            provider.GetRequiredService<AuditInterceptor>(),
            provider.GetRequiredService<SoftDeleteInterceptor>(),

            // Connection-level
            provider.GetRequiredService<ConnectionSetupInterceptor>(),

            // Diagnostics (singleton-ові — без Scoped залежностей)
            provider.GetRequiredService<SlowQueryInterceptor>(),
            provider.GetRequiredService<TransactionDiagnosticsInterceptor>()
        );
});

Практичні завдання (Частина 2)

Рівень 1 — Базовий

Завдання 1.1: Connection Lifetime Monitor

Реалізуйте ConnectionLifetimeInterceptor що:

  1. Вимірює час відкриття кожного з'єднання
  2. При > 50ms — warning (pool може бути вичерпаний)
  3. Рахує кількість активних з'єднань через ConcurrentDictionary
  4. Логує summary кожну хвилину: avg/max lifetime, peak count
  5. Поверніть метрику через кастомний Health Check

Завдання 1.2: Transaction Long-Running Alert

TransactionDiagnosticsInterceptor:

  1. При транзакції > 3 секунди — LogWarning з TransactionId
  2. При > 30 секунд — LogError + надіслати alert через IAlertService
  3. При Rollback — завжди логувати незалежно від тривалості

Завдання 1.3: Suppress для тестування без БД

Реалізуйте InMemoryQueryInterceptor для unit тестів:

  • MockQuery<T>(Func<IEnumerable<T>> data) — повертає mock дані для будь-якого SELECT що повертає T
  • Не потребує жодного кешованого SQL — перехоплює через Suppress
  • Тест: ProductRepository.GetByIdAsync(1) без SQLite чи SQL Server — повертає mock Product

Рівень 2 — Логіка

Завдання 2.1: PostgreSQL Row-Level Security через Connection Interceptor

Реалізуйте multi-tenant RLS через ConnectionSetupInterceptor:

  1. Після відкриття PostgreSQL з'єднання: SET app.current_tenant_id = '{tenantId}'
  2. PostgreSQL Policy: CREATE POLICY tenant_isolation ON products USING (tenant_id = current_setting('app.current_tenant_id')::int)
  3. Тест: два DbContext з різними TenantId — кожен бачить лише свої дані (через RLS, а не Global Query Filter!)

Завдання 2.2: Circuit Breaker Interceptor

Реалізуйте CircuitBreakerInterceptor : DbCommandInterceptor:

  • State: Closed (нормально), Open (блокуємо), HalfOpen (пробний запит)
  • При 5 помилках підряд → Open (всі запити → CircuitOpenException без звернення до БД)
  • Після 30sHalfOpen (пробний запит)
  • Успіх у HalfOpenClosed
  • Suppress у Open: InterceptionResult<DbDataReader>.SuppressWithResult(EmptyReader())

Рівень 3 — Архітектура

Завдання 3.1: Observability Stack

Зберіть повний Observability stack для EF Core через interceptors:

  1. MetricsInterceptor: Prometheus/OpenTelemetry метрики:
    • efcore_queries_total (counter, labels: db_operation, table)
    • efcore_query_duration_ms (histogram, labels: db_operation)
    • efcore_slow_queries_total (counter)
    • efcore_savechanges_total (counter, labels: result)
    • efcore_connections_active (gauge)
  2. TracingInterceptor: OpenTelemetry trace spans:
    • Кожен SQL-запит → Span з attributes: db.statement, db.system, db.operation, db.duration
    • Parent Span → HTTP request span (distributed tracing)
  3. HealthCheckInterceptor: інтеграція з ASP.NET Core Health Checks:
    • Відповідає на /health з результатами останніх 10 queries (avg duration, error rate)

Підсумок статті 23

Ця стаття повністю розкрила Interceptors в EF Core:

Частина 1:

  • Архітектура pipeline: before → operation → after, зворотній порядок для after.
  • DbCommandInterceptor: Reader/NonQuery/Scalar. SlowQueryInterceptor, SQL Rewriting, QueryCounterInterceptor для N+1 тестів.
  • SaveChangesInterceptor: SavingChanges (до), SavedChanges (після з Id), SaveChangesFailed. AuditInterceptor замість override у DbContext. DetailedAuditInterceptor з FillGeneratedIds.
  • Реєстрація: Scoped через DI factory, Singleton без HTTP-залежностей.

Частина 2:

  • IDbConnectionInterceptor: ConnectionOpened/Closed/Failed. SET команди після відкриття (APPLICATION_NAME, RLS, tenant).
  • IDbTransactionInterceptor: моніторинг тривалості транзакцій, довгі транзакції → Warning/Error.
  • IMaterializationInterceptor: ін'єкція даних у матеріалізований entity.
  • Suppress Result: InterceptionResult<T>.SuppressWithResult(value) — замінити виконання SQL своїм результатом. Для Query Caching і Mock у тестах.
  • Composite Interceptor: делегування N interceptors з правильним порядком і Suppress-propagation.
  • Повна конфігурація pipeline: порядок, Scoped vs Singleton, всі рівні.

Наступна стаття — Performance Optimization (стаття 24) — комплексна оптимізація продуктивності EF Core: Connection Pooling, Query Plan Analysis, Database Indexes від EF Core, Benchmarking та профілювання.


Додаткові ресурси