FastAPI

SQLAlchemy 2.0 — ORM, Core та Async Engine

Глибоке занурення в SQLAlchemy 2.0: DeclarativeBase, Mapped[T], Engine, AsyncSession, relationships, N+1 проблема та патерн Unit of Work під капотом. Порівняння з EF Core з ASP.NET.

Жоден production-рівень API не живе у вакуумі. Будь-який реальний вебсервіс неминуче стикається з необхідністю зберігати, читати, оновлювати та видаляти дані у постійному сховищі — реляційній базі даних. Саме тут на сцену виходить SQLAlchemy — найповніший, найзріліший і найпопулярніший інструментарій для роботи з базами даних у Python, який часто називають «швейцарським ножем» між ORM та SQL-будівником.

Проте SQLAlchemy — це не просто ще один ORM. Це ціла екосистема, що складається з двох чітко розмежованих рівнів: Core (низькорівневий будівник SQL-запитів) і ORM (об'єктно-реляційне відображення). Знання обох рівнів і розуміння того, де закінчується один і починається інший, є ознакою зрілого Python-розробника.

У версії 2.0, яка вийшла у лютому 2023 року після тривалого перехідного періоду, SQLAlchemy отримала принципово новий, сучасний Python-синтаксис із підтримкою повної статичної типізації. Якщо ви бачили старий стиль SQLAlchemy 1.x — із класичними декларативними моделями на базі Column() та рядковими рефернесами — забудьте про нього. SQLAlchemy 2.0 виглядає і відчувається зовсім по-іншому. Власне, саме для цього і написана ця стаття.

Ця стаття є частиною великого циклу про FastAPI та базується на знаннях із попередніх матеріалів. Зокрема, концепція asyncio та асинхронного програмування (стаття 14) буде критично важливою у розділі про AsyncSession. Знання type hints і Pydantic (стаття 15) допоможе розібратись у новому синтаксисі Mapped[T] та mapped_column().

«Початок з далека»: Навіщо взагалі потрібен ORM?

Уявіть, що ви пишете FastAPI-ендпоінт, який має отримати список проектів конкретного користувача з бази даних PostgreSQL. Без жодного ORM або допоміжного інструменту вам доведеться:

  1. Відкрити низькорівневе підключення до бази даних через Python-драйвер (psycopg2 або asyncpg).
  2. Сформувати SQL-рядок вручну — потенційно вразливий до SQL-ін'єкцій.
  3. Виконати запит і отримати сирий результат у вигляді списку кортежів або словників.
  4. Вручну перетворити ці дані у Python-об'єкти (або Pydantic-моделі).
  5. При оновленні — стежити за тим, які поля змінилися, і самостійно генерувати UPDATE-запит.
  6. Явно керувати транзакціями: BEGIN, COMMIT, ROLLBACK.

Це величезний обсяг шаблонного коду (boilerplate), схильного до помилок і складного у тестуванні. ORM (Object-Relational Mapper) вирішує цю проблему, надаючи абстракцію між реляційним світом таблиць і рядків та об'єктно-орієнтованим світом Python-класів і екземплярів.

Без ORM (Raw SQL)

  • Ручне формування SQL-рядків
  • Ризик SQL-ін'єкцій без параметризації
  • Сирі дані у вигляді кортежів (42, 'arakviel', 'active')
  • Ручне перетворення у Python-об'єкти
  • Самостійне управління транзакціями
  • Код прив'язаний до конкретного діалекту SQL

З SQLAlchemy ORM

  • Моделі як звичайні Python-класи (class User(Base): ...)
  • Безпечні параметризовані запити «з коробки»
  • Результати як типізовані об'єкти user.username
  • Автоматичне відстеження змін (Change Tracking)
  • Управління транзакціями через контекстний менеджер
  • Підтримка різних БД (PostgreSQL, MySQL, SQLite) без зміни коду

Архітектура SQLAlchemy: Два рівні абстракції

Однією з найважливіших речей, яку потрібно зрозуміти про SQLAlchemy ще до написання першого рядка коду, є її дворівнева архітектура. Ця концепція корінним чином відрізняє SQLAlchemy від більшості інших ORM (як, наприклад, Django ORM або Peewee), які надають лише один рівень — об'єктний.

Loading diagram...
@startuml
skinparam style plain
skinparam linetype ortho
skinparam defaultFontSize 13
skinparam backgroundColor #ffffff

package "SQLAlchemy Ecosystem" {
    package "ORM Layer (Об'єктний рівень)" #e8f4f8 {
        component [DeclarativeBase\nMapped Classes] as models
        component [Session\nAsyncSession] as session
        component [Unit of Work\nIdentity Map] as uow
        component [Relationships\nLazy/Eager Loading] as rel
    }

    package "Core Layer (SQL рівень)" #fff3cd {
        component [Engine\nAsyncEngine] as engine
        component [Connection Pool] as pool
        component [select() insert()\nupdate() delete()] as dml
        component [Table\nColumn\nMetaData] as meta
    }

    package "DBAPI (Драйвер)" #f8d7da {
        component [psycopg2 / psycopg\n(Sync PostgreSQL)] as sync_drv
        component [asyncpg\n(Async PostgreSQL)] as async_drv
    }

    database "PostgreSQL" as pg
}

models --> session
session --> uow
uow --> dml
session --> dml
dml --> engine
engine --> pool
pool --> sync_drv
pool --> async_drv
sync_drv --> pg
async_drv --> pg
meta --> dml

@enduml

Розглянемо кожен рівень детально.

SQLAlchemy Core — «Конструктор SQL»

Core — це нижній рівень SQLAlchemy. Він надає Python-API для побудови SQL-виразів як об'єктів мови Python, а не як рядків. Цей рівень є «мовонезалежним» у розумінні бази даних: ви будуєте вираз select(users_table).where(users_table.c.id == 42), а SQLAlchemy сама генерує правильний SQL-діалект для PostgreSQL, MySQL, SQLite чи Oracle.

Ключові компоненти Core:

  • Engine — «серце» SQLAlchemy, яке управляє пулом підключень до бази даних.
  • MetaData та Table — опис структури таблиць як Python-об'єктів.
  • DML-функціїselect(), insert(), update(), delete() для побудови запитів.
  • Connection — об'єкт одного активного підключення до БД.

SQLAlchemy ORM — «Об'єктне відображення»

ORM — це верхній рівень, побудований поверх Core. Він надає класичне об'єктно-реляційне відображення: ви працюєте з Python-класами (моделями), а SQLAlchemy автоматично перетворює операції над ними у відповідні SQL-запити.

Ключові компоненти ORM:

  • DeclarativeBase та Mapped — оголошення моделей (таблиць) як Python-класів.
  • Session та AsyncSession — «workspace» для роботи з об'єктами, реалізація патерну Unit of Work.
  • relationship() — декларативний опис зв'язків між таблицями (one-to-many, many-to-many тощо).
  • Identity Map — внутрішній реєстр, що гарантує унікальність об'єктів у межах однієї сесії.
У SQLAlchemy 2.0 ця межа стала ще чіткішою: ORM-запити тепер будуються виключно через Core DML-функції (select(), insert() тощо), а не через застарілий session.query() API. Тобто ORM тепер «говорить» мовою Core, що робить код більш уніфікованим і передбачуваним.

Порівняння: EF Core ↔ SQLAlchemy

Перед тим як занурюватися в код, варто провести паралелі з інструментарієм ASP.NET, щоб сформувати правильні ментальні моделі. Якщо ви вже працювали з Entity Framework Core, більшість концепцій SQLAlchemy одразу стануть інтуїтивно зрозумілими — назви відрізняються, але ідеї залишаються тими самими.

КонцепціяEntity Framework Core (ASP.NET)SQLAlchemy 2.0 (Python)
Реєстрація «бази об'єктів»DbContextSession / AsyncSession
Набір записів таблиціDbSet<TEntity>Mapped[] + Table
Підключення до БДDbContext.DatabaseEngine / AsyncEngine
ORM-модель (сутність)class User : BaseEntityclass User(Base): ...
Відстеження змінDbContext.ChangeTrackerSession (Unit of Work)
Тільки читанняAsNoTracking()execution_options(populate_existing=True) або select() без session.add()
Міграціїdotnet ef migrations addalembic revision --autogenerate
Eager Loading.Include(u => u.Posts)selectinload(User.posts)
Lazy LoadingАвтоматично (за замовчуванням)Вимкнено за замовчуванням (вимагає явного налаштування)
LINQ-запитиcontext.Users.Where(u => u.IsActive)select(User).where(User.is_active == True)

Найважливіша відмінність: в EF Core DbContext є одночасно і «сховищем об'єктів», і точкою доступу до запитів (через DbSet). У SQLAlchemy ці ролі чітко розділені: Engine управляє підключеннями, а Session управляє об'єктами. Це розділення є більш явним і дозволяє краще контролювати життєвий цикл обох сутностей.


Engine та Connection Pool: Фундамент SQLAlchemy

Engine — це перший об'єкт, який потрібно створити для роботи з SQLAlchemy. Він є центральною точкою конфігурації та управляє пулом підключень (connection pool) до бази даних.

Чому потрібен Connection Pool?

Відкриття нового підключення до PostgreSQL — це відносно «дорога» операція: вона включає TCP-рукостискання, автентифікацію та ініціалізацію внутрішнього стану сесії PostgreSQL. У вебзастосунку, де одночасно можуть надходити сотні або тисячі запитів, відкривати і закривати з'єднання для кожного запиту є неприпустимо повільним підходом.

Connection Pool вирішує цю проблему: він заздалегідь відкриває кілька підключень і тримає їх «живими». Коли FastAPI-обробнику потрібне підключення до бази даних, він бере готове з пулу, використовує його, і повертає назад — підключення не закривається, а очікує наступного запиту.

Loading diagram...
@startuml
skinparam style plain
skinparam linetype ortho
skinparam defaultFontSize 13
skinparam ArrowColor #555555
skinparam backgroundColor #ffffff

package "FastAPI Application" #e8f4f8 {
    component [Request 1] as R1
    component [Request 2] as R2
    component [Request 3] as R3
}

package "Connection Pool (QueuePool)" #fff3cd {
    component [Connection 1\n(зайнята)] as C1 #ffcccc
    component [Connection 2\n(вільна)]  as C2 #ccffcc
    component [Connection 3\n(вільна)]  as C3 #ccffcc
    component [Connection 4\n(overflow)] as C4 #ffe0b2
}

database "PostgreSQL Server" as PG #f8d7da

R1 --> C1
R2 --> C2
R3 --> C3

C1 --> PG
C2 --> PG
C3 --> PG
C4 .right.> PG : створюється при\nперевантаженні

@enduml

Python DB-API 2.0: Що таке «драйвер» і як він працює?

Перш ніж розбирати create_engine(), важливо зрозуміти рівень, що знаходиться нижче SQLAlchemy, — безпосередній Python-драйвер для PostgreSQL.

DB-API 2.0 (PEP 249) — це стандартний Python-інтерфейс для роботи з реляційними базами даних. Він визначає мінімальний набір функцій і об'єктів, які має реалізувати будь-який Python-пакет для роботи з конкретною СУБД. Завдяки цьому стандарту SQLAlchemy може прозоро перемикатися між різними драйверами без зміни вашого коду.

Аналогія: DB-API 2.0 — це «стандартна розетка», а конкретні драйвери (psycopg2, asyncpg, psycopg) — це «прилади», що підключаються до неї. SQLAlchemy — це «подовжувач», який надає вам зручний API поверх будь-якого «приладу».

Як виглядає робота з драйвером напряму?

Щоб по-справжньому оцінити, від чого абстрагує SQLAlchemy, подивимося на «голий» DB-API 2.0 з psycopg2:

raw_dbapi_demo.py
import psycopg2
from psycopg2.extras import RealDictCursor

# 1. Відкриваємо підключення вручну
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="taskforge_db",
    user="user",
    password="password",
)

try:
    # 2. Створюємо курсор (об'єкт для виконання запитів)
    # RealDictCursor → результати у вигляді dict замість tuple
    with conn.cursor(cursor_factory=RealDictCursor) as cur:

        # 3. Виконуємо параметризований запит (захист від SQL-ін'єкцій)
        cur.execute(
            "SELECT id, username, email FROM users WHERE is_active = %s LIMIT %s",
            (True, 10),  # Параметри — окремим кортежем, НЕ через f-string!
        )

        # 4. Отримуємо результати
        rows = cur.fetchall()
        for row in rows:
            print(row["id"], row["username"])  # row — це dict

        # 5. INSERT з поверненням згенерованого id
        cur.execute(
            "INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
            ("new_user", "new@example.com"),
        )
        new_id = cur.fetchone()["id"]

    # 6. Підтверджуємо транзакцію вручну
    conn.commit()

except Exception:
    conn.rollback()  # Відкат при будь-якій помилці
    raise
finally:
    conn.close()  # Завжди закриваємо підключення

Цей код працює, але очевидно, наскільки він багатослівний: ручне управління з'єднаннями, курсорами, транзакціями та перетворенням результатів — все це ваша відповідальність. SQLAlchemy автоматизує весь цей boilerplate.

Асинхронний аналог із asyncpg виглядає інакше (asyncpg не реалізує DB-API 2.0, а має власний API, оптимізований для asyncio):

raw_asyncpg_demo.py
import asyncpg

async def main():
    # asyncpg підключається напряму без курсорів
    conn = await asyncpg.connect(
        host="localhost", port=5432,
        database="taskforge_db", user="user", password="password",
    )
    try:
        # asyncpg повертає список Record-об'єктів (схожі на dict)
        rows = await conn.fetch(
            "SELECT id, username FROM users WHERE is_active = $1 LIMIT $2",
            True, 10,  # Параметри через $1, $2 (PostgreSQL-стиль)
        )
        for row in rows:
            print(row["id"], row["username"])

        new_id = await conn.fetchval(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
            "new_user", "new@example.com",
        )
    finally:
        await conn.close()
Зверніть на різницю у плейсхолдерах: psycopg2 використовує %s (стиль Python printf), тоді як asyncpg використовує $1, $2, $3 (нативний PostgreSQL-стиль). SQLAlchemy абстрагує цю різницю — ви завжди пишете однаковий Python-код, незалежно від драйвера.

Які драйвери обирати у 2026+ році?

На сьогодні для PostgreSQL існує три основних варіанти драйверів. Ось актуальна картина:

ДрайверТипСтатусКоли використовувати
psycopg2СинхроннийСтабільний, але legacyСтарі проєкти, де вже використовується. Нові проєкти — краще уникати.
psycopg (v3)Sync + Async✅ АктуальнийРекомендований вибір для синхронних проєктів. Прямий наступник psycopg2.
asyncpgАсинхронний✅ АктуальнийРекомендований вибір для async FastAPI. Найшвидший async-драйвер.

asyncpg

Рекомендований для async FastAPI. Написаний на Cython, використовує бінарний PostgreSQL-протокол. За бенчмарками в 3–5 разів швидший за psycopg2. Не реалізує DB-API 2.0, зате надає зручний нативний async API. SQLAlchemy підтримує через postgresql+asyncpg://.

pip install asyncpg

psycopg (v3)

Рекомендований для синхронних проєктів або міграції з psycopg2. Підтримує як sync, так і async режими в одному пакеті. Реалізує оновлений DB-API 2.0. Значно продуктивніший за psycopg2. SQLAlchemy підтримує через postgresql+psycopg:// (без цифри 2).

pip install "psycopg[binary]"
Для нового FastAPI-проєкту з async SQLAlchemy — стандартна комбінація:
pip install sqlalchemy asyncpg
DSN: postgresql+asyncpg://user:password@localhost/dbname

Які бази даних підтримує SQLAlchemy «з коробки»?

SQLAlchemy не прив'язана до PostgreSQL. Офіційна документація визначає п'ять вбудованих (included) діалектів — тобто БД, для яких підтримка вбудована безпосередньо в пакет sqlalchemy без встановлення сторонніх розширень.

Діалект (dialect) у термінах SQLAlchemy — це модуль, що «перекладає» узагальнений SQLAlchemy-SQL у SQL-синтаксис конкретної СУБД. Наприклад, функція func.now() у PostgreSQL стає NOW(), а у MySQL — NOW() чи SYSDATE() залежно від версії. Ваш Python-код залишається незмінним.
СУБДSQLAlchemy DSN-префіксSync-драйвериAsync-драйвери
PostgreSQLpostgresql+<driver>://psycopg2, psycopgasyncpg, psycopg (async)
MySQL / MariaDBmysql+<driver>://mysqlclient, PyMySQLasyncmy, aiomysql
SQLitesqlite+<driver>:///вбудований sqlite3aiosqlite
Oracle Databaseoracle+<driver>://cx_Oracle, oracledboracledb (async)
Microsoft SQL Servermssql+<driver>://pyodbc, pymssqlaioodbc

Розглянемо кожну БД детальніше.

MySQL та MariaDB

MySQL та його повністю сумісний форк MariaDB є найпоширенішими реляційними СУБД у веброзробці. SQLAlchemy підтримує обидві через однаковий mysql+<driver>:// префікс (MariaDB автоматично визначається за версією сервера).

# mysqlclient: найшвидший синхронний драйвер для MySQL (C-розширення)
# pip install mysqlclient
engine = create_engine(
    "mysql+mysqldb://user:password@localhost:3306/mydb?charset=utf8mb4"
)
MySQL та MariaDB мають кілька важливих відмінностей від PostgreSQL, які варто знати:
  • За замовчуванням рушій InnoDB у MySQL підтримує транзакції та зовнішні ключі. Старий MyISAM — не підтримує. SQLAlchemy очікує InnoDB.
  • BOOLEAN у MySQL зберігається як TINYINT(1) (0/1), а не справжній булевий тип.
  • AUTO_INCREMENT замість PostgreSQL SERIAL / GENERATED ALWAYS AS IDENTITY — SQLAlchemy абстрагує це автоматично через primary_key=True.
SQLite

SQLite — вбудована у Python (import sqlite3) легковажна СУБД без сервера: вся база даних зберігається в одному .db-файлі на диску. Ідеально підходить для локальної розробки, тестів та невеликих проєктів.

# Не потрібен додатковий пакет — sqlite3 вбудований у Python
engine = create_engine(
    "sqlite:///./taskforge_dev.db"  # відносний шлях до файлу
)

# In-memory БД (існує лише у RAM, зникає при закритті):
engine_memory = create_engine("sqlite:///:memory:")
SQLite — стандартний вибір для тестів у FastAPI-проєктах: не потрібен Docker чи реальний PostgreSQL. Просто підміняйте DATABASE_URL через змінну середовища в conftest.py. In-memory SQLite (sqlite:///:memory:) робить тести максимально ізольованими — кожен тест починає з чистої бази.
Microsoft SQL Server (MSSQL)

Microsoft SQL Server широко використовується у корпоративному середовищі (особливо там, де вже є інфраструктура Microsoft). SQLAlchemy підтримує його через ODBC-підключення.

MSSQL — sync (pyodbc)
# pip install pyodbc
# Вимагає встановленого ODBC Driver for SQL Server на ОС
engine = create_engine(
    "mssql+pyodbc://user:password@mssql-server:1433/mydb"
    "?driver=ODBC+Driver+18+for+SQL+Server"
    "&TrustServerCertificate=yes"
)
MSSQL — async (aioodbc)
# pip install aioodbc
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    "mssql+aioodbc://user:password@mssql-server:1433/mydb"
    "?driver=ODBC+Driver+18+for+SQL+Server"
    "&TrustServerCertificate=yes"
)
SQL Server відрізняється від PostgreSQL кількома деталями: використовує IDENTITY замість SERIAL, NVARCHAR замість VARCHAR для Unicode, та TOP N замість LIMIT N. SQLAlchemy повністю приховує ці відмінності — ваш Python-код з select(User).limit(10) однаково коректно виконається і на PostgreSQL, і на SQL Server.
Зведена таблиця вибору БД та драйвера для нових проєктів
СценарійСУБДДрайверDSN
FastAPI production (async)PostgreSQLasyncpgpostgresql+asyncpg://
FastAPI production (sync)PostgreSQLpsycopg (v3)postgresql+psycopg://
Локальна розробка / тестиSQLiteaiosqlitesqlite+aiosqlite:///./dev.db
Unit-тести (in-memory)SQLiteaiosqlitesqlite+aiosqlite:///:memory:
Корпоративний стек MicrosoftMSSQLpyodbc / aioodbcmssql+pyodbc://
Спадковий стек / хостингиMySQL/MariaDBasyncmymysql+asyncmy://

Синхронний Engine: create_engine()

Функція create_engine() приймає рядок підключення (DSN — Data Source Name) та низку опцій, що конфігурують поведінку пулу.

core/database.py
from sqlalchemy import create_engine

# Рядок підключення (DSN) для PostgreSQL з psycopg2
DATABASE_URL = "postgresql+psycopg2://user:password@localhost:5432/taskforge_db"

engine = create_engine(
    DATABASE_URL,
    # --- Параметри Connection Pool ---
    pool_size=10,       # Кількість «постійних» підключень у пулі
    max_overflow=20,    # Максимум тимчасових підключень понад pool_size
    pool_timeout=30,    # Секунд очікування вільного підключення з пулу
    pool_recycle=1800,  # Перестворювати підключення кожні 30 хвилин
                        # (щоб уникнути "stale connections" від PostgreSQL)
    # --- Налагодження ---
    echo=True,          # Виводити всі SQL-запити у консоль (лише для розробки!)
)
Параметр echo=True є надзвичайно корисним інструментом для налагодження — він дозволяє бачити кожен SQL-запит, що генерує SQLAlchemy. Проте у production-середовищі він категорично вимкнений, оскільки виводить чутливі дані та значно навантажує систему логування.

Асинхронний Engine: create_async_engine()

Для роботи з FastAPI у повністю асинхронному режимі (що є рекомендованим підходом, як ми дізналися у статті 14) необхідно використовувати create_async_engine() разом із асинхронним драйвером asyncpg.

core/database.py
from sqlalchemy.ext.asyncio import create_async_engine

# Зверніть на префікс: postgresql+asyncpg (не psycopg2!)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/taskforge_db"

async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
    echo=False,  # У production завжди False
)

Ключова різниця між create_engine() та create_async_engine() полягає у внутрішньому DBAPI-драйвері:

  • postgresql+psycopg2 → синхронний драйвер, блокує потік виконання.
  • postgresql+asyncpg → асинхронний драйвер, звільняє event loop Python під час очікування відповіді від БД.
Починаючи з SQLAlchemy 2.0 та psycopg (версія 3, не psycopg2), з'явилася можливість використовувати postgresql+psycopg (без суфікса 2) — це сучасна версія драйвера, що підтримує як синхронний, так і асинхронний режими. Вона є пріоритетним вибором для нових проєктів.

Session та sessionmaker: Одиниця роботи

Session — це центральний об'єкт для роботи з ORM. Він реалізує патерн Unit of Work (Одиниця роботи), що є фундаментальним архітектурним патерном для роботи з базами даних (ми детально розглянемо його у розділі «Під капотом»).

Коротко: Session — це «записник» або «тимчасова пам'ять», яка:

  1. Відстежує всі Python-об'єкти (ORM-моделі), завантажені з БД або додані до нього.
  2. Накопичує всі зміни (INSERT, UPDATE, DELETE), не надсилаючи їх одразу до БД.
  3. При виклику session.flush() або session.commit() — генерує відповідні SQL-запити та виконує їх у межах однієї транзакції.

sessionmaker та async_sessionmaker

Оскільки Session має бути короткоживучим об'єктом (один запит = одна сесія), зручно мати фабрику, яка створює нові сесії з потрібними налаштуваннями. Для цього слугують sessionmaker та async_sessionmaker.

Базовий шаблон виглядає так:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql+psycopg://user:pass@localhost/db")

SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)

# Один HTTP-запит = одна сесія:
with SessionLocal() as session:
    user = session.get(User, 1)
    user.is_active = False
    session.commit()

Синхронний Engine: create_engine()

Функція create_engine() приймає рядок підключення (DSN — Data Source Name) та низку опцій, що конфігурують поведінку пулу.

core/database.py
from sqlalchemy import create_engine

# Рядок підключення (DSN) для PostgreSQL з psycopg2
DATABASE_URL = "postgresql+psycopg2://user:password@localhost:5432/taskforge_db"

engine = create_engine(
    DATABASE_URL,
    # --- Параметри Connection Pool ---
    pool_size=10,       # Кількість «постійних» підключень у пулі
    max_overflow=20,    # Максимум тимчасових підключень понад pool_size
    pool_timeout=30,    # Секунд очікування вільного підключення з пулу
    pool_recycle=1800,  # Перестворювати підключення кожні 30 хвилин
                        # (щоб уникнути "stale connections" від PostgreSQL)
    # --- Налагодження ---
    echo=True,          # Виводити всі SQL-запити у консоль (лише для розробки!)
)
Параметр echo=True є надзвичайно корисним інструментом для налагодження — він дозволяє бачити кожен SQL-запит, що генерує SQLAlchemy. Проте у production-середовищі він категорично вимкнений, оскільки виводить чутливі дані та значно навантажує систему логування.

Асинхронний Engine: create_async_engine()

Для роботи з FastAPI у повністю асинхронному режимі (що є рекомендованим підходом, як ми дізналися у статті 14) необхідно використовувати create_async_engine() разом із асинхронним драйвером asyncpg.

core/database.py
from sqlalchemy.ext.asyncio import create_async_engine

# Зверніть на префікс: postgresql+asyncpg (не psycopg2!)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/taskforge_db"

async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
    echo=False,  # У production завжди False
)

Ключова різниця між create_engine() та create_async_engine() полягає у внутрішньому DBAPI-драйвері:

  • postgresql+psycopg2 → синхронний драйвер, блокує потік виконання.
  • postgresql+asyncpg → асинхронний драйвер, звільняє event loop Python під час очікування відповіді від БД.
Починаючи з SQLAlchemy 2.0 та psycopg (версія 3, не psycopg2), з'явилася можливість використовувати postgresql+psycopg (без суфікса 2) — це сучасна версія драйвера, що підтримує як синхронний, так і асинхронний режими. Вона є пріоритетним вибором для нових проєктів.

Параметри конфігурації Connection Pool у Engine

При створенні Engine або AsyncEngine за допомогою параметрів можна тонко налаштувати поведінку вбудованого пулу з'єднань (QueuePool). Розглянемо ключові параметри, їхні значення за замовчуванням та вплив на систему:

  • pool_size (тип int, за замовчуванням 5) Визначає кількість «постійно відкритих» (keep-alive) підключень, які пул зберігає в пам'яті. Ці з'єднання ніколи не закриваються при звільненні.
    • На що впливає: Лімітує базову пропускну здатність. Якщо ваші FastAPI-воркери роблять мало паралельних запитів, 5–10 підключень цілком достатньо. Для навантажених додатків це значення збільшують до 20–50.
    • Застереження: Кожне з'єднання споживає RAM на сервері PostgreSQL (близько 10 МБ на з'єднання). Збільшення pool_size понад ліміт max_connections у конфігурації PostgreSQL призведе до того, що база даних відхилятиме нові підключення.
  • max_overflow (тип int, за замовчуванням 10) Визначає максимальну кількість тимчасових додаткових підключень, які пул може створити, якщо всі з'єднання з pool_size зараз зайняті.
    • На що впливає: Дозволяє додатку безболісно переживати раптові піки навантаження. Коли пік минає, ці «overflow» підключення автоматично закриваються.
    • Розрахунок: Загальна максимальна кількість одночасно відкритих підключень з одного екземпляра додатка дорівнює pool_size + max_overflow. Наприклад, pool_size=10, max_overflow=20 дає ліміт у 30 з'єднань.
  • pool_recycle (тип int, за замовчуванням -1, тобто вимкнено) Визначає максимальний вік підключення у секундах. Якщо підключення старше за це значення, при поверненні в пул воно буде закрите, а замість нього буде створено нове.
    • На що впливає: Рятує від проблеми stale connections (застарілих підключень). Деякі мережеві екрани (firewalls), хмарні бази даних (наприклад, AWS RDS, Heroku Postgres) або сам PostgreSQL можуть примусово розривати неактивні TCP-сесії після певного періоду простою (наприклад, 10 або 30 хвилин). Якщо пул спробує використати таке з'єднання, виникне помилка на кшталт OperationalError: Connection handshake failed. Встановлення pool_recycle=1800 (30 хвилин) змушує оновлювати підключення превентивно.
  • pool_pre_ping (тип bool, за замовчуванням False) Якщо встановлено в True, при кожному отриманні з'єднання з пулу SQLAlchemy виконує швидкий тестовий запит (зазвичай SELECT 1) перед тим, як віддати з'єднання вашому коду.
    • На що впливає: Забезпечує високу стійкість до збоїв. Якщо з'єднання виявиться «мертвим» (через перезапуск бази даних або мережевий збій), пул тихо закриє його, відкриє нове працездатне підключення та прозоро віддасть вашому коду. Розробник навіть не помітить, що з'єднання розривалося.
    • Рекомендація: Завжди ставте pool_pre_ping=True у production. Хоча це додає крихітний оверхед на один SQL-запит-тест, стійкість додатку зростає в рази.
Приклад повної production конфігурації Engine:
async_engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,           # Збільшено для навантаженого API
    max_overflow=10,        # Додаткові 10 з'єднань під час піків
    pool_recycle=1800,      # Оновлювати кожні 30 хвилин (захист від AWS RDS таймаутів)
    pool_pre_ping=True,     # ✅ Перевіряти працездатність з'єднання перед відправкою запиту
)

Session та sessionmaker: Одиниця роботи

Session — це центральний об'єкт для роботи з ORM. Він реалізує патерн Unit of Work (Одиниця роботи), що є фундаментальним архітектурним патерном для роботи з базами даних (ми детально розглянемо його у розділі «Під капотом»).

Коротко: Session — це «записник» або «тимчасова пам'ять», яка:

  1. Відстежує всі Python-об'єкти (ORM-моделі), завантажені з БД або додані до нього.
  2. Накопичує всі зміни (INSERT, UPDATE, DELETE), не надсилаючи їх одразу до БД.
  3. При виклику session.flush() або session.commit() — генерує відповідні SQL-запити та виконує їх у межах однієї транзакції.

sessionmaker та async_sessionmaker

Оскільки Session має бути короткоживучим об'єктом (один запит = одна сесія), зручно мати фабрику, яка створює нові сесії з потрібними налаштуваннями. Для цього слугують sessionmaker та async_sessionmaker.

Базовий шаблон виглядає так:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql+psycopg://user:pass@localhost/db")

SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)

# Один HTTP-запит = одна сесія:
with SessionLocal() as session:
    user = session.get(User, 1)
    user.is_active = False
    session.commit()

Параметри sessionmaker та async_sessionmaker: повний розбір

Обидві фабрики приймають однакові параметри. Нижче — вичерпна документація кожного з них із прикладами впливу на поведінку.


bind / autobegin

Прив'язує фабрику до конкретного Engine або AsyncEngine. Усі сесії, створені цією фабрикою, будуть автоматично використовувати саме це підключення до БД. Якщо не вказати тут — доведеться передавати bind при кожному виклику Session().

Якщо True, SQLAlchemy автоматично починає нову транзакцію (BEGIN) при першій операції з БД. Якщо False — транзакцію потрібно починати вручну через session.begin(). Значення True (за замовчуванням) є зручним та безпечним для більшості випадків.

bind + autobegin
# Стандартне використання — bind + autobegin=True (за замовчуванням)
SessionLocal = sessionmaker(bind=engine, autobegin=True)

with SessionLocal() as session:
    # При першому execute() SQLAlchemy автоматично відкриває транзакцію:
    # BEGIN (implicit)
    user = session.get(User, 1)
    session.commit()
    # COMMIT

# autobegin=False — повний ручний контроль (рідко потрібен):
ManualSession = sessionmaker(bind=engine, autobegin=False)
with ManualSession() as session:
    with session.begin():         # Явний BEGIN
        user = session.get(User, 1)
    # COMMIT автоматично при виході з session.begin()

autocommit

Чому False? Це один з найважливіших параметрів, що забезпечує атомарність операцій.

Якщо autocommit=True — кожен SQL-запит виконується у своїй власній транзакції і негайно комітиться. Це означає, що неможливо атомарно виконати кілька операцій: якщо після першого INSERT програма впаде — другий INSERT вже буде збережений у БД, а перший — ні. Дані опиняться у неузгодженому стані.

Якщо autocommit=False — всі операції в межах сесії виконуються в одній транзакції, яку ви контролюєте через commit() / rollback(). Це гарантує ACID-властивості.

Різниця між autocommit=True та autocommit=False
# ❌ autocommit=True — небезпечно, не використовуйте у production
AutoCommitSession = sessionmaker(bind=engine, autocommit=True)

with AutoCommitSession() as session:
    # Кожен виклик одразу комітиться окремо!
    session.add(User(username="alice"))  # SQL: INSERT + COMMIT
    # Якщо тут виникне помилка:
    raise RuntimeError("Щось пішло не так!")
    session.add(User(username="bob"))    # Цей INSERT ніколи не виконається
    # alice вже збережена у БД — неузгодженість!

# ✅ autocommit=False — безпечно (за замовчуванням)
SafeSession = sessionmaker(bind=engine, autocommit=False)

with SafeSession() as session:
    try:
        session.add(User(username="alice"))
        session.add(User(username="bob"))
        session.commit()  # Обидва INSERT у ONE транзакції
    except Exception:
        session.rollback()  # Обидва відкочуються — БД у чистому стані

autoflush

Чому у FastAPI зазвичай ставлять False?

За замовчуванням autoflush=True: SQLAlchemy автоматично виконує flush() (відправляє накопичені зміни у БД у рамках поточної транзакції) перед кожним SELECT-запитом у межах поточної сесії.

На перший погляд це зручно: ви завжди бачите актуальні дані навіть до commit(). Але у FastAPI-сервісах, де сесія часто передається через DI між кількома шарами (router → service → repository), autoflush=True може призводити до неочікуваних SQL-запитів і ускладнює налагодження.

Рекомендація для FastAPI: autoflush=False. Ви самостійно викликаєте session.flush() тільки коли це потрібно (наприклад, щоб отримати згенерований id). Це дає повний контроль і передбачувану поведінку.

Різниця між autoflush=True та autoflush=False
# ✅ autoflush=True (за замовчуванням у SQLAlchemy)
AutoFlushSession = sessionmaker(bind=engine, autoflush=True)

with AutoFlushSession() as session:
    new_user = User(username="charlie")
    session.add(new_user)
    # ↓ Тут ще немає flush()

    # Але при будь-якому SELECT — автоматичний flush перед ним:
    existing = session.execute(select(User).where(User.username == "alice")).scalar()
    # SQLAlchemy робить:
    # 1. INSERT INTO users (username) VALUES ('charlie')  ← autoflush!
    # 2. SELECT * FROM users WHERE username = 'alice'

    # Це зручно, але може бути несподіваним, якщо charlie ще «не готовий»
    # (наприклад, не встановлено обов'язкове поле email)

# ✅ autoflush=False (рекомендовано для FastAPI)
ManualFlushSession = sessionmaker(bind=engine, autoflush=False)

with ManualFlushSession() as session:
    new_user = User(username="charlie")
    session.add(new_user)

    # SELECT виконується одразу, без autoflush
    existing = session.execute(select(User).where(User.username == "alice")).scalar()
    # SQL: SELECT * FROM users WHERE username = 'alice'
    # charlie ще не INSERT-нутий — SELECT не «бачить» його

    # Явний flush лише тоді, коли потрібен id:
    session.flush()  # INSERT INTO users ...
    print(new_user.id)  # Тепер id доступний

    session.commit()

expire_on_commit

Найважливіший параметр для async FastAPI — завжди ставте False!

Після виклику commit() SQLAlchemy за замовчуванням (True) «відмічає» всі об'єкти у сесії як expired (застарілі). Це означає: при наступному зверненні до будь-якого атрибуту об'єкта SQLAlchemy виконає новий SELECT-запит, щоб «освіжити» дані з БД.

У синхронному режимі це відбувається непомітно — lazy refresh є автоматичним.

В асинхронному режимі це катастрофа: після await session.commit() сесія більше не активна в async-контексті. Спроба отримати атрибут expired-об'єкта викличе MissingGreenlet: greenlet_spawn has not been called — одна з найпоширеніших помилок початківців у async SQLAlchemy.

expire_on_commit — sync vs async
# --- SYNC: expire_on_commit=True (за замовчуванням) ---
SyncSession = sessionmaker(bind=engine, expire_on_commit=True)

with SyncSession() as session:
    user = session.get(User, 1)
    user.username = "updated"
    session.commit()
    # user тепер expired!

    # Але sync-режим «рятує» нас автоматично:
    print(user.username)
    # SELECT * FROM users WHERE id = 1  ← автоматичний lazy SELECT!
    # Виводить: "updated"   ✅ Працює, але виконує зайвий запит

# --- ASYNC: expire_on_commit=True — ПОМИЛКА! ---
BadAsyncSession = async_sessionmaker(bind=async_engine, expire_on_commit=True)

async with BadAsyncSession() as session:
    user = await session.get(User, 1)
    user.username = "updated"
    await session.commit()
    # user тепер expired!

    print(user.username)
    # ❌ MissingGreenlet: greenlet_spawn has not been called
    # Немає активної async-транзакції для lazy SELECT!

# --- ASYNC: expire_on_commit=False — правильно! ---
GoodAsyncSession = async_sessionmaker(bind=async_engine, expire_on_commit=False)

async with GoodAsyncSession() as session:
    user = await session.get(User, 1)
    user.username = "updated"
    await session.commit()
    # user НЕ expired — значення в пам'яті збережено

    print(user.username)  # ✅ "updated" — з пам'яті Python, без SQL
expire_on_commit=False означає, що після commit() атрибути об'єкта можуть не відображати реальний стан БД (якщо хтось зовні змінив той самий запис). У FastAPI-додатках це не є проблемою — кожен HTTP-запит отримує свіжу сесію та свіжий SELECT. Але якщо ваша логіка потребує «свіжих» даних після commit — виконайте await session.refresh(obj) вручну.

class_ (лише для async_sessionmaker)

Визначає конкретний клас, який буде створювати фабрика. Для async_sessionmaker потрібно явно передати class*=AsyncSession. Це дозволяє підставляти власні підкласи AsyncSession з кастомною логікою (наприклад, автоматичним логуванням або метриками).

class_ — кастомна сесія
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

class LoggedSession(AsyncSession):
    """AsyncSession з логуванням кожного коміту."""
    async def commit(self):
        print(f"[DB] Committing transaction in session {id(self)}")
        await super().commit()

LoggedSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=LoggedSession,  # Використовуємо наш кастомний клас
    expire_on_commit=False,
)

async with LoggedSessionLocal() as session:
    # ... операції ...
    await session.commit()
    # Виведе: [DB] Committing transaction in session 140234567890

Підсумковий конфіг для production FastAPI

Ось канонічна конфігурація async_sessionmaker для production FastAPI-додатку на основі всього вищесказаного:

core/database.py — production config
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

async_engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/taskforge_db",
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
    echo=False,
)

AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autocommit=False,    # ✅ Явний контроль транзакцій (ACID-безпека)
    autoflush=False,     # ✅ Явний flush() — передбачувана поведінка
    expire_on_commit=False,  # ✅ Обов'язково для async FastAPI
)

ORM-Моделі у SQLAlchemy 2.0: Новий «Python-first» синтаксис

Якщо у вас є досвід роботи зі SQLAlchemy 1.x, перший погляд на код SQLAlchemy 2.0 може викликати легкий шок — він виглядає зовсім інакше. Це навмисна зміна: команда SQLAlchemy чітко взяла курс на максимальну інтеграцію з сучасним Python-синтаксисом типізації, наслідуючи ідеї Pydantic та dataclasses.

Еволюція синтаксису: від 1.x до 2.0

Щоб оцінити масштаб змін, подивимося на одну і ту саму модель у різних синтаксисах:

from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()  # Стара функція

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    is_active = Column(Boolean, default=True)

    # ❌ Немає type hints — IDE не знає тип атрибутів
    # ❌ user.id може бути int або None — незрозуміло без читання Column()

Різниця кардинальна. У новому синтаксисі тип стовпця (nullable/not-nullable) тепер виводиться автоматично з типової анотації:

  • Mapped[int]INTEGER NOT NULL
  • Mapped[int | None] або Mapped[Optional[int]]INTEGER NULL
  • Mapped[str]VARCHAR NOT NULL
  • Mapped[bool]BOOLEAN NOT NULL

DeclarativeBase: Базовий клас нового покоління

У SQLAlchemy 2.0 рекомендованим способом створення базового класу для всіх моделей є успадкування від DeclarativeBase замість виклику застарілої функції declarative_base().

models/base.py
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    """
    Базовий клас для всіх ORM-моделей проєкту.
    Всі моделі мають успадковуватися від цього класу.
    """
    pass

Ця проста конструкція робить кілька речей одночасно:

  1. Реєструє метадані — кожен підклас Base автоматично реєструє своє визначення таблиці у Base.metadata, що використовується при міграціях Alembic.
  2. Забезпечує підтримку type checking — сучасні IDE (PyCharm, VS Code з Pylance) повністю розуміють Mapped[T] і надають автодоповнення та перевірку типів.
  3. Дозволяє типізовану конфігурацію через __class_getitem__ механізм — можна визначати type_annotation_map для кастомних типів.

mapped_column(): Детальна конфігурація стовпців

Функція mapped_column() є аналогом старого Column(), але спроєктована для роботи разом із анотаціями Mapped[T]. Вона приймає всі ті самі аргументи, що і Column(), але вже не потребує явного вказання типу (він виводиться з Mapped[T]).

Розглянемо повний приклад моделі User з усіма типовими сценаріями:

models/user.py
from datetime import datetime
from sqlalchemy import String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    # --- Первинний ключ ---
    id: Mapped[int] = mapped_column(primary_key=True)

    # --- Рядкові поля з обмеженнями ---
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)

    # --- Nullable поле (опціональне) ---
    # Mapped[str | None] → VARCHAR NULL (поле може бути пустим)
    bio: Mapped[str | None] = mapped_column(Text, default=None)

    # --- Поле з серверним значенням за замовчуванням ---
    # server_default=func.now() → DEFAULT NOW() виконується на стороні PostgreSQL
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now()
    )

    # --- Поле з клієнтським значенням за замовчуванням ---
    # default=True → значення встановлюється Python-кодом, не SQL
    is_active: Mapped[bool] = mapped_column(default=True)

    def __repr__(self) -> str:
        return f"<User id={self.id} username={self.username!r}>"

Типи стовпців SQLAlchemy

SQLAlchemy надає багатий набір типів стовпців, що відображаються на відповідні SQL-типи конкретної бази даних:

Python тип в Mapped[T]SQLAlchemy типPostgreSQL тип
intIntegerINTEGER
strString(n) / TextVARCHAR(n) / TEXT
boolBooleanBOOLEAN
floatFloatFLOAT
datetimeDateTimeTIMESTAMP
dateDateDATE
DecimalNumeric(p, s)NUMERIC(p, s)
bytesLargeBinaryBYTEA
dictJSONJSONB
UUIDUuid (2.0+)UUID

Детальний опис найбільш популярних типів стовпців SQLAlchemy, їхнього призначення, особливостей використання та поширених помилок:

Integer / BigInteger
Mapped[int]
Представляє цілі числа різної розрядності.
  • Коли використовувати: Integer підходить для стандартних лічильників, невеликих ID та статусів. BigInteger (відображається в BIGINT) є обов'язковим для первинних ключів (ID) у великих або швидкозростаючих таблицях (де кількість записів може перевищити 2 мільярди), грошових сум у копійках, Telegram ID чи інших великих зовнішніх ідентифікаторів.
  • Коли НЕ використовувати: Уникайте Integer для первинних ключів у великих таблицях — переповнення INT є класичною причиною збоїв великих систем. Також не використовуйте цілі числа для фінансових розрахунків, якщо тільки ви не зберігаєте суми строго у мінімальних одиницях валюти (наприклад, центах).
# Приклад використання
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
views_count: Mapped[int] = mapped_column(Integer, default=0)
String(length) / Text
Mapped[str]
Текстові дані фіксованої обмеженої або необмеженої довжини.
  • Коли використовувати: String(n) підходить для коротких обмежених рядків (імена, email, slug, паролі). Завжди вказуйте довжину для оптимізації та валідації на рівні СУБД. Text використовується для довгих текстів без чіткого ліміту довжини (коментарі, статті, описи).
  • Коли НЕ використовувати: Не використовуйте String без ліміту (просто String()) у базах даних, які не підтримують необмежений VARCHAR без явного зазначення довжини (наприклад, MySQL вимагає довжину, тоді як PostgreSQL дозволяє VARCHAR без довжини, що еквівалентно TEXT). Не зберігайте складну структуру (JSON, списки) у звичайних текстових стовпцях.
# Приклад використання
email: Mapped[str] = mapped_column(String(255), unique=True)
content: Mapped[str] = mapped_column(Text)
Boolean
Mapped[bool]
Логічний тип для збереження значень True або False.
  • Коли використовувати: Для бінарних прапорців стану, таких як is_active, has_permission, email_verified.
  • Коли НЕ використовувати: Якщо стан сутності має складніший життєвий цикл (наприклад, статус замовлення: draft, processing, shipped, delivered), краще використовувати Enum або зв'язану таблицю статусів замість створення кількох булевих полів на кшталт is_draft, is_processed.
# Приклад використання
is_verified: Mapped[bool] = mapped_column(Boolean, default=False)
Numeric(precision, scale) / Float
Mapped[Decimal] / Mapped[float]
Числа з фіксованою точністю та з плаваючою крапкою.
  • Коли використовувати: Numeric(p, s) (або DECIMAL) є єдиним правильним вибором для збереження грошових сум, фінансових розрахунків та точних відсотків, де недопустимі помилки округлення. Float підходить для наукових обчислень, гео-координат (широта/довгота), метрик, де крихітні похибки через особливості репрезентації float-чисел у пам'яті комп'ютера не мають критичного значення.
  • Коли НЕ використовувати: Ніколи не використовуйте Float для збереження грошей. Помилки округлення на кшталт 0.1 + 0.2 = 0.30000000000000004 призведуть до фінансових розбіжностей.
# Приклад використання
from decimal import Decimal
price: Mapped[Decimal] = mapped_column(Numeric(10, 2))  # Максимум 99999999.99
latitude: Mapped[float] = mapped_column(Float)
DateTime(timezone) / Date / Time
Mapped[datetime] / Mapped[date] / Mapped[time]
Типи для роботи з часом та датами.
  • Коли використовувати: Для часових позначок створення/оновлення, логів та подій завжди використовуйте DateTime(timezone=True) (збереження UTC часової зони). Date підходить для днів народження чи календарних дат (без прив'язки до годин). Time — для розкладів (наприклад, "о 18:00 кожні вихідні").
  • Коли НЕ використовувати: Уникайте збереження часових міток як цілих чисел (timestamp в Unix Epoch int), оскільки це ускладнює роботу зі SQL-запитами безпосередньо у базі даних (ускладнює агрегації по датах, групування тощо). Уникайте DateTime без часової зони (timezone=False), якщо додатком користуються користувачі в різних часових поясах.
# Приклад використання
from datetime import datetime, date
registered_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
birth_date: Mapped[date] = mapped_column(Date)
JSON / pg.JSONB
Mapped[dict | list]
Збереження неструктурованих або напівструктурованих даних (словників та списків).
  • Коли використовувати: Для конфігурацій, гнучких метаданих додаткових полів користувача, кешування або історії змін. У PostgreSQL завжди надавайте перевагу sqlalchemy.dialects.postgresql.JSONB, оскільки він зберігає дані у декомпонованому бінарному форматі, що дозволяє індексувати JSON-ключі (через GIN-індекси) та виконувати швидкий пошук по них.
  • Коли НЕ використовувати: Не використовуйте JSON для полів, які беруть участь у реляційних зв'язках (JOIN), або полів, які часто оновлюються окремо. Якщо вам постійно доводиться робити запити всередину JSON-документа для фільтрації, краще винести ці ключі в окремі класичні SQL-стовпці.
# Приклад використання
from sqlalchemy.dialects.postgresql import JSONB
metadata_info: Mapped[dict] = mapped_column(JSONB, default=dict)
Uuid
Mapped[uuid.UUID]
Універсальний унікальний ідентифікатор (UUID).
  • Коли використовувати: Ідеачно для первинних ключів у розподілених системах, де клієнт або мікросервіс може самостійно згенерувати ID до запису в БД, або для публічних ID сутностей, щоб приховати внутрішню послідовність (sequential IDs) від перебору (enumeration attack). У SQLAlchemy 2.0 є нативний тип Uuid.
  • Коли НЕ використовувати: У дуже великих таблицях, які потребують високої швидкості вставки (INSERT). UUIDv4 генерується випадковим чином, через що індекси B-Tree сильно фрагментуються, сповільнюючи операції запису. Для таких випадків краще використовувати UUIDv7 (сортовані за часом) або залишати автоінкрементний BigInteger для внутрішніх зв'язків, а UUID використовувати лише як зовнішній ідентифікатор.
# Приклад використання
import uuid
from sqlalchemy import Uuid
uuid_id: Mapped[uuid.UUID] = mapped_column(Uuid, default=uuid.uuid4)
Enum
Mapped[YourEnumClass]
Обмежений набір значень на основі Python enum.Enum.
  • Коли використовувати: Для статусів, ролей користувачів, категорій, де набір значень є чітким та заздалегідь визначеним на рівні коду.
  • Коли НЕ використовувати: Не створюйте нативний ENUM бази даних (параметр native_enum=True), якщо список статусів буде часто змінюватися, оскільки операція оновлення нативного ENUM у схемі БД (наприклад, ALTER TYPE ... ADD VALUE) в деяких СУБД є блокуючою або складною для міграцій. Встановлюйте native_enum=False (за замовчуванням у SQLAlchemy для багатьох БД) — тоді значення зберігатимуться як VARCHAR з обмеженням CHECK constraint.
# Приклад використання
import enum
from sqlalchemy import Enum

class UserRole(enum.Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

role: Mapped[UserRole] = mapped_column(Enum(UserRole), default=UserRole.USER)
Для поля UUID у SQLAlchemy 2.0 з'явився нативний тип Uuid, що автоматично конвертує uuid.UUID з Python у відповідний тип бази даних:
import uuid
from sqlalchemy import Uuid

class Project(Base):
    __tablename__ = "projects"
    id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)

Стандартний абстрактний базовий клас з аудитними полями

У реальних проєктах зручно винести загальні поля (наприклад, id, created_at, updated_at) до абстрактного класу, від якого успадковуватимуться всі моделі — так само, як це роблять у EF Core через клас BaseEntity:

public abstract class BaseEntity
{
    public int Id { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime? UpdatedAt { get; set; }
}

public class User : BaseEntity
{
    public string Username { get; set; } = default!;
}

Параметр onupdate=func.now() у mapped_column() — це «клієнтський» тригер: SQLAlchemy автоматично підставляє поточний час у UPDATE-запит, якщо об'єкт змінився. Відповідником у PostgreSQL є тригер BEFORE UPDATE, але цей підхід зручніший — логіка знаходиться у Python-коді, а не у SQL.


Relationships: Зв'язки між таблицями

Реляційні бази даних отримали свою назву саме через зв'язки між таблицями. Головна сила ORM полягає в тому, що ці зв'язки можна виразити декларативно у Python-коді, а потім працювати з ними як зі звичайними атрибутами об'єктів.

У SQLAlchemy для визначення зв'язків між моделями використовується функція relationship() у поєднанні з ForeignKey у визначенні стовпця.

argument
str | type
Перший позиційний аргумент — клас моделі, з якою створюється зв'язок (або його рядкове ім'я, наприклад "Post"). Рядкові імена запобігають помилкам циклічного імпорту в Python.
back_populates
str
Ім'я атрибуту зв'язку на цільовій моделі, що посилається назад на поточну модель. Необхідний для підтримки двонаправленої синхронізації об'єктів у пам'яті Python.
cascade
str
Правила розповсюдження операцій від батьківського об'єкта до дочірніх. Може містити комбінацію наступних значень через кому:
  • save-update (за замовчуванням): додавання батьківського об'єкта до сесії (session.add()) автоматично додає всі пов'язані дочірні об'єкти.
  • merge (за замовчуванням): злиття стану батьківського об'єкта (session.merge()) також зливає стан пов'язаних дочірніх об'єктів.
  • delete: при видаленні батьківського об'єкта (session.delete(parent)) автоматично видаляються всі пов'язані дочірні записи з БД.
  • delete-orphan: якщо дочірній об'єкт від'єднати від колекції батьківського (наприклад, видалити зі списку), він вважається «сиротою» та автоматично видаляється з БД, замість того, щоб отримати NULL у зовнішньому ключі.
  • refresh-expire: операції session.expire() або session.refresh() на батьківському об'єкті поширюються також і на дочірні.
  • expunge: при виведенні батьківського об'єкта із сесії (session.expunge()) дочірні об'єкти також виводяться з неї.
  • all: швидкий синонім для "save-update, merge, refresh-expire, expunge, delete". Найбільш популярне поєднання для One-to-Many з жорстким володінням: "all, delete-orphan".
passive_deletes
bool | str
Якщо True, SQLAlchemy не завантажуватиме дочірні записи з БД при видаленні батьківського об'єкта, а делегуватиме каскадне видалення базі даних (вимагає ON DELETE CASCADE у зовнішньому ключі).
lazy
str
Стратегія завантаження зв'язку за замовчуванням: "select" (lazy), "joined" (eager via JOIN), "selectin" (eager via IN), або "raise" (заборона lazy load).

Зовнішні ключі: ForeignKey

У той час як relationship() створює зв'язок на рівні Python-об'єктів, клас ForeignKey створює справжнє обмеження цілісності зовнішнього ключа (FOREIGN KEY constraint) безпосередньо у схемі бази даних.

ForeignKey передається як аргумент у mapped_column() для стовпців, що мають посилатися на інші таблиці.

column
str | Column
Перший позиційний аргумент — цільовий стовпець, на який посилається ключ. Зазвичай передається у вигляді рядка у форматі "ім'я_таблиці.ім'я_стовпця" (наприклад, "users.id"). Зверніть увагу: вказується саме фізичне ім'я таблиці з бази даних (__tablename__), а не назва класу ORM-моделі.
ondelete
str
Визначає поведінку бази даних при видаленні батьківського запису. Популярні SQL-опції:
  • "CASCADE": автоматично видалити дочірній рядок. Рекомендовано для сильних зв'язків.
  • "SET NULL": встановити значення цього поля в NULL (стовпець має бути nullable).
  • "RESTRICT": заборонити видалення батьківського рядка, якщо на нього посилаються дочірні.
  • "NO ACTION": стандартна поведінка (перевірка обмеження в кінці транзакції).
onupdate
str
Визначає поведінку бази даних при зміні значення первинного ключа в батьківському рядку. Зазвичай використовується "CASCADE", щоб оновлені ID автоматично прописувалися у дочірніх таблицях.
Приклад використання:
# Посилання на таблицю "users" стовпець "id" з каскадним видаленням на рівні СУБД
user_id: Mapped[int] = mapped_column(
    ForeignKey("users.id", ondelete="CASCADE")
)

One-to-Many (Один до Багатьох)

Найпоширеніший тип зв'язку. Один запис у таблиці A може мати декілька пов'язаних записів у таблиці B. Наприклад: один User може мати багато Post-ів.

models/blog.py
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)

    # "Один до Багатьох": один User має багато Post-ів
    # back_populates="author" зв'язує цю сторону зі стороною Post.author
    posts: Mapped[list["Post"]] = relationship(
        "Post",
        back_populates="author",
        cascade="all, delete-orphan",  # При видаленні User — видати всі його Post-и
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str | None] = mapped_column(Text)

    # Зовнішній ключ — посилання на users.id
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # "Зворотний" зв'язок: Post посилається на свого User (автора)
    author: Mapped["User"] = relationship("User", back_populates="posts")

Зверніть на кілька важливих деталей:

  1. from __future__ import annotations — це директива, що дозволяє використовувати рядкові посилання на класи ("Post", "User") без циклічного імпорту. Вона змушує Python відкладати обчислення анотацій і є стандартною практикою у файлах із взаємними посиланнями.
  2. back_populates — параметр, що встановлює двонаправлений зв'язок: коли ви змінюєте user.posts, SQLAlchemy автоматично оновлює post.author, і навпаки. Це синхронізація в оперативній пам'яті, а не в базі даних.
  3. cascade="all, delete-orphan" — правило каскадної поведінки: якщо об'єкт User видаляється, всі його пов'язані Post-и також будуть видалені (аналог ON DELETE CASCADE у SQL, але реалізований на рівні ORM).

Порівняння з EF Core Navigation Properties

У EF Core аналогічна структура виглядала б так:

public class User
{
    public int Id { get; set; }
    public string Username { get; set; } = default!;

    // Navigation Property: колекція пов'язаних Post-ів
    public ICollection<Post> Posts { get; set; } = new List<Post>();
}

public class Post
{
    public int Id { get; set; }
    public string Title { get; set; } = default!;

    // Foreign Key + Navigation Property
    public int AuthorId { get; set; }
    public User Author { get; set; } = default!;
}

Many-to-Many (Багато до Багатьох)

Зв'язок «багато до багатьох» реалізується через асоціативну (проміжну) таблицю. Наприклад: один Post може мати багато Tag-ів, і один Tag може належати до багатьох Post-ів.

У SQLAlchemy 2.0 є два підходи для реалізації зв'язку «багато до багатьох».

Конструювання таблиць на рівні Core: Table та Column

Для створення найпростішої проміжної таблиці (без додаткових полів) у SQLAlchemy використовується низькорівневий конструктор Table та клас Column з пакету sqlalchemy (Core-рівень). Оскільки ця таблиця потрібна лише для зв'язку між об'єктами і ми не будемо створювати для неї окремий Python-клас моделі, вона оголошується як об'єкт.

name
str
Перший позиційний аргумент — фізичне ім'я таблиці у базі даних (наприклад, "post_tags").
metadata
MetaData
Другий позиційний аргумент — об'єкт MetaData, у якому реєструється таблиця (зазвичай передається Base.metadata). Це необхідно, щоб SQLAlchemy та інструмент міграцій Alembic знали про існування цієї таблиці та могли згенерувати її в БД.
args (Column / Constraint)
Column | Constraint
Усі наступні позиційні аргументи (*args) — це стовпці таблиці (об'єкти Column) або обмеження цілісності (наприклад, UniqueConstraint).

Також у цих таблицях використовуються класи Column (старий Core-еквівалент сучасного ORM-івського mapped_column()):

name
str
Рядкове ім'я стовпця у базі даних. Якщо стовпець створюється всередині Table(), це ім'я є обов'язковим першим аргументом.
type
TypeEngine
Тип даних SQL type_ (наприклад, Integer, String(50)).
args / kwargs
Any
Додаткові опції конфігурації (*args / **kwargs): ForeignKey, primary_key=True, nullable=False, index=True тощо.

Підхід 1: Чиста асоціативна таблиця (якщо таблиця не має власних атрибутів):

models/tags.py
from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

# Проміжна таблиця (без власного класу)
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)

class Post(Base):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))

    # Many-to-Many через secondary (проміжну таблицю)
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary=post_tags,
        back_populates="posts"
    )

class Tag(Base):
    __tablename__ = "tags"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    posts: Mapped[list["Post"]] = relationship(
        "Post",
        secondary=post_tags,
        back_populates="tags"
    )

Підхід 2: Асоціативний клас (якщо проміжна таблиця має власні атрибути, наприклад, role у ProjectMember):

models/project.py
from datetime import datetime
from sqlalchemy import String, ForeignKey, Enum, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
import enum

class Base(DeclarativeBase):
    pass

class MemberRole(enum.Enum):
    OWNER = "owner"
    EDITOR = "editor"
    VIEWER = "viewer"

class ProjectMember(Base):
    """Асоціативна таблиця з власними атрибутами."""
    __tablename__ = "project_members"

    project_id: Mapped[int] = mapped_column(
        ForeignKey("projects.id"), primary_key=True
    )
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id"), primary_key=True
    )
    role: Mapped[MemberRole] = mapped_column(
        Enum(MemberRole), default=MemberRole.VIEWER
    )
    joined_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Navigation properties
    project: Mapped["Project"] = relationship(back_populates="memberships")
    user: Mapped["User"] = relationship(back_populates="memberships")

class Project(Base):
    __tablename__ = "projects"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))

    memberships: Mapped[list["ProjectMember"]] = relationship(
        back_populates="project"
    )

Параметри конфігурації relationship()

Функція relationship() керує об'єднанням моделей на рівні Python. Вона приймає низку критично важливих параметрів, які визначають поведінку зв'язків:

  • cascade (тип str, за замовчуванням "save-update, merge") Керує тим, як операції над батьківським об'єктом (наприклад, додавання, оновлення, видалення) поширюються на пов'язані дочірні об'єкти.
    • save-update: якщо додати батьківський об'єкт до сесії (session.add(parent)), усі його дочірні об'єкти автоматично додадуться до сесії.
    • delete: якщо видалити батьківський об'єкт (session.delete(parent)), автоматично видаляться всі пов'язані дочірні об'єкти (аналог ON DELETE CASCADE у БД, але виконується силами Python).
    • delete-orphan (критично важливий): якщо дочірній об'єкт від'єднати від батьківського (наприклад, видалити його зі списку parent.children.remove(child)), цей дочірній об'єкт автоматично буде видалено з БД (інакше він перетвориться на «сироту» — запис у БД з parent_id = NULL).
    • Рекомендований набір: для One-to-Many зв'язків із жорстким володінням (наприклад, User -> Posts) зазвичай ставлять cascade="all, delete-orphan" (all є синонімом для save-update, merge, refresh-expire, expunge, delete).
  • passive_deletes (тип bool або "all", за замовчуванням False) Впливає на поведінку каскадного видалення.
    • Якщо False, то при видаленні батьківського об'єкта SQLAlchemy виконає окремий SELECT-запит, щоб завантажити всі дочірні об'єкти в пам'ять, а потім згенерує окремі DELETE-запити для кожного з них. Це створює величезний оверхед.
    • Якщо True, SQLAlchemy НЕ завантажуватиме дочірні об'єкти, а просто видалить батьківський об'єкт. При цьому вона покладатиметься на те, що на рівні самої бази даних у зовнішньому ключі прописано ON DELETE CASCADE (наприклад, ForeignKey("parent.id", ondelete="CASCADE")). Це в рази швидше й ефективніше.
  • lazy (тип str, за замовчуванням "select") Визначає стратегію завантаження пов'язаних об'єктів за замовчуванням, якщо її не перевизначено в запиті:
    • "select" (або True): ліниве завантаження (lazy loading).
    • "joined" (або False): жадібне завантаження через LEFT OUTER JOIN (eager loading).
    • "selectin": жадібне завантаження окремим SELECT з IN (eager loading).
    • "raise": кидає помилку при спробі lazy loading. Дуже корисний режим для асинхронного коду, щоб гарантувати, що розробник не забув зробити eager loading і не отримав MissingGreenlet у невідповідний момент.
  • back_populates vs backref
    • back_populates (рекомендовано в 2.0) вимагає явного визначення relationship() на обох моделях із взаємним посиланням одна на одну. Це забезпечує повну сумісність із type hints та автодоповненням IDE.
    • backref (застарілий стиль) визначає зв'язок лише на одній моделі, а зворотний зв'язок на іншій створюється динамічно «магічним» шляхом. Це унеможливлює статичну перевірку типів через mypy та призводить до прихованих помилок.
Приклад зв'язку з оптимізованим каскадним видаленням та type safety:
class Project(Base):
    __tablename__ = "projects"
    id: Mapped[int] = mapped_column(primary_key=True)

    # Каскадне видалення дочірніх завдань оптимізовано через passive_deletes
    tasks: Mapped[list["Task"]] = relationship(
        "Task",
        back_populates="project",
        cascade="all, delete-orphan",
        passive_deletes=True,         # SQLAlchemy не буде робити SELECT перед DELETE
    )

class Task(Base):
    __tablename__ = "tasks"
    id: Mapped[int] = mapped_column(primary_key=True)

    # ON DELETE CASCADE на рівні бази даних для роботи passive_deletes
    project_id: Mapped[int] = mapped_column(
        ForeignKey("projects.id", ondelete="CASCADE")
    )
    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="tasks"
    )

Lazy Loading vs Eager Loading та Проблема N+1

Одна з найбільш підступних проблем у роботі з ORM — це N+1 query problem (проблема N+1 запитів). Вона виникає через поведінку Lazy Loading і є однією з найпоширеніших причин повільної роботи вебдодатків.

Що таке Lazy Loading і чому він небезпечний?

Lazy Loading (ліниве завантаження) — це стратегія, при якій пов'язані об'єкти не завантажуються одразу разом із головним об'єктом. Натомість вони завантажуються «на вимогу» — лише тоді, коли ви вперше звертаєтеся до атрибута зв'язку.

У EF Core Lazy Loading увімкнено за замовчуванням і відбувається автоматично. У SQLAlchemy він також існує, але вимкнений за замовчуванням (особливо у async-режимі, де він взагалі не підтримується без спеціального налаштування).

Уявімо сценарій: отримати список 100 User-ів та для кожного — його username і кількість Post-ів.

З Lazy Loading (Погано ❌):

ANTI-PATTERN: N+1 проблема
# 1 запит: SELECT * FROM users LIMIT 100
users = session.execute(select(User).limit(100)).scalars().all()

for user in users:
    # +1 запит для КОЖНОГО користувача:
    # SELECT * FROM posts WHERE author_id = <user.id>
    print(f"{user.username}: {len(user.posts)} posts")

# Разом: 1 + 100 = 101 запит до бази даних!
# При 1000 користувачів — 1001 запит.

Це класична N+1 проблема: спочатку виконується 1 запит для отримання головних об'єктів, а потім для кожного з N об'єктів виконується додатковий запит для отримання пов'язаних даних.

Вирішення: Eager Loading через selectinload() та joinedload()

Eager Loading (жадібне завантаження) вирішує цю проблему, завантажуючи всі необхідні дані одним або двома SQL-запитами. SQLAlchemy надає кілька стратегій:

selectinload() — рекомендований підхід

Виконує два SQL-запити: перший — для головних об'єктів, другий — один SELECT ... WHERE author_id IN (1, 2, 3, ...) для всіх пов'язаних об'єктів одночасно.

PATTERN: selectinload (рекомендовано)
from sqlalchemy.orm import selectinload

# Два запити замість N+1:
# 1: SELECT * FROM users LIMIT 100
# 2: SELECT * FROM posts WHERE author_id IN (1, 2, 3, ..., 100)
users = session.execute(
    select(User)
    .options(selectinload(User.posts))
    .limit(100)
).scalars().all()

for user in users:
    print(f"{user.username}: {len(user.posts)} posts")
    # user.posts вже завантажені — жодного додаткового запиту!

joinedload() — для одиничних об'єктів

Виконує один SQL-запит з LEFT OUTER JOIN. Підходить для завантаження одного пов'язаного об'єкта (many-to-one або one-to-one), але може бути неефективним для колекцій:

PATTERN: joinedload (для single-object relations)
from sqlalchemy.orm import joinedload

# Один запит з JOIN:
# SELECT posts.*, users.* FROM posts
# LEFT OUTER JOIN users ON users.id = posts.author_id
# WHERE posts.id = 42
post = session.execute(
    select(Post)
    .options(joinedload(Post.author))
    .where(Post.id == 42)
).scalar_one()

print(f"Post by: {post.author.username}")  # Жодного додаткового запиту

Порівняння: .Include() у EF Core ↔ selectinload() у SQLAlchemy

// EF Core: .Include() для завантаження Navigation Property
var users = await context.Users
    .Include(u => u.Posts)  // Аналог selectinload(User.posts)
    .Take(100)
    .ToListAsync();

foreach (var user in users)
{
    Console.WriteLine($"{user.Username}: {user.Posts.Count} posts");
}
Практичне правило: завжди використовуйте selectinload() для колекцій (one-to-many, many-to-many) та joinedload() для одиничних зв'язків (many-to-one, one-to-one). Увімкніть echo=True на Engine під час розробки, щоб бачити реальні SQL-запити та вчасно помічати N+1 проблеми.
У асинхронному режимі (AsyncSession) Lazy Loading повністю не підтримується і призведе до MissingGreenlet-помилки. Це є корисним обмеженням: async-контекст змушує розробника явно вказувати стратегію завантаження через selectinload() або joinedload(), роблячи поведінку передбачуваною.

Просунуті стратегії завантаження зв'язків

Окрім базових selectinload() та joinedload(), SQLAlchemy надає інструменти для вирішення більш складних архітектурних завдань при завантаженні даних:

  • Вкладене завантаження (Nested Eager Loading) Часто потрібно завантажити дерево зв'язків (наприклад, завантажити Project, для нього — всі Task, а для кожного завдання — список Comment та автора коментаря User). Для цього використовується ланцюжок методів selectinload() або joinedload() через крапку:
    nested_loading.py
    from sqlalchemy.orm import selectinload
    
    stmt = (
        select(Project)
        .options(
            selectinload(Project.tasks)           # Крок 1: Завантажити завдання для проекту
            .selectinload(Task.comments)          # Крок 2: Завантажити коментарі для кожного завдання
            .joinedload(Comment.author)           # Крок 3: Жадібно підтягнути автора коментаря (many-to-one)
        )
    )
    projects = (await session.execute(stmt)).scalars().all()
    
  • selectinload() vs subqueryload() У старих версіях SQLAlchemy часто використовували subqueryload().
    • subqueryload() повторює вихідний SELECT-запит у підзапиті (subquery) для отримання дочірніх об'єктів. Це може бути вкрай неефективно, якщо вихідний запит мав складні JOIN або великі значення LIMIT/OFFSET (базі даних доводиться виконувати всю логіку вихідного запиту двічі).
    • selectinload() замість цього бере первинні ключі (ID) вже завантажених батьківських об'єктів і робить простий, швидкий запит SELECT ... WHERE parent_id IN (1, 2, 3...). Це набагато легше для планувальника запитів PostgreSQL.
    • Рекомендація: У 2.0 повністю відмовтеся від subqueryload() на користь selectinload().
  • contains_eager(): Завантаження зв'язків при ручному JOIN Уявіть ситуацію: вам потрібно відфільтрувати список проектів за статусом їхніх завдань (наприклад, знайти проекти, які мають хоча б одне критичне завдання), і водночас ви хочете, щоб ці завдання були одразу завантажені у властивість project.tasks. Якщо ви напишете .join(Project.tasks).options(selectinload(Project.tasks)), SQLAlchemy зробить два окремих JOIN: один для фільтрації, а другий (в окремому запиті) — для завантаження колекції. Це подвійна робота.
    Правильне рішення — contains_eager(). Воно вказує SQLAlchemy: «Я вже зробив JOIN вручну для фільтрації, візьми ці дані з результату вихідного запиту та поклади у властивість зв'язку»:
    contains_eager_demo.py
    from sqlalchemy.orm import contains_eager
    
    stmt = (
        select(Project)
        .join(Project.tasks)                                   # Ручний JOIN для фільтрації
        .where(Task.priority == TaskPriority.CRITICAL)         # Умова фільтрації
        .options(contains_eager(Project.tasks))                # Завантажити ці ж дані в об'єкт
    )
    # Виконається лише ОДИН SQL-запит із INNER JOIN,
    # і об'єкти project.tasks будуть повністю заповнені!
    projects = (await session.execute(stmt)).scalars().unique().all()
    
При використанні contains_eager() разом із join() на зв'язки типу "один-до-багатьох" (one-to-many) обов'язково викликайте метод .unique() на результаті виконання запиту перед .all(). Оскільки JOIN створює дублікати рядків для батьківської таблиці, .unique() гарантує, що SQLAlchemy згорне дублікати в унікальні об'єкти в пам'яті Python.

Запити у SQLAlchemy 2.0: Core DML API

У SQLAlchemy 1.x робота з запитами на рівні Core (конструктор SQL) та на рівні ORM (робота з об'єктами) була розділена: Core використовував select(users_table), а ORM — session.query(User).

Починаючи з SQLAlchemy 2.0, цей бар'єр повністю стерто. Тепер для будь-яких запитів використовується єдиний уніфікований Core DML (Data Manipulation Language) API. Ми конструюємо запит як об'єкт абстрактного синтаксичного дерева (AST) за допомогою функцій select(), insert(), update() та delete(), а потім виконуємо його через сесію.

Компіляція та інспектування SQL-запитів

Коли ви викликаєте select(User), ви не робите запит до бази даних і навіть не створюєте SQL-рядок. Ви створюєте Python-об'єкт типу Select. SQLAlchemy компілює цей об'єкт у SQL безпосередньо перед виконанням.

Ви можете переглянути згенерований SQL у будь-який момент:

from sqlalchemy import select
from sqlalchemy.dialects import postgresql

stmt = select(User).where(User.username == "arakviel")

# 1. Швидкий перегляд загального SQL (буде використано базовий діалект)
print(stmt)
# Виведе: SELECT users.id, users.username, users.email, users.is_active, users.created_at FROM users WHERE users.username = :username_1

# 2. Перегляд SQL для конкретної СУБД (наприклад, PostgreSQL)
compiled = stmt.compile(dialect=postgresql.dialect())
print(compiled)
# Виведе те саме, але орієнтоване під синтаксис PG

# 3. Перегляд параметрів, які будуть передані з запитом
print(compiled.params)
# Виведе: {'username_1': 'arakviel'}

Завдяки параметризації (використанню плейсхолдерів типу :username_1), SQLAlchemy повністю захищає ваш додаток від SQL-ін'єкцій «з коробки».


Читання даних: select()

Функція select() будує SQL-запити вибірки (SELECT).

Сигнатура

def select(*entities: _ColumnsClauseArgument[Any]) -> Select

Аргументами entities можуть бути класи ORM-моделей (наприклад, User), окремі стовпці (User.username), або SQL-функції (func.count(User.id)).

Ключові методи об'єкта Select (Chaining API)

Кожен метод повертає новий об'єкт Select, що дозволяє будувати ланцюжки викликів:

where()
Select
Додає умову WHERE до запиту. Приймає позиційні аргументи умов (*whereclauses). Кілька аргументів або послідовні виклики .where() об'єднуються через логічне AND.
# Базовий: фільтрація за значенням (рівність або порівняння)
select(User).where(User.is_active == True)
# SQL: SELECT ... FROM users WHERE users.is_active = true

select(User).where(User.id > 10)
# SQL: SELECT ... FROM users WHERE users.id > 10

# Просунутий: комбінація AND, фільтрація NULL, пошук IN та LIKE
select(User).where(
    User.is_active == True,
    User.email.is_not(None),
    User.id.in_([1, 5, 10]),
    User.username.like("ara%")
)
# SQL: SELECT ... FROM users WHERE users.is_active = true AND users.email IS NOT NULL AND users.id IN (1, 5, 10) AND users.username LIKE 'ara%'
filter_by()
Select
Спрощений варіант фільтрації WHERE для порівняння на рівність за назвами полів за допомогою іменованих аргументів (**kwargs). Наприклад: .filter_by(is_active=True).
# Базовий: фільтрація за одним полем
select(User).filter_by(is_active=True)
# SQL: SELECT ... FROM users WHERE users.is_active = true

# Просунутий: фільтрація за кількома полями одночасно (діє як AND)
select(User).filter_by(is_active=True, username="arakviel")
# SQL: SELECT ... FROM users WHERE users.is_active = true AND users.username = 'arakviel'
order_by()
Select
Сортування результатів запиту (ORDER BY). Приймає стовпці для сортування (*clauses). Для зворотного сортування викликається метод .desc() на стовпці, наприклад User.created_at.desc().
# Базовий: сортування за зростанням (за замовчуванням)
select(User).order_by(User.username)
# SQL: SELECT ... FROM users ORDER BY users.username ASC

# Просунутий: сортування за кількома полями, спадання (desc) та NULL в кінці
select(User).order_by(
    User.is_active.desc(),
    User.email.asc().nulls_last()
)
# SQL: SELECT ... FROM users ORDER BY users.is_active DESC, users.email ASC NULLS LAST
limit()
Select
Обмежує кількість рядків у результаті запиту (LIMIT). Приймає ціле число limit.
# Базовий: обмеження кількості результатів (перші 10 записів)
select(User).limit(10)
# SQL: SELECT ... FROM users LIMIT 10

# Просунутий: ліміт у підзапиті для фільтрації пов'язаних таблиць
subq = select(User.id).order_by(User.created_at.desc()).limit(5).subquery()
select(Post).where(Post.author_id.in_(select(subq)))
# SQL: SELECT ... FROM posts WHERE posts.author_id IN (SELECT users.id FROM users ORDER BY users.created_at DESC LIMIT 5)
offset()
Select
Пропускає вказану кількість перших рядків у результаті запиту (OFFSET). Приймає ціле число offset.
# Базовий: пропуск перших 10 записів
select(User).offset(10)
# SQL: SELECT ... FROM users OFFSET 10

# Просунутий: використання разом з .limit() для класичної пагінації
page, limit = 3, 20
select(User).limit(limit).offset((page - 1) * limit)
# SQL: SELECT ... FROM users LIMIT 20 OFFSET 40
group_by()
Select
Групує результати запиту (GROUP BY) за вказаними стовпцями (*clauses).
# Базовий: групування постів за ID автора
select(Post.author_id, func.count(Post.id)).group_by(Post.author_id)
# SQL: SELECT posts.author_id, count(posts.id) FROM posts GROUP BY posts.author_id

# Просунутий: групування за кількома полями та виразами (екстракція року з дати)
select(
    Post.author_id,
    func.extract("year", Post.created_at),
    func.count(Post.id)
).group_by(
    Post.author_id,
    func.extract("year", Post.created_at)
)
# SQL: SELECT posts.author_id, EXTRACT(year FROM posts.created_at), count(posts.id) FROM posts GROUP BY posts.author_id, EXTRACT(year FROM posts.created_at)
having()
Select
Умови фільтрації для згрупованих результатів (HAVING). Приймає умову фільтрації clause.
# Базовий: фільтрація груп з кількістю елементів більше 5
select(Post.author_id).group_by(Post.author_id).having(func.count(Post.id) > 5)
# SQL: SELECT posts.author_id FROM posts GROUP BY posts.author_id HAVING count(posts.id) > 5

# Просунутий: фільтрація з перевіркою середньої довжини заголовків постів у групі
select(Post.author_id).group_by(Post.author_id).having(
    func.avg(func.length(Post.title)) > 50
)
# SQL: SELECT posts.author_id FROM posts GROUP BY posts.author_id HAVING avg(length(posts.title)) > 50
join()
Select
Об'єднує запит з іншою таблицею або моделлю через INNER JOIN. Приймає цільову модель target та необов'язкову умову об'єднання onclause. Якщо зв'язок налаштовано через relationship(), SQLAlchemy автоматично виведе умову об'єднання.
# Базовий: INNER JOIN через зв'язок (relationship)
select(User).join(User.posts)
# SQL: SELECT users.id, ... FROM users JOIN posts ON users.id = posts.author_id

# Просунутий: INNER JOIN за явною умовою ON та подвійний JOIN
select(User.username, Tag.name).join(
    Post, User.id == Post.author_id
).join(
    Post.tags
)
# SQL: SELECT users.username, tags.name FROM users JOIN posts ON users.id = posts.author_id JOIN post_tags ON posts.id = post_tags.post_id JOIN tags ON tags.id = post_tags.tag_id
outerjoin()
Select
Об'єднує запит з іншою таблицею або моделлю через LEFT OUTER JOIN (LEFT JOIN). Працює аналогічно до .join().
# Базовий: LEFT JOIN користувачів та їхніх постів
select(User, Post).outerjoin(User.posts)
# SQL: SELECT users.id, ... posts.id, ... FROM users LEFT OUTER JOIN posts ON users.id = posts.author_id

# Просунутий: Anti-Join (знайти користувачів, у яких взагалі немає постів)
select(User).outerjoin(User.posts).where(Post.id.is_(None))
# SQL: SELECT users.id, ... FROM users LEFT OUTER JOIN posts ON users.id = posts.author_id WHERE posts.id IS NULL

Оператори та модифікатори стовпців (Column Operators & Modifiers)

Оскільки Python має власні зарезервовані ключові слова (наприклад, in, is, not, and, or), ми не можемо перевантажити їх безпосередньо у виразах на кшталт User.id in [1, 2]. Python-інтерпретатор просто видасть синтаксичну помилку.

Для обходу цього обмеження SQLAlchemy надає спеціальні методи об'єктів стовпців (Column Operators), які генерують відповідні SQL-конструкції:

in\_()
Column Operators
Еквівалент SQL-оператора IN. Перевіряє, чи належить значення стовпця до вказаного списку.
  • Коли використовувати: Для фільтрації за множиною значень (наприклад, статуси, списки ID).
  • Коли НЕ використовувати: З великими списками (тисячі елементів), оскільки це сильно сповільнює розбір запиту базою даних. У таких випадках краще використовувати JOIN або підзапит.
select(User).where(User.id.in_([1, 2, 3]))
# SQL: SELECT ... FROM users WHERE users.id IN (1, 2, 3)
is\_() / is_not()
Column Operators
Еквіваленти SQL-операторів IS та IS NOT. Найчастіше використовуються для роботи з NULL.
  • Коли використовувати: Для фільтрації IS NULL (is_(None)) або IS NOT NULL (is_not(None)).
  • Коли НЕ використовувати: Уникайте порівнянь == None або != None. Хоча SQLAlchemy перевантажує оператори == та != для генерації IS NULL, використання is_() та is_not() є стандартом PEP8 для порівняння з None та робить намір коду очевидним.
select(User).where(User.email.is_(None))
# SQL: SELECT ... FROM users WHERE users.email IS NULL

select(User).where(User.email.is_not(None))
# SQL: SELECT ... FROM users WHERE users.email IS NOT NULL
like() / ilike()
Column Operators
Пошук за шаблоном. like() є регістрозалежним (SQL LIKE), тоді як ilike() — регістронезалежний (SQL ILIKE у PostgreSQL, або LOWER(col) LIKE LOWER(val) в інших СУБД).
  • Коли використовувати: Для пошуку підрядків за допомогою символу маски %.
  • Коли НЕ використовувати: Для великих текстових полів (наприклад, статей) — замість цього краще використовувати повнотекстовий пошук (Full-Text Search) бази даних, оскільки LIKE "%текст%" не використовує стандартні B-Tree індекси та виконує повне сканування таблиці (Full Table Scan).
select(User).where(User.username.like("ara%"))
# SQL: SELECT ... FROM users WHERE users.username LIKE 'ara%'

select(User).where(User.username.ilike("%doe%"))
# SQL: SELECT ... FROM users WHERE users.username ILIKE '%doe%'
startswith() / endswith() / contains()
Column Operators
Спрощені методи-обгортки поверх like() / ilike(), які автоматично додають символ % у потрібні місця.
  • Коли використовувати: Для очевидного та простого текстового пошуку. За замовчуванням вони є регістрозалежними, але приймають параметр autoescape=True або можуть комбінуватися через .ilike() за потреби.
select(User).where(User.username.startswith("ara"))
# SQL: SELECT ... FROM users WHERE users.username LIKE 'ara%'

select(User).where(User.email.endswith(".com"))
# SQL: SELECT ... FROM users WHERE users.email LIKE '%.com'

select(User).where(User.username.contains("admin"))
# SQL: SELECT ... FROM users WHERE users.username LIKE '%admin%'
between()
Column Operators
Еквівалент SQL-оператора BETWEEN ... AND ....
  • Коли використовувати: Для фільтрації діапазонів чисел, дат або цін (включаючи межі).
  • Коли НЕ використовувати: Для діапазонів дат, якщо ви не впевнені щодо часу (оскільки BETWEEN включає кінцеву дату до 00:00:00, що може призвести до пропуску записів за останній день). Для дат часто надійніше використовувати явні порівняння >= та <.
select(User).where(User.id.between(10, 20))
# SQL: SELECT ... FROM users WHERE users.id BETWEEN 10 AND 20
desc() / asc()
Column Modifiers
Визначає напрямок сортування стовпця (спадання / зростання). Зазвичай передається всередину .order_by().
  • Коли використовувати: Для явного контролю порядку сортування у звітах, списках та пагінації.
  • Коли НЕ використовувати: Унікальні індекси вже відсортовані за зростанням за замовчуванням СУБД.
select(User).order_by(User.created_at.desc())
# SQL: SELECT ... FROM users ORDER BY users.created_at DESC
nulls_first() / nulls_last()
Column Modifiers
Визначає, де розташовувати порожні (NULL) значення під час сортування — на початку чи в кінці результату. Працює як доповнення до desc() / asc().
  • Коли використовувати: Коли сортуєте за опціональними полями (наприклад, дата завершення завдання due_date), щоб порожні значення не з'являлися на початку списку.
select(User).order_by(User.email.asc().nulls_last())
# SQL: SELECT ... FROM users ORDER BY users.email ASC NULLS LAST
label()
Column Modifiers
Призначає аліас (псевдонім) стовпцю або виразу (SQL AS).
  • Коли використовувати: При агрегаціях (func.count(), func.sum()) або математичних операціях, щоб результат мав зручне ім'я у NamedTuple-рядках (Row.alias_name).
select(func.count(User.id).label("total")).scalar_one()
# SQL: SELECT count(users.id) AS total FROM users
cast()
Column Modifiers
Приводить значення стовпця до іншого типу даних на рівні бази даних (SQL CAST(col AS type)). Імпортується з головного модуля from sqlalchemy import cast.
  • Коли використовувати: Коли потрібно порівняти стовпець типу UUID з рядком, або перетворити ціле число на рядок для конкатенації чи форматування.
from sqlalchemy import cast, String
select(cast(User.id, String))
# SQL: SELECT CAST(users.id AS VARCHAR) FROM users

Як працює «магія» умов (Перевантаження операторів)

Для розробників, які вперше бачать код User.id > 10 у where(), це виглядає як магія. За логікою Python, вираз User.id > 10 мав би негайно обчислитися в логічне True або False (що передало б у where(True) або where(False) і зламало б запит).

Секрет криється в перевантаженні операторів (Operator Overloading) у Python:

  1. Стовпці — це не просто значення: Атрибут User.id у вашому коді — це не ціле число int. Це об'єкт класу InstrumentedAttribute (дескриптор, наданий SQLAlchemy).
  2. Магічні методи: SQLAlchemy перевизначає стандартні магічні методи порівнянь Python на цих об'єктах:
    • __eq__ (для ==)
    • __ne__ (для !=)
    • __gt__ (для >)
    • __lt__ (для <)
    • тощо.

Коли ви пишете User.id > 10, Python під капотом викликає:

User.id.__gt__(10)

Замість повернення True або False, реалізація __gt__ в SQLAlchemy повертає об'єкт BinaryExpression (бінарний вираз).

Цей об'єкт є вузлом абстрактного синтаксичного дерева (AST) і містить інформацію:

  • Ліва частина: посилання на стовпець User.id
  • Оператор: > (більше)
  • Права частина: константа 10

Коли ви передаєте цей BinaryExpression у метод .where(), SQLAlchemy додає його до списку фільтрів. Під час виконання запиту компилятор діалекту (наприклад, PostgreSQL) обходить це дерево і трансформує вузол у текстовий SQL: users.id > 10 (а точніше, у параметризований вигляд users.id > :id_1).

Зіставлення операторів Python та SQL у SQLAlchemy:

Вираз у SQLAlchemyМагічний метод PythonРезультат у SQL
User.username == "admin"__eq__users.username = 'admin'
User.id != 5__ne__users.id <> 5
User.id > 10__gt__users.id > 10
User.id >= 10__ge__users.id >= 10
User.id < 10__lt__users.id < 10
User.id <= 10__le__users.id <= 10
User.email == None__eq__ з Noneusers.email IS NULL
User.email != None__ne__ з Noneusers.email IS NOT NULL

Використання SQL-функцій: func

У багатьох SQL-запитах виникає потреба використовувати вбудовані функції СУБД: порахувати кількість рядків (COUNT), звести текст до нижнього регістру (LOWER), отримати поточний час (NOW()) або виконати складнішу агрегацію.

У SQLAlchemy для цього призначений спеціальний об'єкт func (імпортується як from sqlalchemy import func).

Як працює «динамічна магія» func?

На відміну від більшості бібліотек, у func немає жорстко прописаного списку методів у коді SQLAlchemy. Замість цього він використовує магічний метод Python __getattr__.

Коли ви звертаєтеся до будь-якого атрибута на func (наприклад, func.any_custom_function()), SQLAlchemy «на льоту» створює об'єкт функції з цим ім'ям. Це означає, що ви можете викликати будь-яку функцію, яку підтримує ваша конкретна база даних, навіть якщо розробники SQLAlchemy про неї не знали!

# Будь-яке ім'я динамічно трансформується в SQL-функцію:
stmt = select(func.my_custom_db_function(User.id))
# SQL: SELECT my_custom_db_function(users.id) FROM users
Найпопулярніші стандартні функції СУБД
func.count()
SQL Function
Агрегатна функція COUNT(). Рахує кількість записів або не-NULL значень у стовпці.
# Рахуємо загальну кількість користувачів
stmt = select(func.count(User.id))
# SQL: SELECT count(users.id) AS count_1 FROM users
func.now() / func.current_timestamp()
SQL Function
Повертає поточну дату та час транзакції. Зазвичай використовується як дефолтне значення для дат створення запису (server_default=func.now()).
# Отримання поточного часу з серверу БД
stmt = select(func.now())
# SQL: SELECT now() AS now_1
func.coalesce()
SQL Function
Еквівалент SQL-функції COALESCE(...). Приймає кілька аргументів і повертає перший з них, який не є NULL.
# Якщо bio порожнє (NULL), повернути рядок за замовчуванням
stmt = select(func.coalesce(User.bio, "Біографія відсутня"))
# SQL: SELECT coalesce(users.bio, 'Біографія відсутня') FROM users
func.lower() / func.upper()
SQL Function
Переведення тексту в нижній або верхній регістр. Часто використовується для валідації або пошуку без урахування регістру (якщо СУБД не підтримує ILIKE).
# Порівняння email у нижньому регістрі
stmt = select(User).where(func.lower(User.email) == "user@example.com")
# SQL: SELECT ... WHERE lower(users.email) = 'user@example.com'
func.concat()
SQL Function
Конкатенація (об'єднання) рядків.
# Об'єднання імені та прізвища через пробіл
stmt = select(func.concat(User.first_name, " ", User.last_name))
# SQL: SELECT concat(users.first_name, ' ', users.last_name) FROM users
Специфічні функції СУБД (на прикладі PostgreSQL)

Оскільки func динамічний, ви можете викликати специфічні для PostgreSQL функції:

# date_trunc — округлення дати до дня (прибирає години/хвилини)
stmt = select(func.date_trunc("day", User.created_at))
# SQL: SELECT date_trunc('day', users.created_at) FROM users

# age — розрахунок віку на основі дати народження
stmt = select(func.age(User.birth_date))
# SQL: SELECT age(users.birth_date) FROM users
Явне зазначення типу повернення (type_)

Зазвичай SQLAlchemy намагається автоматично визначити тип даних, який поверне функція (наприклад, func.count() поверне ціле число Integer).

Проте для специфічних або кастомних СУБД-функцій SQLAlchemy не знає заздалегідь тип повернення. Через це вона не зможе виконати автоматичну конвертацію результату в об'єкти Python (наприклад, перетворити результат date_trunc у Python-об'єкт datetime.date).

Для вирішення цього використовується параметр type_:

from sqlalchemy import Date

# Явно вказуємо, що функція date_trunc повертає дату
stmt = select(func.date_trunc("day", User.created_at, type_=Date))

# Тепер результат буде автоматично розпарсено у datetime.date в Python:
day = session.execute(stmt).scalar_one()  # поверне datetime.date(2026, 7, 4)

Виконання та отримання результатів

Для виконання запиту об'єкт Select передається в session.execute():

result = session.execute(stmt)

Метод execute() повертає об'єкт Result. Це ітератор, який повертає рядки типу Row (схожі на NamedTuple в Python). Оскільки при виборі повних ORM-моделей кожен рядок повертається як кортеж з одним елементом (наприклад, (User,)), SQLAlchemy надає допоміжні методи для розпакування цих даних:

scalars()
ScalarResult
Розпаковує одноелементні кортежі рядків результату, повертаючи чисті об'єкти замість об'єктів Row (кортежів).Чому це потрібно? База даних завжди повертає результати у вигляді таблиці (матриці). Навіть коли ви вибираєте всю модель select(User), SQLAlchemy отримує з бази набір рядків і загортає кожний результат у кортеж з одним елементом:
  • Без .scalars() результат виконання session.execute(select(User)).all() виглядає так: [(User_1,), (User_2,), (User_3,)] — список кортежів довжиною 1. Щоб отримати користувача, довелося б писати row[0].
  • З .scalars() SQLAlchemy автоматично витягує перший елемент із кожного кортежу. Результат виконання session.execute(select(User)).scalars().all() виглядає так: [User_1, User_2, User_3] — чистий список об'єктів.
# Без scalars():
rows = session.execute(select(User)).all()
for row in rows:
    user = row[0]  # доводиться вручну брати перший елемент кортежу
    print(user.username)

# З scalars():
users = session.execute(select(User)).scalars().all()
for user in users:
    print(user.username)  # працюємо безпосередньо з об'єктом User
all()
list[Row] / list[T]
Вичитує всі рядки результату у список і закриває курсор. Якщо викликано після .scalars(), повертає список чистих об'єктів (наприклад, list[User]).
# 1. Без scalars() — список об'єктів Row ( NamedTuple )
rows = session.execute(select(User.id, User.username)).all()
# rows: list[Row(id=1, username="arakviel"), ...]

# 2. Зі scalars() — список чистих об'єктів User
users = session.execute(select(User)).scalars().all()
# users: list[User]
first()
Row / T / None
Повертає перший рядок результату або None, якщо результат порожній. Закриває з'єднання.
# Отримання першого знайденого користувача або None
user = session.execute(
    select(User).order_by(User.created_at.asc())
).scalars().first()
one()
Row / T
Повертає рівно один рядок. Якщо база даних повернула 0 рядків або більше 1, кидає виняток (NoResultFound або MultipleResultsFound).
# Очікуємо отримати рівно одного користувача за унікальним username
stmt = select(User).where(User.username == "admin")
admin = session.execute(stmt).scalars().one()
# Увага: якщо admin немає — кидає NoResultFound
one_or_none()
Row / T / None
Повертає один рядок або None, якщо записів немає. Якщо знайдено більше 1 запису, кидає виняток MultipleResultsFound.
# Безпечний пошук за унікальним email (поверне об'єкт або None)
stmt = select(User).where(User.email == "notfound@example.com")
user = session.execute(stmt).scalars().one_or_none()
scalar_one()
Any
Швидкий аналог .scalars().one(). Повертає значення першого стовпця першого рядка. Корисний для агрегацій типу COUNT.
# Отримання кількості користувачів (повертає одне число int)
count = session.execute(
    select(func.count(User.id))
).scalar_one()
scalar_one_or_none()
Any | None
Швидкий аналог .scalars().one_or_none(). Повертає одне скалярне значення або None.
# Отримання email користувача за ID (поверне рядок або None, якщо ID не існує)
stmt = select(User.email).where(User.id == 999)
email = session.execute(stmt).scalar_one_or_none()

Практичні приклади з виводом результатів

Приклад 1: Отримання списку активних користувачів (ORM-стиль)
stmt = select(User).where(User.is_active == True).order_by(User.username.asc())

# Компіляція SQL:
# SELECT users.id, users.username, users.email, users.is_active, users.created_at
# FROM users
# WHERE users.is_active = true ORDER BY users.username ASC

users = session.execute(stmt).scalars().all()

for user in users:
    print(f"User: {user.username} (ID: {user.id})")

Вивід у консоль:

User: alice (ID: 2)
User: arakviel (ID: 1)
User: bob (ID: 3)
Приклад 2: Вибірка конкретних стовпців (Tuple-стиль)

Коли нам потрібні лише окремі поля, завантажувати повні ORM-об'єкти неефективно (вони займають пам'ять та відстежуються сесією). Краще вибрати лише потрібні стовпці:

stmt = select(User.id, User.username).where(User.id > 1)

# Компіляція SQL:
# SELECT users.id, users.username FROM users WHERE users.id > 1

rows = session.execute(stmt).all()
# rows містить список об'єктів Row, які поводяться як кортежі та іменовані об'єкти

for row in rows:
    # Доступ можливий двома шляхами:
    print(f"ID: {row[0]}, Username: {row[1]}")          # за індексом
    print(f"ID: {row.id}, Username: {row.username}")    # за атрибутом

Вивід у консоль:

ID: 2, Username: alice
ID: 3, Username: bob
Приклад 3: Складні логічні умови (AND, OR, NOT)

Умови AND поєднуються автоматично при передачі кількох аргументів у where(). Для OR та NOT використовуються спеціальні функції or_ та not_:

from sqlalchemy import or_, not_

stmt = (
    select(User)
    .where(
        User.is_active == True,
        or_(
            User.email.like("%@gmail.com"),
            not_(User.username == "admin")
        )
    )
)

# Компіляція SQL:
# SELECT users.id, users.username, users.email ...
# FROM users
# WHERE users.is_active = true AND (users.email LIKE '%@gmail.com' OR users.username != 'admin')
Приклад 4: INNER JOIN та агрегація (Групування)

Знайдемо кількість постів для кожного користувача, що має хоча б один пост, та відсортуємо за спаданням:

from sqlalchemy import func

stmt = (
    select(User.username, func.count(Post.id).label("total_posts"))
    .join(Post, User.id == Post.author_id)
    .group_by(User.username)
    .having(func.count(Post.id) > 0)
    .order_by(func.count(Post.id).desc())
)

# Компіляція SQL:
# SELECT users.username, count(posts.id) AS total_posts
# FROM users JOIN posts ON users.id = posts.author_id
# GROUP BY users.username HAVING count(posts.id) > 0
# ORDER BY count(posts.id) DESC

results = session.execute(stmt).all()
for row in results:
    print(f"{row.username}: {row.total_posts} posts")

Вивід у консоль:

arakviel: 12 posts
alice: 3 posts

Створення даних: insert()

Функція insert() генерує SQL-запити вставки записів (INSERT).

Сигнатура

def insert(table: _DMLTableArgument) -> Insert

Аргументом є ORM-клас моделі або об'єкт Table.

Методи об'єкта Insert

  • **.values(\*args, **kwargs)** — визначає дані для вставки. Може приймати словник, список словників (для масової вставки) або keyword-аргументи.
  • .returning(*cols) — додає секцію RETURNING для отримання згенерованих базою даних значень (наприклад, ID або значень за замовчуванням) в результаті виконання запиту.

ORM vs Core Insertion

У SQLAlchemy є два шляхи створення записів:

ORM-стиль (Instance-based)

Ви створюєте екземпляр класу моделі, додаєте його до сесії через session.add() та робите коміт.

  • Плюси: Об'єкт автоматично стає частиною Unit of Work, відстежуються його зв'язки, після вставки об'єкт готовий до подальшого використання.
  • Мінуси: Повільний для великої кількості записів (створює накладні витрати на рівні Python-об'єктів).

Core-стиль (Bulk DML)

Ви викликаєте insert(Model).values(...) та виконуєте через session.execute().

  • Плюси: Надзвичайно швидкий. Виконує пряму вставку сирих словників даних без створення об'єктів моделей в пам'яті.
  • Мінуси: Не відстежує стан об'єктів у сесії, не синхронізує Identity Map автоматично.

Практичні приклади з виводом результатів

Приклад 1: Одиночна вставка з поверненням згенерованого ID
stmt = (
    insert(User)
    .values(username="new_user", email="new@example.com")
    .returning(User.id, User.created_at)
)

# Компіляція SQL (PostgreSQL):
# INSERT INTO users (username, email) VALUES ('new_user', 'new@example.com')
# RETURNING users.id, users.created_at

result = session.execute(stmt)
row = result.fetchone()  # Оскільки ми чекаємо один рядок з RETURNING

print(f"Inserted ID: {row.id}")
print(f"Created At: {row.created_at}")
session.commit()

Вивід у консоль:

Inserted ID: 42
Created At: 2026-07-04 18:00:00+00:00
Приклад 2: Масова вставка (Bulk Insert)

Для оптимізації продуктивності масової вставки передається список словників:

data = [
    {"username": "user_a", "email": "a@example.com"},
    {"username": "user_b", "email": "b@example.com"},
    {"username": "user_c", "email": "c@example.com"},
]

# Створюємо стейтмент без виклику .values() — передамо дані під час виконання
stmt = insert(User)

# Компіляція SQL:
# INSERT INTO users (username, email) VALUES (:username, :email)

result = session.execute(stmt, data)
# SQLAlchemy автоматично групує це в один оптимізований батч-запит

print(f"Rows inserted: {result.rowcount}")
session.commit()

Вивід у консоль:

Rows inserted: 3

Оновлення даних: update()

Функція update() будує SQL-запити оновлення (UPDATE).

Сигнатура

def update(table: _DMLTableArgument) -> Update

Методи об'єкта Update

  • .where(*clauses) — обмежує вибірку рядків, які будуть оновлені (критично важливо, інакше оновляться всі рядки в таблиці!).
  • .values(**kwargs) або **.values(dict)** — встановлює нові значення для стовпців.
  • .returning(*cols) — повертає оновлені значення.

ORM vs Core Update

  • ORM-стиль: Отримуємо об'єкт через session.get(User, id), міняємо його атрибут (user.email = "new@mail.com"), і робимо session.commit(). SQLAlchemy автоматично згенерує UPDATE під час комміту завдяки Change Tracking (Unit of Work).
  • Core-стиль: Виконуємо update(User).where(...).values(...). Застосовується для масових оновлень або коли потрібно оновити запис без попереднього завантаження його в пам'ять.

Практичні приклади з виводом результатів

Приклад 1: Масове оновлення з фільтрацією

Заблокуємо всіх користувачів, які не заходили на сайт дуже давно (наприклад, у яких пошта на старому домені):

stmt = (
    update(User)
    .where(User.email.like("%@old-domain.com"))
    .values(is_active=False)
)

# Компіляція SQL:
# UPDATE users SET is_active = false WHERE users.email LIKE '%@old-domain.com'

result = session.execute(stmt)
print(f"Matched and updated rows: {result.rowcount}")
session.commit()

Вивід у консоль:

Matched and updated rows: 14
Приклад 2: Динамічне оновлення на основі існуючих значень стовпця

Оновимо лічильник переглядів постів, збільшивши його на 1 directly на рівні бази даних (без завантаження у Python):

stmt = (
    update(Post)
    .where(Post.id == 5)
    .values(views_count=Post.views_count + 1)
    .returning(Post.views_count)
)

# Компіляція SQL:
# UPDATE posts SET views_count = posts.views_count + 1 WHERE posts.id = 5 RETURNING posts.views_count

result = session.execute(stmt)
new_views = result.scalar_one()

print(f"Updated post views count. New value: {new_views}")
session.commit()

Вивід у консоль:

Updated post views count. New value: 101

Видалення даних: delete()

Функція delete() створює SQL-запити видалення записів (DELETE).

Сигнатура

def delete(table: _DMLTableArgument) -> Delete

Методи об'єкта Delete

  • .where(*clauses) — визначає умови фільтрації для видалення (якщо не вказати .where(), база даних очистить усю таблицю!).
  • .returning(*cols) — повертає дані видалених рядків перед їх остаточним знищенням.

Практичні приклади з виводом результатів

Приклад 1: Видалення неактивних користувачів
stmt = (
    delete(User)
    .where(User.is_active == False)
)

# Компіляція SQL:
# DELETE FROM users WHERE users.is_active = false

result = session.execute(stmt)
print(f"Deleted users count: {result.rowcount}")
session.commit()

Вивід у консоль:

Deleted users count: 5
Приклад 2: Видалення з поверненням видалених даних (RETURNING)

Корисно, якщо перед видаленням нам потрібно залогувати або зберегти видалені дані:

stmt = (
    delete(Post)
    .where(Post.created_at < datetime(2020, 1, 1))
    .returning(Post.id, Post.title)
)

# Компіляція SQL:
# DELETE FROM posts WHERE posts.created_at < '2020-01-01 00:00:00'
# RETURNING posts.id, posts.title

deleted_posts = session.execute(stmt).all()

for post in deleted_posts:
    print(f"Logged deleted post: [ID: {post.id}] {post.title}")

session.commit()

Порівняння: LINQ (EF Core) ↔ SQLAlchemy Query API

// Фільтрація + сортування + пагінація
var users = await context.Users
    .Where(u => u.IsActive && u.Username.StartsWith("ara"))
    .OrderByDescending(u => u.CreatedAt)
    .Skip((page - 1) * perPage)
    .Take(perPage)
    .Include(u => u.Posts)
    .ToListAsync();

// Агрегація
var postCount = await context.Posts
    .Where(p => p.AuthorId == userId)
    .CountAsync();

Просунуті операції з даними в Session

Для ефективного написання репозиторіїв та оптимізації роботи з базою даних важливо розрізняти тонкощі поведінки внутрішніх методів Session:

session.get() проти select()

  • session.get(Model, pk_value) шукає об'єкт за його первинним ключем. Його головна перевага — використання Identity Map. Перед тим як генерувати SQL-запит і відправляти його в базу даних, SQLAlchemy перевіряє, чи цей об'єкт уже завантажений у поточну сесію. Якщо так — він одразу повертає об'єкт із пам'яті Python, повністю уникаючи мережевого запиту до БД.
  • Конструкція select(Model).where(Model.id == pk_value) завжди генерує та виконує SELECT-запит до бази даних, ігноруючи те, що об'єкт уже може бути завантажений.
  • Рекомендація: Для пошуку за ID завжди віддавайте перевагу session.get() — це суттєво заощаджує ресурси бази даних.
Схематичне порівняння методів Session:
МетодОб'єкт до викликуСтан після викликуЧи робить запит до БД?
session.add(obj)Transient (новий)Persistent (відстежується)Ні (запит буде при flush/commit)
session.get(Model, id)Не існує у сесіїPersistent (завантажений)Лише якщо об'єкта немає в Identity Map
session.merge(obj)Detached (ззовні)Detached (оригінал), повернутий копіюється у PersistentТак, робить SELECT для перевірки стану в БД
session.delete(obj)PersistentPending Delete (видалення при commit)Ні (запит буде при flush/commit)
session.flush()Persistent/DirtyPersistent (зміни відправлені до БД)Так, виконує накопичені операції запису
session.commit()Persistent/DirtyPersistent (або Expired / Detached при закритті)Так, фіксує поточну транзакцію
session.refresh(obj)PersistentPersistent (дані оновлено з БД)Так, виконує SELECT для оновлення полів

Під капотом: Unit of Work, Identity Map та Change Tracking

Щоб дійсно розуміти SQLAlchemy і передбачати її поведінку, недостатньо просто знати синтаксис. Потрібно зрозуміти три ключові патерни, що лежать в основі Session: Unit of Work, Identity Map та Change Tracking.

Патерн Unit of Work (Одиниця роботи)

Unit of Work — це архітектурний патерн (описаний Мартіном Фаулером у книзі «Patterns of Enterprise Application Architecture»), який відстежує всі зміни, що відбуваються з об'єктами під час однієї «одиниці роботи» (зазвичай — одного HTTP-запиту), і потім атомарно відправляє їх до бази даних у вигляді однієї транзакції.

Аналогія: уявіть, що Session — це кошик для покупок. Ви кладете товари (об'єкти) у кошик, берете звідти, змінюєте вміст — і лише коли ви підходите до каси (session.commit()), всі операції виконуються разом як єдина атомарна дія.

Loading diagram...
sequenceDiagram
    participant Code as "Python Code"
    participant Session as "Session (UoW)"
    participant DB as "PostgreSQL"

    Code->>Session: session.add(new_user)
    Note over Session: Додає об'єкт у стан "pending"
    Code->>Session: user.is_active = False
    Note over Session: Відмічає об'єкт як "dirty"
    Code->>Session: session.delete(old_post)
    Note over Session: Позначає об'єкт як "deleted"
    Code->>Session: await session.commit()
    Session->>DB: BEGIN TRANSACTION
    Session->>DB: INSERT INTO users VALUES (...)
    Session->>DB: UPDATE users SET is_active=false WHERE id=...
    Session->>DB: DELETE FROM posts WHERE id=...
    Session->>DB: COMMIT
    Note over Session: Об'єкти стають "persistent"

Session відстежує об'єкти через чотири можливі стани:

  • Transient (перехідний) — об'єкт створений (User()), але ще не доданий до сесії.
  • Pending (очікуваний) — об'єкт доданий через session.add(), але ще не збережений до БД (INSERT ще не виконано).
  • Persistent (постійний) — об'єкт має відповідний запис у БД та відстежується сесією.
  • Detached (від'єднаний) — об'єкт був persistent, але сесія закрита або він був видалений зі сесії.

Identity Map: Гарантія унікальності

Identity Map (карта ідентичностей) — це внутрішній словник у Session, що відображає первинний ключ бази даних на Python-об'єкт.

Ключова гарантія Identity Map: у межах однієї сесії session.get(User, 1) завжди повертає той самий Python-об'єкт, незалежно від кількості викликів.

identity_map_demo.py
with Session(engine) as session:
    # Перший виклик: SELECT * FROM users WHERE id = 1
    user_a = session.get(User, 1)

    # Другий виклик: SQL-запиту НЕ виконується!
    # SQLAlchemy знаходить об'єкт у Identity Map і повертає той самий
    user_b = session.get(User, 1)

    print(user_a is user_b)  # True — це буквально один і той самий об'єкт!

    # Якщо ми змінимо user_a — user_b теж «зміниться»
    user_a.is_active = False
    print(user_b.is_active)  # False

Це принципово відрізняється від простого кешування запитів. Identity Map гарантує, що ваша програма завжди працює з консистентним станом об'єктів протягом однієї транзакції.

Порівняння з EF Core: DbContext.ChangeTracker є прямим аналогом Identity Map + Change Tracking у SQLAlchemy. Якщо ви двічі запитуєте один і той самий запис через context.Users.Find(1) в EF Core — ви також отримуєте той самий об'єкт з кешу.

Change Tracking: Як SQLAlchemy «знає», що змінилося

Change Tracking (відстеження змін) — це механізм, завдяки якому Session автоматично знає, які саме атрибути об'єктів були змінені і генерує мінімально необхідний UPDATE-запит.

Цей механізм реалізований через Python-дескриптори (instrumented attributes) — при реєстрації моделі SQLAlchemy замінює звичайні атрибути класу (username, email тощо) на спеціальні об'єкти-дескриптори, що перехоплюють операції __get__ і __set__.

change_tracking_demo.py
from sqlalchemy import inspect

with Session(engine) as session:
    user = session.get(User, 1)  # Завантажуємо об'єкт

    # Змінюємо кілька атрибутів:
    user.username = "new_username"
    user.bio = "Updated bio"

    # Можна перевірити стан об'єкта ПЕРЕД commit():
    inspector = inspect(user)
    for attr in inspector.attrs:
        history = attr.history
        if history.has_changes():
            print(f"  {attr.key}: {history.deleted}{history.added}")
    # Виведе:
    # username: ['old_username'] → ['new_username']
    # bio: [None] → ['Updated bio']

    session.commit()
    # SQLAlchemy генерує МІНІМАЛЬНИЙ запит:
    # UPDATE users SET username='new_username', bio='Updated bio' WHERE id=1
    # (лише змінені поля, не всі!)

Саме завдяки Change Tracking SQLAlchemy генерує ефективні UPDATE-запити — оновлюються лише ті стовпці, що дійсно змінилися, а не всі поля об'єкта.


Інтеграція з FastAPI: get_async_session через Dependency Injection

Тепер об'єднаємо все, що ми вивчили, у повноцінний production-ready файл конфігурації бази даних для FastAPI-додатку. Ключовим питанням є: як правильно управляти життєвим циклом AsyncSession — коли її створювати і коли закривати?

Відповідь стандартна для FastAPI: через Dependency Injection (детально розібраний у статті 20).

core/database.py
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)
from sqlalchemy.orm import DeclarativeBase

# 1. Базовий клас для всіх моделей
class Base(DeclarativeBase):
    pass

# 2. Async Engine — singleton, живе весь час роботи додатку
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/taskforge_db"

async_engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
    echo=False,
)

# 3. Фабрика AsyncSession
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)

# 4. Dependency для FastAPI — генератор сесії
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI Dependency: надає AsyncSession для одного HTTP-запиту.
    Сесія автоматично закривається після завершення запиту.
    У разі помилки — автоматичний rollback.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Тепер використаємо цей dependency у FastAPI-роутері:

routers/users.py
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_async_session
from app.models.user import User
# Схеми Pydantic для користувачів
from app.schemas.user import UserCreate, UserRead

router = APIRouter(prefix="/users", tags=["Users"])

# Зручний alias для типізованого параметра залежності
SessionDep = Annotated[AsyncSession, Depends(get_async_session)]

@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, session: SessionDep):
    """Отримати користувача за ID."""
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id={user_id} not found"
        )
    return user

@router.get("/", response_model=list[UserRead])
async def list_users(
    session: SessionDep,
    skip: int = 0,
    limit: int = 20,
):
    """Отримати список користувачів з пагінацією."""
    users = (await session.execute(
        select(User)
        .offset(skip)
        .limit(limit)
        .order_by(User.created_at.desc())
    )).scalars().all()
    return users

@router.post("/", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(user_in: UserCreate, session: SessionDep):
    """Створити нового користувача."""
    # Перевірка унікальності username
    existing = (await session.execute(
        select(User).where(User.username == user_in.username)
    )).scalar_one_or_none()

    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Username '{user_in.username}' already taken"
        )

    user = User(**user_in.model_dump())
    session.add(user)
    await session.flush()  # Виконуємо INSERT, щоб отримати згенерований id
    return user
Зверніть на await session.flush() перед поверненням user. Виклик flush() відправляє INSERT до PostgreSQL у межах поточної транзакції (але не комітить її), що дозволяє отримати згенерований автоінкрементний id. Сам commit() відбудеться автоматично у dependency-генераторі get_async_session() при успішному завершенні запиту.

Ініціалізація таблиць при старті додатку

Для розробки та тестування зручно автоматично створювати всі таблиці при старті FastAPI:

main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.database import async_engine, Base

# Імпортуємо всі моделі, щоб Base.metadata їх «знала»
from app.models import user, post, project  # noqa: F401

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle-хук FastAPI: виконується при старті та зупинці."""
    # Старт: створюємо всі таблиці (лише для розробки!)
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Зупинка: закриваємо пул підключень
    await async_engine.dispose()

app = FastAPI(title="TaskForge API", lifespan=lifespan)
Base.metadata.create_all() — зручний інструмент для локальної розробки та тестів, але він категорично не підходить для production. Він лише створює таблиці, що не існують, але не вміє змінювати існуючі (додавати стовпці, змінювати типи, видаляти поля). Для управління схемою бази даних у production використовується Alembic — інструмент міграцій, якому присвячена наступна стаття 23.

Практичний приклад від А до Я: Самодостатній скрипт

Для того щоб ви могли швидко запустити та протестувати всі концепції async SQLAlchemy 2.0 в одному місці без потреби встановлювати PostgreSQL чи конфігурувати FastAPI, нижче наведено повністю самодостатній, працездатний асинхронний скрипт.

Він використовує вбудовану в Python базу даних SQLite в оперативній пам'яті (:memory:), створює таблиці, виконує CRUD-операції, оптимізує завантаження зв'язків та виводить результати у консоль.

Для запуску вам знадобляться встановлені бібліотеки sqlalchemy та асинхронний драйвер aiosqlite:

pip install sqlalchemy aiosqlite
main.py
import asyncio
from datetime import datetime
from sqlalchemy import String, ForeignKey, select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, selectinload

# 1. Створюємо базовий клас
class Base(DeclarativeBase):
    pass

# 2. Визначаємо моделі (зв'язок Один до Багатьох: User -> Post)
class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)

    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User id={self.id} username={self.username!r}>"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(String)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")

    def __repr__(self) -> str:
        return f"<Post id={self.id} title={self.title!r}>"

# 3. Налаштовуємо Async Engine та Session local фабрику
# Використовуємо SQLite в пам'яті з асинхронним драйвером aiosqlite
DATABASE_URL = "sqlite+aiosqlite:///:memory:"

async_engine = create_async_engine(DATABASE_URL, echo=True)  # echo=True покаже згенерований SQL
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine, class_=AsyncSession, expire_on_commit=False
)

async def main():
    # 4. Створюємо таблиці у базі даних
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSessionLocal() as session:
        print("\n--- 1. Створення записів (INSERT) ---")
        # Створюємо користувача та його пости
        user = User(
            username="arakviel",
            email="arakviel@example.com",
            posts=[
                Post(title="Перший async пост", content="Вміст першого асинхронного посту"),
                Post(title="Другий async пост", content="Вміст другого асинхронного посту")
            ]
        )
        session.add(user)
        await session.commit()
        print(f"Збережено користувача: {user} (ID: {user.id})")

        print("\n--- 2. Читання записів з Eager Loading (SELECT з selectinload) ---")
        # selectinload(User.posts) запобігає N+1 проблемі для завантаження списку постів
        stmt = select(User).where(User.username == "arakviel").options(selectinload(User.posts))
        db_user = (await session.execute(stmt)).scalars().one()

        print(f"Знайдено користувача: {db_user.username}")
        for post in db_user.posts:
            print(f" - Пост: {post.title}")

        print("\n--- 3. Оновлення запису (UPDATE) ---")
        # Оновимо заголовок першого посту
        first_post = db_user.posts[0]
        first_post.title = "Оновлена назва async посту"
        await session.commit()
        print(f"Нова назва посту в БД: {first_post.title}")

        print("\n--- 4. Видалення запису (DELETE) ---")
        # Видалення користувача призведе до каскадного видалення всіх його постів
        await session.delete(db_user)
        await session.commit()

        # Перевіряємо, чи залишились пости у базі
        posts_stmt = select(Post)
        remaining_posts = (await session.execute(posts_stmt)).scalars().all()
        print(f"Пости в базі даних після видалення користувача: {remaining_posts}")

    # Закриваємо з'єднання з БД
    await async_engine.dispose()

if __name__ == "__main__":
    asyncio.run(main())
python main.py
$ python main.py
--- 1. Створення записів (INSERT) ---
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:INSERT INTO users (username, email) VALUES (?, ?)
INFO:sqlalchemy.engine.Engine:INSERT INTO posts (title, content, created_at, author_id) VALUES (?, ?, ?, ?)
INFO:sqlalchemy.engine.Engine:COMMIT
Збережено користувача: <User id=1 username='arakviel'> (ID: 1)
--- 2. Читання записів з Eager Loading ---
INFO:sqlalchemy.engine.Engine:SELECT users.id, users.username, users.email FROM users WHERE users.username = ?
INFO:sqlalchemy.engine.Engine:SELECT posts.author_id AS posts_author_id, posts.id AS posts_id, posts.title AS posts_title ... FROM posts WHERE posts.author_id IN (?)
Знайдено користувача: arakviel
- Пост: Перший async пост
- Пост: Другий async пост
--- 3. Оновлення запису (UPDATE) ---
INFO:sqlalchemy.engine.Engine:UPDATE posts SET title=? WHERE posts.id = ?
INFO:sqlalchemy.engine.Engine:COMMIT
Нова назва посту в БД: Оновлена назва async посту
--- 4. Видалення запису (DELETE) ---
INFO:sqlalchemy.engine.Engine:DELETE FROM posts WHERE posts.id = ?
INFO:sqlalchemy.engine.Engine:DELETE FROM users WHERE users.id = ?
INFO:sqlalchemy.engine.Engine:COMMIT
Пости в базі даних після видалення користувача: []

Практика: Будуємо шар моделей для TaskForge

На основі всього вивченого побудуємо повноцінний шар моделей для проєкту TaskForge. Усі моделі розміщуватимуться у директорії models/ і успадковуватимуться від спільного Base.

Структура проєкту

Крок 4: Фіксація змін у репозиторії

Збережіть усі внесені зміни та виконайте фіксацію у вашому сховищі Git:

terminal
git add .
git commit -m "feat: add PostgreSQL with SQLAlchemy 2.0 async models"

Практичні вправи

Виконайте завдання для перевірки та закріплення матеріалу статті.

Вправа 1: Отримання об'єкта та перевірка на None (Базовий рівень)

Завдання: Напишіть асинхронний роут GET /api/v1/projects/{project_id}, який:

  • Приймає project_id: int та session: SessionDep.
  • Використовує асинхронний метод session.get(Project, project_id).
  • Якщо проєкт не знайдено в базі даних (метод повернув None), викидає HTTPException(status_code=404, detail="Project not found").
  • Якщо проєкт знайдено, повертає його.

Вправа 2: Каскадне створення сутностей в одній транзакції (Середній рівень)

Завдання: Напишіть асинхронний роут POST /api/v1/projects, який створює новий проєкт та автоматично робить його творця учасником проєкту:

  • Приймає project_in: ProjectCreate та session: SessionDep.
  • Припускається, що owner_id дорівнює 1 (для спрощення, поки немає аутентифікації).
  • Створює об'єкт Project та додає його до сесії.
  • Створює об'єкт ProjectMember із роллю MemberRole.OWNER для цього проєкту та користувача з id=1.
  • Зберігає обидві сутності в базу даних за допомогою await session.commit().

Вправа 3: Агрегація та оптимізація N+1 запитів (Професійний рівень)

Завдання: Напишіть асинхронний запит для отримання списку користувачів разом із кількістю їхніх завдань:

  • Напишіть запит select(User, func.count(Task.id)) із використанням outerjoin(User.tasks) та group_by(User.id).
  • Виконайте запит у сесії та отримайте результати.
  • Переконайтеся за допомогою логів (echo=True), що виконується лише один SQL-запит до бази даних.

Підсумок

У цій статті ми пройшли від перших принципів до побудови повноцінного шару моделей для FastAPI-додатку. Виокремимо ключові ідеї:

Дворівнева архітектура

SQLAlchemy — це екосистема з двох рівнів: Core (SQL-будівник) і ORM (об'єктне відображення). У версії 2.0 вони об'єднані в єдиний select()-based API.

Новий Python-first синтаксис

DeclarativeBase, Mapped[T] і mapped_column() дають повну type-safety. Mapped[int] — NOT NULL, Mapped[int | None] — NULLABLE. IDE тепер розуміє типи атрибутів моделей.

Engine та Session

Engine управляє пулом підключень (singleton). Session / AsyncSession — короткоживучий «workspace» для роботи з об'єктами. expire_on_commit=False — обов'язково для async.

N+1 проблема та Eager Loading

Lazy Loading у async не підтримується. Завжди явно вказуйте selectinload() для колекцій і joinedload() для одиничних зв'язків. Увімкніть echo=True для аудиту запитів.

Unit of Work під капотом

Session відстежує об'єкти (Transient → Pending → Persistent → Detached), гарантує унікальність через Identity Map і генерує мінімальні UPDATE-запити через Change Tracking.

Інтеграція з FastAPI

get_async_session() — dependency-генератор: один HTTP-запит = одна AsyncSession. Автоматичний commit() при успіху та rollback() при помилці. create_all() — лише для dev/тестів.

Наступна стаття 23: Alembic — міграції бази даних розкриє тему управління схемою БД у production: автогенерацію міграцій, їх застосування та відкат. Alembic є обов'язковим інструментом для будь-якого серйозного проєкту, що використовує SQLAlchemy.
Copyright © 2026