SQLAlchemy 2.0 — ORM, Core та Async Engine
Жоден production-рівень API не живе у вакуумі. Будь-який реальний вебсервіс неминуче стикається з необхідністю зберігати, читати, оновлювати та видаляти дані у постійному сховищі — реляційній базі даних. Саме тут на сцену виходить SQLAlchemy — найповніший, найзріліший і найпопулярніший інструментарій для роботи з базами даних у Python, який часто називають «швейцарським ножем» між ORM та SQL-будівником.
Проте SQLAlchemy — це не просто ще один ORM. Це ціла екосистема, що складається з двох чітко розмежованих рівнів: Core (низькорівневий будівник SQL-запитів) і ORM (об'єктно-реляційне відображення). Знання обох рівнів і розуміння того, де закінчується один і починається інший, є ознакою зрілого Python-розробника.
У версії 2.0, яка вийшла у лютому 2023 року після тривалого перехідного періоду, SQLAlchemy отримала принципово новий, сучасний Python-синтаксис із підтримкою повної статичної типізації. Якщо ви бачили старий стиль SQLAlchemy 1.x — із класичними декларативними моделями на базі Column() та рядковими рефернесами — забудьте про нього. SQLAlchemy 2.0 виглядає і відчувається зовсім по-іншому. Власне, саме для цього і написана ця стаття.
AsyncSession. Знання type hints і Pydantic (стаття 15) допоможе розібратись у новому синтаксисі Mapped[T] та mapped_column().«Початок з далека»: Навіщо взагалі потрібен ORM?
Уявіть, що ви пишете FastAPI-ендпоінт, який має отримати список проектів конкретного користувача з бази даних PostgreSQL. Без жодного ORM або допоміжного інструменту вам доведеться:
- Відкрити низькорівневе підключення до бази даних через Python-драйвер (
psycopg2абоasyncpg). - Сформувати SQL-рядок вручну — потенційно вразливий до SQL-ін'єкцій.
- Виконати запит і отримати сирий результат у вигляді списку кортежів або словників.
- Вручну перетворити ці дані у Python-об'єкти (або Pydantic-моделі).
- При оновленні — стежити за тим, які поля змінилися, і самостійно генерувати
UPDATE-запит. - Явно керувати транзакціями:
BEGIN,COMMIT,ROLLBACK.
Це величезний обсяг шаблонного коду (boilerplate), схильного до помилок і складного у тестуванні. ORM (Object-Relational Mapper) вирішує цю проблему, надаючи абстракцію між реляційним світом таблиць і рядків та об'єктно-орієнтованим світом Python-класів і екземплярів.
Без ORM (Raw SQL)
- Ручне формування SQL-рядків
- Ризик SQL-ін'єкцій без параметризації
- Сирі дані у вигляді кортежів
(42, 'arakviel', 'active') - Ручне перетворення у Python-об'єкти
- Самостійне управління транзакціями
- Код прив'язаний до конкретного діалекту SQL
З SQLAlchemy ORM
- Моделі як звичайні Python-класи (
class User(Base): ...) - Безпечні параметризовані запити «з коробки»
- Результати як типізовані об'єкти
user.username - Автоматичне відстеження змін (Change Tracking)
- Управління транзакціями через контекстний менеджер
- Підтримка різних БД (PostgreSQL, MySQL, SQLite) без зміни коду
Архітектура SQLAlchemy: Два рівні абстракції
Однією з найважливіших речей, яку потрібно зрозуміти про SQLAlchemy ще до написання першого рядка коду, є її дворівнева архітектура. Ця концепція корінним чином відрізняє SQLAlchemy від більшості інших ORM (як, наприклад, Django ORM або Peewee), які надають лише один рівень — об'єктний.
Розглянемо кожен рівень детально.
SQLAlchemy Core — «Конструктор SQL»
Core — це нижній рівень SQLAlchemy. Він надає Python-API для побудови SQL-виразів як об'єктів мови Python, а не як рядків. Цей рівень є «мовонезалежним» у розумінні бази даних: ви будуєте вираз select(users_table).where(users_table.c.id == 42), а SQLAlchemy сама генерує правильний SQL-діалект для PostgreSQL, MySQL, SQLite чи Oracle.
Ключові компоненти Core:
Engine— «серце» SQLAlchemy, яке управляє пулом підключень до бази даних.MetaDataтаTable— опис структури таблиць як Python-об'єктів.- DML-функції —
select(),insert(),update(),delete()для побудови запитів. Connection— об'єкт одного активного підключення до БД.
SQLAlchemy ORM — «Об'єктне відображення»
ORM — це верхній рівень, побудований поверх Core. Він надає класичне об'єктно-реляційне відображення: ви працюєте з Python-класами (моделями), а SQLAlchemy автоматично перетворює операції над ними у відповідні SQL-запити.
Ключові компоненти ORM:
DeclarativeBaseтаMapped— оголошення моделей (таблиць) як Python-класів.SessionтаAsyncSession— «workspace» для роботи з об'єктами, реалізація патерну Unit of Work.relationship()— декларативний опис зв'язків між таблицями (one-to-many, many-to-many тощо).- Identity Map — внутрішній реєстр, що гарантує унікальність об'єктів у межах однієї сесії.
select(), insert() тощо), а не через застарілий session.query() API. Тобто ORM тепер «говорить» мовою Core, що робить код більш уніфікованим і передбачуваним.Порівняння: EF Core ↔ SQLAlchemy
Перед тим як занурюватися в код, варто провести паралелі з інструментарієм ASP.NET, щоб сформувати правильні ментальні моделі. Якщо ви вже працювали з Entity Framework Core, більшість концепцій SQLAlchemy одразу стануть інтуїтивно зрозумілими — назви відрізняються, але ідеї залишаються тими самими.
| Концепція | Entity Framework Core (ASP.NET) | SQLAlchemy 2.0 (Python) |
|---|---|---|
| Реєстрація «бази об'єктів» | DbContext | Session / AsyncSession |
| Набір записів таблиці | DbSet<TEntity> | Mapped[] + Table |
| Підключення до БД | DbContext.Database | Engine / AsyncEngine |
| ORM-модель (сутність) | class User : BaseEntity | class User(Base): ... |
| Відстеження змін | DbContext.ChangeTracker | Session (Unit of Work) |
| Тільки читання | AsNoTracking() | execution_options(populate_existing=True) або select() без session.add() |
| Міграції | dotnet ef migrations add | alembic revision --autogenerate |
| Eager Loading | .Include(u => u.Posts) | selectinload(User.posts) |
| Lazy Loading | Автоматично (за замовчуванням) | Вимкнено за замовчуванням (вимагає явного налаштування) |
| LINQ-запити | context.Users.Where(u => u.IsActive) | select(User).where(User.is_active == True) |
Найважливіша відмінність: в EF Core DbContext є одночасно і «сховищем об'єктів», і точкою доступу до запитів (через DbSet). У SQLAlchemy ці ролі чітко розділені: Engine управляє підключеннями, а Session управляє об'єктами. Це розділення є більш явним і дозволяє краще контролювати життєвий цикл обох сутностей.
Engine та Connection Pool: Фундамент SQLAlchemy
Engine — це перший об'єкт, який потрібно створити для роботи з SQLAlchemy. Він є центральною точкою конфігурації та управляє пулом підключень (connection pool) до бази даних.
Чому потрібен Connection Pool?
Відкриття нового підключення до PostgreSQL — це відносно «дорога» операція: вона включає TCP-рукостискання, автентифікацію та ініціалізацію внутрішнього стану сесії PostgreSQL. У вебзастосунку, де одночасно можуть надходити сотні або тисячі запитів, відкривати і закривати з'єднання для кожного запиту є неприпустимо повільним підходом.
Connection Pool вирішує цю проблему: він заздалегідь відкриває кілька підключень і тримає їх «живими». Коли FastAPI-обробнику потрібне підключення до бази даних, він бере готове з пулу, використовує його, і повертає назад — підключення не закривається, а очікує наступного запиту.
Python DB-API 2.0: Що таке «драйвер» і як він працює?
Перш ніж розбирати create_engine(), важливо зрозуміти рівень, що знаходиться нижче SQLAlchemy, — безпосередній Python-драйвер для PostgreSQL.
DB-API 2.0 (PEP 249) — це стандартний Python-інтерфейс для роботи з реляційними базами даних. Він визначає мінімальний набір функцій і об'єктів, які має реалізувати будь-який Python-пакет для роботи з конкретною СУБД. Завдяки цьому стандарту SQLAlchemy може прозоро перемикатися між різними драйверами без зміни вашого коду.
Аналогія: DB-API 2.0 — це «стандартна розетка», а конкретні драйвери (psycopg2, asyncpg, psycopg) — це «прилади», що підключаються до неї. SQLAlchemy — це «подовжувач», який надає вам зручний API поверх будь-якого «приладу».
Як виглядає робота з драйвером напряму?
Щоб по-справжньому оцінити, від чого абстрагує SQLAlchemy, подивимося на «голий» DB-API 2.0 з psycopg2:
import psycopg2
from psycopg2.extras import RealDictCursor
# 1. Відкриваємо підключення вручну
conn = psycopg2.connect(
host="localhost",
port=5432,
dbname="taskforge_db",
user="user",
password="password",
)
try:
# 2. Створюємо курсор (об'єкт для виконання запитів)
# RealDictCursor → результати у вигляді dict замість tuple
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# 3. Виконуємо параметризований запит (захист від SQL-ін'єкцій)
cur.execute(
"SELECT id, username, email FROM users WHERE is_active = %s LIMIT %s",
(True, 10), # Параметри — окремим кортежем, НЕ через f-string!
)
# 4. Отримуємо результати
rows = cur.fetchall()
for row in rows:
print(row["id"], row["username"]) # row — це dict
# 5. INSERT з поверненням згенерованого id
cur.execute(
"INSERT INTO users (username, email) VALUES (%s, %s) RETURNING id",
("new_user", "new@example.com"),
)
new_id = cur.fetchone()["id"]
# 6. Підтверджуємо транзакцію вручну
conn.commit()
except Exception:
conn.rollback() # Відкат при будь-якій помилці
raise
finally:
conn.close() # Завжди закриваємо підключення
Цей код працює, але очевидно, наскільки він багатослівний: ручне управління з'єднаннями, курсорами, транзакціями та перетворенням результатів — все це ваша відповідальність. SQLAlchemy автоматизує весь цей boilerplate.
Асинхронний аналог із asyncpg виглядає інакше (asyncpg не реалізує DB-API 2.0, а має власний API, оптимізований для asyncio):
import asyncpg
async def main():
# asyncpg підключається напряму без курсорів
conn = await asyncpg.connect(
host="localhost", port=5432,
database="taskforge_db", user="user", password="password",
)
try:
# asyncpg повертає список Record-об'єктів (схожі на dict)
rows = await conn.fetch(
"SELECT id, username FROM users WHERE is_active = $1 LIMIT $2",
True, 10, # Параметри через $1, $2 (PostgreSQL-стиль)
)
for row in rows:
print(row["id"], row["username"])
new_id = await conn.fetchval(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
"new_user", "new@example.com",
)
finally:
await conn.close()
psycopg2 використовує %s (стиль Python printf), тоді як asyncpg використовує $1, $2, $3 (нативний PostgreSQL-стиль). SQLAlchemy абстрагує цю різницю — ви завжди пишете однаковий Python-код, незалежно від драйвера.Які драйвери обирати у 2026+ році?
На сьогодні для PostgreSQL існує три основних варіанти драйверів. Ось актуальна картина:
| Драйвер | Тип | Статус | Коли використовувати |
|---|---|---|---|
| psycopg2 | Синхронний | Стабільний, але legacy | Старі проєкти, де вже використовується. Нові проєкти — краще уникати. |
| psycopg (v3) | Sync + Async | ✅ Актуальний | Рекомендований вибір для синхронних проєктів. Прямий наступник psycopg2. |
| asyncpg | Асинхронний | ✅ Актуальний | Рекомендований вибір для async FastAPI. Найшвидший async-драйвер. |
asyncpg
Рекомендований для async FastAPI.
Написаний на Cython, використовує бінарний PostgreSQL-протокол. За бенчмарками в 3–5 разів швидший за psycopg2. Не реалізує DB-API 2.0, зате надає зручний нативний async API. SQLAlchemy підтримує через postgresql+asyncpg://.
pip install asyncpg
psycopg (v3)
Рекомендований для синхронних проєктів або міграції з psycopg2.
Підтримує як sync, так і async режими в одному пакеті. Реалізує оновлений DB-API 2.0. Значно продуктивніший за psycopg2. SQLAlchemy підтримує через postgresql+psycopg:// (без цифри 2).
pip install "psycopg[binary]"
pip install sqlalchemy asyncpg
postgresql+asyncpg://user:password@localhost/dbnameЯкі бази даних підтримує SQLAlchemy «з коробки»?
SQLAlchemy не прив'язана до PostgreSQL. Офіційна документація визначає п'ять вбудованих (included) діалектів — тобто БД, для яких підтримка вбудована безпосередньо в пакет sqlalchemy без встановлення сторонніх розширень.
func.now() у PostgreSQL стає NOW(), а у MySQL — NOW() чи SYSDATE() залежно від версії. Ваш Python-код залишається незмінним.| СУБД | SQLAlchemy DSN-префікс | Sync-драйвери | Async-драйвери |
|---|---|---|---|
| PostgreSQL | postgresql+<driver>:// | psycopg2, psycopg | asyncpg, psycopg (async) |
| MySQL / MariaDB | mysql+<driver>:// | mysqlclient, PyMySQL | asyncmy, aiomysql |
| SQLite | sqlite+<driver>:/// | вбудований sqlite3 | aiosqlite |
| Oracle Database | oracle+<driver>:// | cx_Oracle, oracledb | oracledb (async) |
| Microsoft SQL Server | mssql+<driver>:// | pyodbc, pymssql | aioodbc |
Розглянемо кожну БД детальніше.
MySQL та MariaDB
MySQL та його повністю сумісний форк MariaDB є найпоширенішими реляційними СУБД у веброзробці. SQLAlchemy підтримує обидві через однаковий mysql+<driver>:// префікс (MariaDB автоматично визначається за версією сервера).
# mysqlclient: найшвидший синхронний драйвер для MySQL (C-розширення)
# pip install mysqlclient
engine = create_engine(
"mysql+mysqldb://user:password@localhost:3306/mydb?charset=utf8mb4"
)
# asyncmy: сучасний async-драйвер для MySQL/MariaDB
# pip install asyncmy
async_engine = create_async_engine(
"mysql+asyncmy://user:password@localhost:3306/mydb?charset=utf8mb4"
)
# PyMySQL: pure-Python драйвер, простий у встановленні (без компіляції)
# pip install pymysql
engine = create_engine(
"mysql+pymysql://user:password@localhost:3306/mydb?charset=utf8mb4"
)
- За замовчуванням рушій InnoDB у MySQL підтримує транзакції та зовнішні ключі. Старий
MyISAM— не підтримує. SQLAlchemy очікує InnoDB. BOOLEANу MySQL зберігається якTINYINT(1)(0/1), а не справжній булевий тип.AUTO_INCREMENTзамість PostgreSQLSERIAL/GENERATED ALWAYS AS IDENTITY— SQLAlchemy абстрагує це автоматично черезprimary_key=True.
SQLite
SQLite — вбудована у Python (import sqlite3) легковажна СУБД без сервера: вся база даних зберігається в одному .db-файлі на диску. Ідеально підходить для локальної розробки, тестів та невеликих проєктів.
# Не потрібен додатковий пакет — sqlite3 вбудований у Python
engine = create_engine(
"sqlite:///./taskforge_dev.db" # відносний шлях до файлу
)
# In-memory БД (існує лише у RAM, зникає при закритті):
engine_memory = create_engine("sqlite:///:memory:")
# aiosqlite: async-обгортка над sqlite3
# pip install aiosqlite
async_engine = create_async_engine(
"sqlite+aiosqlite:///./taskforge_dev.db"
)
# Async in-memory:
async_engine_memory = create_async_engine("sqlite+aiosqlite:///:memory:")
DATABASE_URL через змінну середовища в conftest.py. In-memory SQLite (sqlite:///:memory:) робить тести максимально ізольованими — кожен тест починає з чистої бази.Microsoft SQL Server (MSSQL)
Microsoft SQL Server широко використовується у корпоративному середовищі (особливо там, де вже є інфраструктура Microsoft). SQLAlchemy підтримує його через ODBC-підключення.
# pip install pyodbc
# Вимагає встановленого ODBC Driver for SQL Server на ОС
engine = create_engine(
"mssql+pyodbc://user:password@mssql-server:1433/mydb"
"?driver=ODBC+Driver+18+for+SQL+Server"
"&TrustServerCertificate=yes"
)
# pip install aioodbc
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
"mssql+aioodbc://user:password@mssql-server:1433/mydb"
"?driver=ODBC+Driver+18+for+SQL+Server"
"&TrustServerCertificate=yes"
)
IDENTITY замість SERIAL, NVARCHAR замість VARCHAR для Unicode, та TOP N замість LIMIT N. SQLAlchemy повністю приховує ці відмінності — ваш Python-код з select(User).limit(10) однаково коректно виконається і на PostgreSQL, і на SQL Server.Зведена таблиця вибору БД та драйвера для нових проєктів
| Сценарій | СУБД | Драйвер | DSN |
|---|---|---|---|
| FastAPI production (async) | PostgreSQL | asyncpg | postgresql+asyncpg:// |
| FastAPI production (sync) | PostgreSQL | psycopg (v3) | postgresql+psycopg:// |
| Локальна розробка / тести | SQLite | aiosqlite | sqlite+aiosqlite:///./dev.db |
| Unit-тести (in-memory) | SQLite | aiosqlite | sqlite+aiosqlite:///:memory: |
| Корпоративний стек Microsoft | MSSQL | pyodbc / aioodbc | mssql+pyodbc:// |
| Спадковий стек / хостинги | MySQL/MariaDB | asyncmy | mysql+asyncmy:// |
Синхронний Engine: create_engine()
Функція create_engine() приймає рядок підключення (DSN — Data Source Name) та низку опцій, що конфігурують поведінку пулу.
from sqlalchemy import create_engine
# Рядок підключення (DSN) для PostgreSQL з psycopg2
DATABASE_URL = "postgresql+psycopg2://user:password@localhost:5432/taskforge_db"
engine = create_engine(
DATABASE_URL,
# --- Параметри Connection Pool ---
pool_size=10, # Кількість «постійних» підключень у пулі
max_overflow=20, # Максимум тимчасових підключень понад pool_size
pool_timeout=30, # Секунд очікування вільного підключення з пулу
pool_recycle=1800, # Перестворювати підключення кожні 30 хвилин
# (щоб уникнути "stale connections" від PostgreSQL)
# --- Налагодження ---
echo=True, # Виводити всі SQL-запити у консоль (лише для розробки!)
)
echo=True є надзвичайно корисним інструментом для налагодження — він дозволяє бачити кожен SQL-запит, що генерує SQLAlchemy. Проте у production-середовищі він категорично вимкнений, оскільки виводить чутливі дані та значно навантажує систему логування.Асинхронний Engine: create_async_engine()
Для роботи з FastAPI у повністю асинхронному режимі (що є рекомендованим підходом, як ми дізналися у статті 14) необхідно використовувати create_async_engine() разом із асинхронним драйвером asyncpg.
from sqlalchemy.ext.asyncio import create_async_engine
# Зверніть на префікс: postgresql+asyncpg (не psycopg2!)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/taskforge_db"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
echo=False, # У production завжди False
)
Ключова різниця між create_engine() та create_async_engine() полягає у внутрішньому DBAPI-драйвері:
postgresql+psycopg2→ синхронний драйвер, блокує потік виконання.postgresql+asyncpg→ асинхронний драйвер, звільняє event loop Python під час очікування відповіді від БД.
postgresql+psycopg (без суфікса 2) — це сучасна версія драйвера, що підтримує як синхронний, так і асинхронний режими. Вона є пріоритетним вибором для нових проєктів.Session та sessionmaker: Одиниця роботи
Session — це центральний об'єкт для роботи з ORM. Він реалізує патерн Unit of Work (Одиниця роботи), що є фундаментальним архітектурним патерном для роботи з базами даних (ми детально розглянемо його у розділі «Під капотом»).
Коротко: Session — це «записник» або «тимчасова пам'ять», яка:
- Відстежує всі Python-об'єкти (ORM-моделі), завантажені з БД або додані до нього.
- Накопичує всі зміни (INSERT, UPDATE, DELETE), не надсилаючи їх одразу до БД.
- При виклику
session.flush()абоsession.commit()— генерує відповідні SQL-запити та виконує їх у межах однієї транзакції.
sessionmaker та async_sessionmaker
Оскільки Session має бути короткоживучим об'єктом (один запит = одна сесія), зручно мати фабрику, яка створює нові сесії з потрібними налаштуваннями. Для цього слугують sessionmaker та async_sessionmaker.
Базовий шаблон виглядає так:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine("postgresql+psycopg://user:pass@localhost/db")
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False,
)
# Один HTTP-запит = одна сесія:
with SessionLocal() as session:
user = session.get(User, 1)
user.is_active = False
session.commit()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
async_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
expire_on_commit=False,
)
# Один HTTP-запит = одна async-сесія:
async with AsyncSessionLocal() as session:
user = await session.get(User, 1)
user.is_active = False
await session.commit()
Синхронний Engine: create_engine()
Функція create_engine() приймає рядок підключення (DSN — Data Source Name) та низку опцій, що конфігурують поведінку пулу.
from sqlalchemy import create_engine
# Рядок підключення (DSN) для PostgreSQL з psycopg2
DATABASE_URL = "postgresql+psycopg2://user:password@localhost:5432/taskforge_db"
engine = create_engine(
DATABASE_URL,
# --- Параметри Connection Pool ---
pool_size=10, # Кількість «постійних» підключень у пулі
max_overflow=20, # Максимум тимчасових підключень понад pool_size
pool_timeout=30, # Секунд очікування вільного підключення з пулу
pool_recycle=1800, # Перестворювати підключення кожні 30 хвилин
# (щоб уникнути "stale connections" від PostgreSQL)
# --- Налагодження ---
echo=True, # Виводити всі SQL-запити у консоль (лише для розробки!)
)
echo=True є надзвичайно корисним інструментом для налагодження — він дозволяє бачити кожен SQL-запит, що генерує SQLAlchemy. Проте у production-середовищі він категорично вимкнений, оскільки виводить чутливі дані та значно навантажує систему логування.Асинхронний Engine: create_async_engine()
Для роботи з FastAPI у повністю асинхронному режимі (що є рекомендованим підходом, як ми дізналися у статті 14) необхідно використовувати create_async_engine() разом із асинхронним драйвером asyncpg.
from sqlalchemy.ext.asyncio import create_async_engine
# Зверніть на префікс: postgresql+asyncpg (не psycopg2!)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/taskforge_db"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
echo=False, # У production завжди False
)
Ключова різниця між create_engine() та create_async_engine() полягає у внутрішньому DBAPI-драйвері:
postgresql+psycopg2→ синхронний драйвер, блокує потік виконання.postgresql+asyncpg→ асинхронний драйвер, звільняє event loop Python під час очікування відповіді від БД.
postgresql+psycopg (без суфікса 2) — це сучасна версія драйвера, що підтримує як синхронний, так і асинхронний режими. Вона є пріоритетним вибором для нових проєктів.Параметри конфігурації Connection Pool у Engine
При створенні Engine або AsyncEngine за допомогою параметрів можна тонко налаштувати поведінку вбудованого пулу з'єднань (QueuePool). Розглянемо ключові параметри, їхні значення за замовчуванням та вплив на систему:
pool_size(типint, за замовчуванням5) Визначає кількість «постійно відкритих» (keep-alive) підключень, які пул зберігає в пам'яті. Ці з'єднання ніколи не закриваються при звільненні.- На що впливає: Лімітує базову пропускну здатність. Якщо ваші FastAPI-воркери роблять мало паралельних запитів, 5–10 підключень цілком достатньо. Для навантажених додатків це значення збільшують до 20–50.
- Застереження: Кожне з'єднання споживає RAM на сервері PostgreSQL (близько 10 МБ на з'єднання). Збільшення
pool_sizeпонад лімітmax_connectionsу конфігурації PostgreSQL призведе до того, що база даних відхилятиме нові підключення.
max_overflow(типint, за замовчуванням10) Визначає максимальну кількість тимчасових додаткових підключень, які пул може створити, якщо всі з'єднання зpool_sizeзараз зайняті.- На що впливає: Дозволяє додатку безболісно переживати раптові піки навантаження. Коли пік минає, ці «overflow» підключення автоматично закриваються.
- Розрахунок: Загальна максимальна кількість одночасно відкритих підключень з одного екземпляра додатка дорівнює
pool_size + max_overflow. Наприклад,pool_size=10, max_overflow=20дає ліміт у 30 з'єднань.
pool_recycle(типint, за замовчуванням-1, тобто вимкнено) Визначає максимальний вік підключення у секундах. Якщо підключення старше за це значення, при поверненні в пул воно буде закрите, а замість нього буде створено нове.- На що впливає: Рятує від проблеми stale connections (застарілих підключень). Деякі мережеві екрани (firewalls), хмарні бази даних (наприклад, AWS RDS, Heroku Postgres) або сам PostgreSQL можуть примусово розривати неактивні TCP-сесії після певного періоду простою (наприклад, 10 або 30 хвилин). Якщо пул спробує використати таке з'єднання, виникне помилка на кшталт
OperationalError: Connection handshake failed. Встановленняpool_recycle=1800(30 хвилин) змушує оновлювати підключення превентивно.
- На що впливає: Рятує від проблеми stale connections (застарілих підключень). Деякі мережеві екрани (firewalls), хмарні бази даних (наприклад, AWS RDS, Heroku Postgres) або сам PostgreSQL можуть примусово розривати неактивні TCP-сесії після певного періоду простою (наприклад, 10 або 30 хвилин). Якщо пул спробує використати таке з'єднання, виникне помилка на кшталт
pool_pre_ping(типbool, за замовчуваннямFalse) Якщо встановлено вTrue, при кожному отриманні з'єднання з пулу SQLAlchemy виконує швидкий тестовий запит (зазвичайSELECT 1) перед тим, як віддати з'єднання вашому коду.- На що впливає: Забезпечує високу стійкість до збоїв. Якщо з'єднання виявиться «мертвим» (через перезапуск бази даних або мережевий збій), пул тихо закриє його, відкриє нове працездатне підключення та прозоро віддасть вашому коду. Розробник навіть не помітить, що з'єднання розривалося.
- Рекомендація: Завжди ставте
pool_pre_ping=Trueу production. Хоча це додає крихітний оверхед на один SQL-запит-тест, стійкість додатку зростає в рази.
Приклад повної production конфігурації Engine:
async_engine = create_async_engine(
DATABASE_URL,
pool_size=20, # Збільшено для навантаженого API
max_overflow=10, # Додаткові 10 з'єднань під час піків
pool_recycle=1800, # Оновлювати кожні 30 хвилин (захист від AWS RDS таймаутів)
pool_pre_ping=True, # ✅ Перевіряти працездатність з'єднання перед відправкою запиту
)
Session та sessionmaker: Одиниця роботи
Session — це центральний об'єкт для роботи з ORM. Він реалізує патерн Unit of Work (Одиниця роботи), що є фундаментальним архітектурним патерном для роботи з базами даних (ми детально розглянемо його у розділі «Під капотом»).
Коротко: Session — це «записник» або «тимчасова пам'ять», яка:
- Відстежує всі Python-об'єкти (ORM-моделі), завантажені з БД або додані до нього.
- Накопичує всі зміни (INSERT, UPDATE, DELETE), не надсилаючи їх одразу до БД.
- При виклику
session.flush()абоsession.commit()— генерує відповідні SQL-запити та виконує їх у межах однієї транзакції.
sessionmaker та async_sessionmaker
Оскільки Session має бути короткоживучим об'єктом (один запит = одна сесія), зручно мати фабрику, яка створює нові сесії з потрібними налаштуваннями. Для цього слугують sessionmaker та async_sessionmaker.
Базовий шаблон виглядає так:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine("postgresql+psycopg://user:pass@localhost/db")
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False,
)
# Один HTTP-запит = одна сесія:
with SessionLocal() as session:
user = session.get(User, 1)
user.is_active = False
session.commit()
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
async_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
expire_on_commit=False,
)
# Один HTTP-запит = одна async-сесія:
async with AsyncSessionLocal() as session:
user = await session.get(User, 1)
user.is_active = False
await session.commit()
Параметри sessionmaker та async_sessionmaker: повний розбір
Обидві фабрики приймають однакові параметри. Нижче — вичерпна документація кожного з них із прикладами впливу на поведінку.
bind / autobegin
Прив'язує фабрику до конкретного Engine або AsyncEngine. Усі сесії, створені цією фабрикою, будуть автоматично використовувати саме це підключення до БД. Якщо не вказати тут — доведеться передавати bind при кожному виклику Session().
Якщо True, SQLAlchemy автоматично починає нову транзакцію (BEGIN) при першій операції з БД. Якщо False — транзакцію потрібно починати вручну через session.begin(). Значення True (за замовчуванням) є зручним та безпечним для більшості випадків.
# Стандартне використання — bind + autobegin=True (за замовчуванням)
SessionLocal = sessionmaker(bind=engine, autobegin=True)
with SessionLocal() as session:
# При першому execute() SQLAlchemy автоматично відкриває транзакцію:
# BEGIN (implicit)
user = session.get(User, 1)
session.commit()
# COMMIT
# autobegin=False — повний ручний контроль (рідко потрібен):
ManualSession = sessionmaker(bind=engine, autobegin=False)
with ManualSession() as session:
with session.begin(): # Явний BEGIN
user = session.get(User, 1)
# COMMIT автоматично при виході з session.begin()
autocommit
Чому False? Це один з найважливіших параметрів, що забезпечує атомарність операцій.
Якщо autocommit=True — кожен SQL-запит виконується у своїй власній транзакції і негайно комітиться. Це означає, що неможливо атомарно виконати кілька операцій: якщо після першого INSERT програма впаде — другий INSERT вже буде збережений у БД, а перший — ні. Дані опиняться у неузгодженому стані.
Якщо autocommit=False — всі операції в межах сесії виконуються в одній транзакції, яку ви контролюєте через commit() / rollback(). Це гарантує ACID-властивості.
# ❌ autocommit=True — небезпечно, не використовуйте у production
AutoCommitSession = sessionmaker(bind=engine, autocommit=True)
with AutoCommitSession() as session:
# Кожен виклик одразу комітиться окремо!
session.add(User(username="alice")) # SQL: INSERT + COMMIT
# Якщо тут виникне помилка:
raise RuntimeError("Щось пішло не так!")
session.add(User(username="bob")) # Цей INSERT ніколи не виконається
# alice вже збережена у БД — неузгодженість!
# ✅ autocommit=False — безпечно (за замовчуванням)
SafeSession = sessionmaker(bind=engine, autocommit=False)
with SafeSession() as session:
try:
session.add(User(username="alice"))
session.add(User(username="bob"))
session.commit() # Обидва INSERT у ONE транзакції
except Exception:
session.rollback() # Обидва відкочуються — БД у чистому стані
autoflush
Чому у FastAPI зазвичай ставлять False?
За замовчуванням autoflush=True: SQLAlchemy автоматично виконує flush() (відправляє накопичені зміни у БД у рамках поточної транзакції) перед кожним SELECT-запитом у межах поточної сесії.
На перший погляд це зручно: ви завжди бачите актуальні дані навіть до commit(). Але у FastAPI-сервісах, де сесія часто передається через DI між кількома шарами (router → service → repository), autoflush=True може призводити до неочікуваних SQL-запитів і ускладнює налагодження.
Рекомендація для FastAPI: autoflush=False. Ви самостійно викликаєте session.flush() тільки коли це потрібно (наприклад, щоб отримати згенерований id). Це дає повний контроль і передбачувану поведінку.
# ✅ autoflush=True (за замовчуванням у SQLAlchemy)
AutoFlushSession = sessionmaker(bind=engine, autoflush=True)
with AutoFlushSession() as session:
new_user = User(username="charlie")
session.add(new_user)
# ↓ Тут ще немає flush()
# Але при будь-якому SELECT — автоматичний flush перед ним:
existing = session.execute(select(User).where(User.username == "alice")).scalar()
# SQLAlchemy робить:
# 1. INSERT INTO users (username) VALUES ('charlie') ← autoflush!
# 2. SELECT * FROM users WHERE username = 'alice'
# Це зручно, але може бути несподіваним, якщо charlie ще «не готовий»
# (наприклад, не встановлено обов'язкове поле email)
# ✅ autoflush=False (рекомендовано для FastAPI)
ManualFlushSession = sessionmaker(bind=engine, autoflush=False)
with ManualFlushSession() as session:
new_user = User(username="charlie")
session.add(new_user)
# SELECT виконується одразу, без autoflush
existing = session.execute(select(User).where(User.username == "alice")).scalar()
# SQL: SELECT * FROM users WHERE username = 'alice'
# charlie ще не INSERT-нутий — SELECT не «бачить» його
# Явний flush лише тоді, коли потрібен id:
session.flush() # INSERT INTO users ...
print(new_user.id) # Тепер id доступний
session.commit()
expire_on_commit
Найважливіший параметр для async FastAPI — завжди ставте False!
Після виклику commit() SQLAlchemy за замовчуванням (True) «відмічає» всі об'єкти у сесії як expired (застарілі). Це означає: при наступному зверненні до будь-якого атрибуту об'єкта SQLAlchemy виконає новий SELECT-запит, щоб «освіжити» дані з БД.
У синхронному режимі це відбувається непомітно — lazy refresh є автоматичним.
В асинхронному режимі це катастрофа: після await session.commit() сесія більше не активна в async-контексті. Спроба отримати атрибут expired-об'єкта викличе MissingGreenlet: greenlet_spawn has not been called — одна з найпоширеніших помилок початківців у async SQLAlchemy.
# --- SYNC: expire_on_commit=True (за замовчуванням) ---
SyncSession = sessionmaker(bind=engine, expire_on_commit=True)
with SyncSession() as session:
user = session.get(User, 1)
user.username = "updated"
session.commit()
# user тепер expired!
# Але sync-режим «рятує» нас автоматично:
print(user.username)
# SELECT * FROM users WHERE id = 1 ← автоматичний lazy SELECT!
# Виводить: "updated" ✅ Працює, але виконує зайвий запит
# --- ASYNC: expire_on_commit=True — ПОМИЛКА! ---
BadAsyncSession = async_sessionmaker(bind=async_engine, expire_on_commit=True)
async with BadAsyncSession() as session:
user = await session.get(User, 1)
user.username = "updated"
await session.commit()
# user тепер expired!
print(user.username)
# ❌ MissingGreenlet: greenlet_spawn has not been called
# Немає активної async-транзакції для lazy SELECT!
# --- ASYNC: expire_on_commit=False — правильно! ---
GoodAsyncSession = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async with GoodAsyncSession() as session:
user = await session.get(User, 1)
user.username = "updated"
await session.commit()
# user НЕ expired — значення в пам'яті збережено
print(user.username) # ✅ "updated" — з пам'яті Python, без SQL
expire_on_commit=False означає, що після commit() атрибути об'єкта можуть не відображати реальний стан БД (якщо хтось зовні змінив той самий запис). У FastAPI-додатках це не є проблемою — кожен HTTP-запит отримує свіжу сесію та свіжий SELECT. Але якщо ваша логіка потребує «свіжих» даних після commit — виконайте await session.refresh(obj) вручну.class_ (лише для async_sessionmaker)
Визначає конкретний клас, який буде створювати фабрика. Для async_sessionmaker потрібно явно передати class*=AsyncSession. Це дозволяє підставляти власні підкласи AsyncSession з кастомною логікою (наприклад, автоматичним логуванням або метриками).
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class LoggedSession(AsyncSession):
"""AsyncSession з логуванням кожного коміту."""
async def commit(self):
print(f"[DB] Committing transaction in session {id(self)}")
await super().commit()
LoggedSessionLocal = async_sessionmaker(
bind=async_engine,
class_=LoggedSession, # Використовуємо наш кастомний клас
expire_on_commit=False,
)
async with LoggedSessionLocal() as session:
# ... операції ...
await session.commit()
# Виведе: [DB] Committing transaction in session 140234567890
Підсумковий конфіг для production FastAPI
Ось канонічна конфігурація async_sessionmaker для production FastAPI-додатку на основі всього вищесказаного:
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost:5432/taskforge_db",
pool_size=10,
max_overflow=20,
pool_recycle=1800,
echo=False,
)
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
autocommit=False, # ✅ Явний контроль транзакцій (ACID-безпека)
autoflush=False, # ✅ Явний flush() — передбачувана поведінка
expire_on_commit=False, # ✅ Обов'язково для async FastAPI
)
ORM-Моделі у SQLAlchemy 2.0: Новий «Python-first» синтаксис
Якщо у вас є досвід роботи зі SQLAlchemy 1.x, перший погляд на код SQLAlchemy 2.0 може викликати легкий шок — він виглядає зовсім інакше. Це навмисна зміна: команда SQLAlchemy чітко взяла курс на максимальну інтеграцію з сучасним Python-синтаксисом типізації, наслідуючи ідеї Pydantic та dataclasses.
Еволюція синтаксису: від 1.x до 2.0
Щоб оцінити масштаб змін, подивимося на одну і ту саму модель у різних синтаксисах:
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base() # Стара функція
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
is_active = Column(Boolean, default=True)
# ❌ Немає type hints — IDE не знає тип атрибутів
# ❌ user.id може бути int або None — незрозуміло без читання Column()
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): # Новий стиль — клас, а не виклик функції
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
is_active: Mapped[bool] = mapped_column(default=True)
# ✅ Повні type hints — IDE точно знає типи
# ✅ Mapped[int] означає NOT NULL, Mapped[int | None] — NULLABLE
Різниця кардинальна. У новому синтаксисі тип стовпця (nullable/not-nullable) тепер виводиться автоматично з типової анотації:
Mapped[int]→INTEGER NOT NULLMapped[int | None]абоMapped[Optional[int]]→INTEGER NULLMapped[str]→VARCHAR NOT NULLMapped[bool]→BOOLEAN NOT NULL
DeclarativeBase: Базовий клас нового покоління
У SQLAlchemy 2.0 рекомендованим способом створення базового класу для всіх моделей є успадкування від DeclarativeBase замість виклику застарілої функції declarative_base().
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
"""
Базовий клас для всіх ORM-моделей проєкту.
Всі моделі мають успадковуватися від цього класу.
"""
pass
Ця проста конструкція робить кілька речей одночасно:
- Реєструє метадані — кожен підклас
Baseавтоматично реєструє своє визначення таблиці уBase.metadata, що використовується при міграціях Alembic. - Забезпечує підтримку type checking — сучасні IDE (PyCharm, VS Code з Pylance) повністю розуміють
Mapped[T]і надають автодоповнення та перевірку типів. - Дозволяє типізовану конфігурацію через
__class_getitem__механізм — можна визначатиtype_annotation_mapдля кастомних типів.
mapped_column(): Детальна конфігурація стовпців
Функція mapped_column() є аналогом старого Column(), але спроєктована для роботи разом із анотаціями Mapped[T]. Вона приймає всі ті самі аргументи, що і Column(), але вже не потребує явного вказання типу (він виводиться з Mapped[T]).
Розглянемо повний приклад моделі User з усіма типовими сценаріями:
from datetime import datetime
from sqlalchemy import String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# --- Первинний ключ ---
id: Mapped[int] = mapped_column(primary_key=True)
# --- Рядкові поля з обмеженнями ---
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
# --- Nullable поле (опціональне) ---
# Mapped[str | None] → VARCHAR NULL (поле може бути пустим)
bio: Mapped[str | None] = mapped_column(Text, default=None)
# --- Поле з серверним значенням за замовчуванням ---
# server_default=func.now() → DEFAULT NOW() виконується на стороні PostgreSQL
created_at: Mapped[datetime] = mapped_column(
server_default=func.now()
)
# --- Поле з клієнтським значенням за замовчуванням ---
# default=True → значення встановлюється Python-кодом, не SQL
is_active: Mapped[bool] = mapped_column(default=True)
def __repr__(self) -> str:
return f"<User id={self.id} username={self.username!r}>"
Типи стовпців SQLAlchemy
SQLAlchemy надає багатий набір типів стовпців, що відображаються на відповідні SQL-типи конкретної бази даних:
Python тип в Mapped[T] | SQLAlchemy тип | PostgreSQL тип |
|---|---|---|
int | Integer | INTEGER |
str | String(n) / Text | VARCHAR(n) / TEXT |
bool | Boolean | BOOLEAN |
float | Float | FLOAT |
datetime | DateTime | TIMESTAMP |
date | Date | DATE |
Decimal | Numeric(p, s) | NUMERIC(p, s) |
bytes | LargeBinary | BYTEA |
dict | JSON | JSONB |
UUID | Uuid (2.0+) | UUID |
Детальний опис найбільш популярних типів стовпців SQLAlchemy, їхнього призначення, особливостей використання та поширених помилок:
- Коли використовувати:
Integerпідходить для стандартних лічильників, невеликих ID та статусів.BigInteger(відображається вBIGINT) є обов'язковим для первинних ключів (ID) у великих або швидкозростаючих таблицях (де кількість записів може перевищити 2 мільярди), грошових сум у копійках, Telegram ID чи інших великих зовнішніх ідентифікаторів. - Коли НЕ використовувати: Уникайте
Integerдля первинних ключів у великих таблицях — переповненняINTє класичною причиною збоїв великих систем. Також не використовуйте цілі числа для фінансових розрахунків, якщо тільки ви не зберігаєте суми строго у мінімальних одиницях валюти (наприклад, центах).
# Приклад використання
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
views_count: Mapped[int] = mapped_column(Integer, default=0)
- Коли використовувати:
String(n)підходить для коротких обмежених рядків (імена, email, slug, паролі). Завжди вказуйте довжину для оптимізації та валідації на рівні СУБД.Textвикористовується для довгих текстів без чіткого ліміту довжини (коментарі, статті, описи). - Коли НЕ використовувати: Не використовуйте
Stringбез ліміту (простоString()) у базах даних, які не підтримують необмеженийVARCHARбез явного зазначення довжини (наприклад, MySQL вимагає довжину, тоді як PostgreSQL дозволяєVARCHARбез довжини, що еквівалентноTEXT). Не зберігайте складну структуру (JSON, списки) у звичайних текстових стовпцях.
# Приклад використання
email: Mapped[str] = mapped_column(String(255), unique=True)
content: Mapped[str] = mapped_column(Text)
True або False.- Коли використовувати: Для бінарних прапорців стану, таких як
is_active,has_permission,email_verified. - Коли НЕ використовувати: Якщо стан сутності має складніший життєвий цикл (наприклад, статус замовлення:
draft,processing,shipped,delivered), краще використовуватиEnumабо зв'язану таблицю статусів замість створення кількох булевих полів на кшталтis_draft,is_processed.
# Приклад використання
is_verified: Mapped[bool] = mapped_column(Boolean, default=False)
- Коли використовувати:
Numeric(p, s)(абоDECIMAL) є єдиним правильним вибором для збереження грошових сум, фінансових розрахунків та точних відсотків, де недопустимі помилки округлення.Floatпідходить для наукових обчислень, гео-координат (широта/довгота), метрик, де крихітні похибки через особливості репрезентації float-чисел у пам'яті комп'ютера не мають критичного значення. - Коли НЕ використовувати: Ніколи не використовуйте
Floatдля збереження грошей. Помилки округлення на кшталт0.1 + 0.2 = 0.30000000000000004призведуть до фінансових розбіжностей.
# Приклад використання
from decimal import Decimal
price: Mapped[Decimal] = mapped_column(Numeric(10, 2)) # Максимум 99999999.99
latitude: Mapped[float] = mapped_column(Float)
- Коли використовувати: Для часових позначок створення/оновлення, логів та подій завжди використовуйте
DateTime(timezone=True)(збереження UTC часової зони).Dateпідходить для днів народження чи календарних дат (без прив'язки до годин).Time— для розкладів (наприклад, "о 18:00 кожні вихідні"). - Коли НЕ використовувати: Уникайте збереження часових міток як цілих чисел (
timestampв Unix Epochint), оскільки це ускладнює роботу зі SQL-запитами безпосередньо у базі даних (ускладнює агрегації по датах, групування тощо). УникайтеDateTimeбез часової зони (timezone=False), якщо додатком користуються користувачі в різних часових поясах.
# Приклад використання
from datetime import datetime, date
registered_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
birth_date: Mapped[date] = mapped_column(Date)
- Коли використовувати: Для конфігурацій, гнучких метаданих додаткових полів користувача, кешування або історії змін. У PostgreSQL завжди надавайте перевагу
sqlalchemy.dialects.postgresql.JSONB, оскільки він зберігає дані у декомпонованому бінарному форматі, що дозволяє індексувати JSON-ключі (через GIN-індекси) та виконувати швидкий пошук по них. - Коли НЕ використовувати: Не використовуйте JSON для полів, які беруть участь у реляційних зв'язках (
JOIN), або полів, які часто оновлюються окремо. Якщо вам постійно доводиться робити запити всередину JSON-документа для фільтрації, краще винести ці ключі в окремі класичні SQL-стовпці.
# Приклад використання
from sqlalchemy.dialects.postgresql import JSONB
metadata_info: Mapped[dict] = mapped_column(JSONB, default=dict)
- Коли використовувати: Ідеачно для первинних ключів у розподілених системах, де клієнт або мікросервіс може самостійно згенерувати ID до запису в БД, або для публічних ID сутностей, щоб приховати внутрішню послідовність (sequential IDs) від перебору (enumeration attack). У SQLAlchemy 2.0 є нативний тип
Uuid. - Коли НЕ використовувати: У дуже великих таблицях, які потребують високої швидкості вставки (
INSERT). UUIDv4 генерується випадковим чином, через що індекси B-Tree сильно фрагментуються, сповільнюючи операції запису. Для таких випадків краще використовувати UUIDv7 (сортовані за часом) або залишати автоінкрементнийBigIntegerдля внутрішніх зв'язків, а UUID використовувати лише як зовнішній ідентифікатор.
# Приклад використання
import uuid
from sqlalchemy import Uuid
uuid_id: Mapped[uuid.UUID] = mapped_column(Uuid, default=uuid.uuid4)
enum.Enum.- Коли використовувати: Для статусів, ролей користувачів, категорій, де набір значень є чітким та заздалегідь визначеним на рівні коду.
- Коли НЕ використовувати: Не створюйте нативний ENUM бази даних (параметр
native_enum=True), якщо список статусів буде часто змінюватися, оскільки операція оновлення нативного ENUM у схемі БД (наприклад,ALTER TYPE ... ADD VALUE) в деяких СУБД є блокуючою або складною для міграцій. Встановлюйтеnative_enum=False(за замовчуванням у SQLAlchemy для багатьох БД) — тоді значення зберігатимуться якVARCHARз обмеженнямCHECK constraint.
# Приклад використання
import enum
from sqlalchemy import Enum
class UserRole(enum.Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
role: Mapped[UserRole] = mapped_column(Enum(UserRole), default=UserRole.USER)
UUID у SQLAlchemy 2.0 з'явився нативний тип Uuid, що автоматично конвертує uuid.UUID з Python у відповідний тип бази даних:import uuid
from sqlalchemy import Uuid
class Project(Base):
__tablename__ = "projects"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
Стандартний абстрактний базовий клас з аудитними полями
У реальних проєктах зручно винести загальні поля (наприклад, id, created_at, updated_at) до абстрактного класу, від якого успадковуватимуться всі моделі — так само, як це роблять у EF Core через клас BaseEntity:
public abstract class BaseEntity
{
public int Id { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? UpdatedAt { get; set; }
}
public class User : BaseEntity
{
public string Username { get; set; } = default!;
}
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datetime import datetime
class Base(DeclarativeBase):
pass
class TimestampMixin:
"""Mixin з аудитними полями для всіх моделей."""
created_at: Mapped[datetime] = mapped_column(
server_default=func.now()
)
updated_at: Mapped[datetime | None] = mapped_column(
onupdate=func.now(),
default=None
)
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
Параметр onupdate=func.now() у mapped_column() — це «клієнтський» тригер: SQLAlchemy автоматично підставляє поточний час у UPDATE-запит, якщо об'єкт змінився. Відповідником у PostgreSQL є тригер BEFORE UPDATE, але цей підхід зручніший — логіка знаходиться у Python-коді, а не у SQL.
Relationships: Зв'язки між таблицями
Реляційні бази даних отримали свою назву саме через зв'язки між таблицями. Головна сила ORM полягає в тому, що ці зв'язки можна виразити декларативно у Python-коді, а потім працювати з ними як зі звичайними атрибутами об'єктів.
У SQLAlchemy для визначення зв'язків між моделями використовується функція relationship() у поєднанні з ForeignKey у визначенні стовпця.
"Post"). Рядкові імена запобігають помилкам циклічного імпорту в Python.save-update(за замовчуванням): додавання батьківського об'єкта до сесії (session.add()) автоматично додає всі пов'язані дочірні об'єкти.merge(за замовчуванням): злиття стану батьківського об'єкта (session.merge()) також зливає стан пов'язаних дочірніх об'єктів.delete: при видаленні батьківського об'єкта (session.delete(parent)) автоматично видаляються всі пов'язані дочірні записи з БД.delete-orphan: якщо дочірній об'єкт від'єднати від колекції батьківського (наприклад, видалити зі списку), він вважається «сиротою» та автоматично видаляється з БД, замість того, щоб отриматиNULLу зовнішньому ключі.refresh-expire: операціїsession.expire()абоsession.refresh()на батьківському об'єкті поширюються також і на дочірні.expunge: при виведенні батьківського об'єкта із сесії (session.expunge()) дочірні об'єкти також виводяться з неї.all: швидкий синонім для"save-update, merge, refresh-expire, expunge, delete". Найбільш популярне поєднання для One-to-Many з жорстким володінням:"all, delete-orphan".
True, SQLAlchemy не завантажуватиме дочірні записи з БД при видаленні батьківського об'єкта, а делегуватиме каскадне видалення базі даних (вимагає ON DELETE CASCADE у зовнішньому ключі)."select" (lazy), "joined" (eager via JOIN), "selectin" (eager via IN), або "raise" (заборона lazy load).Зовнішні ключі: ForeignKey
У той час як relationship() створює зв'язок на рівні Python-об'єктів, клас ForeignKey створює справжнє обмеження цілісності зовнішнього ключа (FOREIGN KEY constraint) безпосередньо у схемі бази даних.
ForeignKey передається як аргумент у mapped_column() для стовпців, що мають посилатися на інші таблиці.
"ім'я_таблиці.ім'я_стовпця" (наприклад, "users.id").
Зверніть увагу: вказується саме фізичне ім'я таблиці з бази даних (__tablename__), а не назва класу ORM-моделі."CASCADE": автоматично видалити дочірній рядок. Рекомендовано для сильних зв'язків."SET NULL": встановити значення цього поля вNULL(стовпець має бути nullable)."RESTRICT": заборонити видалення батьківського рядка, якщо на нього посилаються дочірні."NO ACTION": стандартна поведінка (перевірка обмеження в кінці транзакції).
"CASCADE", щоб оновлені ID автоматично прописувалися у дочірніх таблицях.Приклад використання:
# Посилання на таблицю "users" стовпець "id" з каскадним видаленням на рівні СУБД
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE")
)
One-to-Many (Один до Багатьох)
Найпоширеніший тип зв'язку. Один запис у таблиці A може мати декілька пов'язаних записів у таблиці B. Наприклад: один User може мати багато Post-ів.
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
# "Один до Багатьох": один User має багато Post-ів
# back_populates="author" зв'язує цю сторону зі стороною Post.author
posts: Mapped[list["Post"]] = relationship(
"Post",
back_populates="author",
cascade="all, delete-orphan", # При видаленні User — видати всі його Post-и
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str | None] = mapped_column(Text)
# Зовнішній ключ — посилання на users.id
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# "Зворотний" зв'язок: Post посилається на свого User (автора)
author: Mapped["User"] = relationship("User", back_populates="posts")
Зверніть на кілька важливих деталей:
from __future__ import annotations— це директива, що дозволяє використовувати рядкові посилання на класи ("Post","User") без циклічного імпорту. Вона змушує Python відкладати обчислення анотацій і є стандартною практикою у файлах із взаємними посиланнями.back_populates— параметр, що встановлює двонаправлений зв'язок: коли ви змінюєтеuser.posts, SQLAlchemy автоматично оновлюєpost.author, і навпаки. Це синхронізація в оперативній пам'яті, а не в базі даних.cascade="all, delete-orphan"— правило каскадної поведінки: якщо об'єктUserвидаляється, всі його пов'язаніPost-и також будуть видалені (аналогON DELETE CASCADEу SQL, але реалізований на рівні ORM).
Порівняння з EF Core Navigation Properties
У EF Core аналогічна структура виглядала б так:
public class User
{
public int Id { get; set; }
public string Username { get; set; } = default!;
// Navigation Property: колекція пов'язаних Post-ів
public ICollection<Post> Posts { get; set; } = new List<Post>();
}
public class Post
{
public int Id { get; set; }
public string Title { get; set; } = default!;
// Foreign Key + Navigation Property
public int AuthorId { get; set; }
public User Author { get; set; } = default!;
}
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50))
# Аналог Navigation Property з колекцією
posts: Mapped[list["Post"]] = relationship(back_populates="author")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
# Аналог Foreign Key + Navigation Property
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
Many-to-Many (Багато до Багатьох)
Зв'язок «багато до багатьох» реалізується через асоціативну (проміжну) таблицю. Наприклад: один Post може мати багато Tag-ів, і один Tag може належати до багатьох Post-ів.
У SQLAlchemy 2.0 є два підходи для реалізації зв'язку «багато до багатьох».
Конструювання таблиць на рівні Core: Table та Column
Для створення найпростішої проміжної таблиці (без додаткових полів) у SQLAlchemy використовується низькорівневий конструктор Table та клас Column з пакету sqlalchemy (Core-рівень). Оскільки ця таблиця потрібна лише для зв'язку між об'єктами і ми не будемо створювати для неї окремий Python-клас моделі, вона оголошується як об'єкт.
"post_tags").MetaData, у якому реєструється таблиця (зазвичай передається Base.metadata). Це необхідно, щоб SQLAlchemy та інструмент міграцій Alembic знали про існування цієї таблиці та могли згенерувати її в БД.*args) — це стовпці таблиці (об'єкти Column) або обмеження цілісності (наприклад, UniqueConstraint).Також у цих таблицях використовуються класи Column (старий Core-еквівалент сучасного ORM-івського mapped_column()):
Table(), це ім'я є обов'язковим першим аргументом.type_ (наприклад, Integer, String(50)).*args / **kwargs): ForeignKey, primary_key=True, nullable=False, index=True тощо.Підхід 1: Чиста асоціативна таблиця (якщо таблиця не має власних атрибутів):
from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
# Проміжна таблиця (без власного класу)
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
# Many-to-Many через secondary (проміжну таблицю)
tags: Mapped[list["Tag"]] = relationship(
"Tag",
secondary=post_tags,
back_populates="posts"
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["Post"]] = relationship(
"Post",
secondary=post_tags,
back_populates="tags"
)
Підхід 2: Асоціативний клас (якщо проміжна таблиця має власні атрибути, наприклад, role у ProjectMember):
from datetime import datetime
from sqlalchemy import String, ForeignKey, Enum, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
import enum
class Base(DeclarativeBase):
pass
class MemberRole(enum.Enum):
OWNER = "owner"
EDITOR = "editor"
VIEWER = "viewer"
class ProjectMember(Base):
"""Асоціативна таблиця з власними атрибутами."""
__tablename__ = "project_members"
project_id: Mapped[int] = mapped_column(
ForeignKey("projects.id"), primary_key=True
)
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id"), primary_key=True
)
role: Mapped[MemberRole] = mapped_column(
Enum(MemberRole), default=MemberRole.VIEWER
)
joined_at: Mapped[datetime] = mapped_column(server_default=func.now())
# Navigation properties
project: Mapped["Project"] = relationship(back_populates="memberships")
user: Mapped["User"] = relationship(back_populates="memberships")
class Project(Base):
__tablename__ = "projects"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
memberships: Mapped[list["ProjectMember"]] = relationship(
back_populates="project"
)
Параметри конфігурації relationship()
Функція relationship() керує об'єднанням моделей на рівні Python. Вона приймає низку критично важливих параметрів, які визначають поведінку зв'язків:
cascade(типstr, за замовчуванням"save-update, merge") Керує тим, як операції над батьківським об'єктом (наприклад, додавання, оновлення, видалення) поширюються на пов'язані дочірні об'єкти.save-update: якщо додати батьківський об'єкт до сесії (session.add(parent)), усі його дочірні об'єкти автоматично додадуться до сесії.delete: якщо видалити батьківський об'єкт (session.delete(parent)), автоматично видаляться всі пов'язані дочірні об'єкти (аналогON DELETE CASCADEу БД, але виконується силами Python).delete-orphan(критично важливий): якщо дочірній об'єкт від'єднати від батьківського (наприклад, видалити його зі спискуparent.children.remove(child)), цей дочірній об'єкт автоматично буде видалено з БД (інакше він перетвориться на «сироту» — запис у БД зparent_id = NULL).- Рекомендований набір: для One-to-Many зв'язків із жорстким володінням (наприклад,
User->Posts) зазвичай ставлятьcascade="all, delete-orphan"(allє синонімом дляsave-update, merge, refresh-expire, expunge, delete).
passive_deletes(типboolабо"all", за замовчуваннямFalse) Впливає на поведінку каскадного видалення.- Якщо
False, то при видаленні батьківського об'єкта SQLAlchemy виконає окремийSELECT-запит, щоб завантажити всі дочірні об'єкти в пам'ять, а потім згенерує окреміDELETE-запити для кожного з них. Це створює величезний оверхед. - Якщо
True, SQLAlchemy НЕ завантажуватиме дочірні об'єкти, а просто видалить батьківський об'єкт. При цьому вона покладатиметься на те, що на рівні самої бази даних у зовнішньому ключі прописаноON DELETE CASCADE(наприклад,ForeignKey("parent.id", ondelete="CASCADE")). Це в рази швидше й ефективніше.
- Якщо
lazy(типstr, за замовчуванням"select") Визначає стратегію завантаження пов'язаних об'єктів за замовчуванням, якщо її не перевизначено в запиті:"select"(абоTrue): ліниве завантаження (lazy loading)."joined"(абоFalse): жадібне завантаження черезLEFT OUTER JOIN(eager loading)."selectin": жадібне завантаження окремимSELECTзIN(eager loading)."raise": кидає помилку при спробі lazy loading. Дуже корисний режим для асинхронного коду, щоб гарантувати, що розробник не забув зробити eager loading і не отримавMissingGreenletу невідповідний момент.
back_populatesvsbackrefback_populates(рекомендовано в 2.0) вимагає явного визначенняrelationship()на обох моделях із взаємним посиланням одна на одну. Це забезпечує повну сумісність із type hints та автодоповненням IDE.backref(застарілий стиль) визначає зв'язок лише на одній моделі, а зворотний зв'язок на іншій створюється динамічно «магічним» шляхом. Це унеможливлює статичну перевірку типів через mypy та призводить до прихованих помилок.
Приклад зв'язку з оптимізованим каскадним видаленням та type safety:
class Project(Base):
__tablename__ = "projects"
id: Mapped[int] = mapped_column(primary_key=True)
# Каскадне видалення дочірніх завдань оптимізовано через passive_deletes
tasks: Mapped[list["Task"]] = relationship(
"Task",
back_populates="project",
cascade="all, delete-orphan",
passive_deletes=True, # SQLAlchemy не буде робити SELECT перед DELETE
)
class Task(Base):
__tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True)
# ON DELETE CASCADE на рівні бази даних для роботи passive_deletes
project_id: Mapped[int] = mapped_column(
ForeignKey("projects.id", ondelete="CASCADE")
)
project: Mapped["Project"] = relationship(
"Project",
back_populates="tasks"
)
Lazy Loading vs Eager Loading та Проблема N+1
Одна з найбільш підступних проблем у роботі з ORM — це N+1 query problem (проблема N+1 запитів). Вона виникає через поведінку Lazy Loading і є однією з найпоширеніших причин повільної роботи вебдодатків.
Що таке Lazy Loading і чому він небезпечний?
Lazy Loading (ліниве завантаження) — це стратегія, при якій пов'язані об'єкти не завантажуються одразу разом із головним об'єктом. Натомість вони завантажуються «на вимогу» — лише тоді, коли ви вперше звертаєтеся до атрибута зв'язку.
У EF Core Lazy Loading увімкнено за замовчуванням і відбувається автоматично. У SQLAlchemy він також існує, але вимкнений за замовчуванням (особливо у async-режимі, де він взагалі не підтримується без спеціального налаштування).
Уявімо сценарій: отримати список 100 User-ів та для кожного — його username і кількість Post-ів.
З Lazy Loading (Погано ❌):
# 1 запит: SELECT * FROM users LIMIT 100
users = session.execute(select(User).limit(100)).scalars().all()
for user in users:
# +1 запит для КОЖНОГО користувача:
# SELECT * FROM posts WHERE author_id = <user.id>
print(f"{user.username}: {len(user.posts)} posts")
# Разом: 1 + 100 = 101 запит до бази даних!
# При 1000 користувачів — 1001 запит.
Це класична N+1 проблема: спочатку виконується 1 запит для отримання головних об'єктів, а потім для кожного з N об'єктів виконується додатковий запит для отримання пов'язаних даних.
Вирішення: Eager Loading через selectinload() та joinedload()
Eager Loading (жадібне завантаження) вирішує цю проблему, завантажуючи всі необхідні дані одним або двома SQL-запитами. SQLAlchemy надає кілька стратегій:
selectinload() — рекомендований підхід
Виконує два SQL-запити: перший — для головних об'єктів, другий — один SELECT ... WHERE author_id IN (1, 2, 3, ...) для всіх пов'язаних об'єктів одночасно.
from sqlalchemy.orm import selectinload
# Два запити замість N+1:
# 1: SELECT * FROM users LIMIT 100
# 2: SELECT * FROM posts WHERE author_id IN (1, 2, 3, ..., 100)
users = session.execute(
select(User)
.options(selectinload(User.posts))
.limit(100)
).scalars().all()
for user in users:
print(f"{user.username}: {len(user.posts)} posts")
# user.posts вже завантажені — жодного додаткового запиту!
joinedload() — для одиничних об'єктів
Виконує один SQL-запит з LEFT OUTER JOIN. Підходить для завантаження одного пов'язаного об'єкта (many-to-one або one-to-one), але може бути неефективним для колекцій:
from sqlalchemy.orm import joinedload
# Один запит з JOIN:
# SELECT posts.*, users.* FROM posts
# LEFT OUTER JOIN users ON users.id = posts.author_id
# WHERE posts.id = 42
post = session.execute(
select(Post)
.options(joinedload(Post.author))
.where(Post.id == 42)
).scalar_one()
print(f"Post by: {post.author.username}") # Жодного додаткового запиту
Порівняння: .Include() у EF Core ↔ selectinload() у SQLAlchemy
// EF Core: .Include() для завантаження Navigation Property
var users = await context.Users
.Include(u => u.Posts) // Аналог selectinload(User.posts)
.Take(100)
.ToListAsync();
foreach (var user in users)
{
Console.WriteLine($"{user.Username}: {user.Posts.Count} posts");
}
# SQLAlchemy: selectinload() для завантаження relationship
users = (await session.execute(
select(User)
.options(selectinload(User.posts))
.limit(100)
)).scalars().all()
for user in users:
print(f"{user.username}: {len(user.posts)} posts")
selectinload() для колекцій (one-to-many, many-to-many) та joinedload() для одиничних зв'язків (many-to-one, one-to-one). Увімкніть echo=True на Engine під час розробки, щоб бачити реальні SQL-запити та вчасно помічати N+1 проблеми.AsyncSession) Lazy Loading повністю не підтримується і призведе до MissingGreenlet-помилки. Це є корисним обмеженням: async-контекст змушує розробника явно вказувати стратегію завантаження через selectinload() або joinedload(), роблячи поведінку передбачуваною.Просунуті стратегії завантаження зв'язків
Окрім базових selectinload() та joinedload(), SQLAlchemy надає інструменти для вирішення більш складних архітектурних завдань при завантаженні даних:
- Вкладене завантаження (Nested Eager Loading)
Часто потрібно завантажити дерево зв'язків (наприклад, завантажити
Project, для нього — всіTask, а для кожного завдання — списокCommentта автора коментаряUser). Для цього використовується ланцюжок методівselectinload()абоjoinedload()через крапку:nested_loading.pyfrom sqlalchemy.orm import selectinload stmt = ( select(Project) .options( selectinload(Project.tasks) # Крок 1: Завантажити завдання для проекту .selectinload(Task.comments) # Крок 2: Завантажити коментарі для кожного завдання .joinedload(Comment.author) # Крок 3: Жадібно підтягнути автора коментаря (many-to-one) ) ) projects = (await session.execute(stmt)).scalars().all() selectinload()vssubqueryload()У старих версіях SQLAlchemy часто використовувалиsubqueryload().subqueryload()повторює вихіднийSELECT-запит у підзапиті (subquery) для отримання дочірніх об'єктів. Це може бути вкрай неефективно, якщо вихідний запит мав складніJOINабо великі значенняLIMIT/OFFSET(базі даних доводиться виконувати всю логіку вихідного запиту двічі).selectinload()замість цього бере первинні ключі (ID) вже завантажених батьківських об'єктів і робить простий, швидкий запитSELECT ... WHERE parent_id IN (1, 2, 3...). Це набагато легше для планувальника запитів PostgreSQL.- Рекомендація: У 2.0 повністю відмовтеся від
subqueryload()на користьselectinload().
contains_eager(): Завантаження зв'язків при ручному JOIN Уявіть ситуацію: вам потрібно відфільтрувати список проектів за статусом їхніх завдань (наприклад, знайти проекти, які мають хоча б одне критичне завдання), і водночас ви хочете, щоб ці завдання були одразу завантажені у властивістьproject.tasks. Якщо ви напишете.join(Project.tasks).options(selectinload(Project.tasks)), SQLAlchemy зробить два окремих JOIN: один для фільтрації, а другий (в окремому запиті) — для завантаження колекції. Це подвійна робота.
Правильне рішення —contains_eager(). Воно вказує SQLAlchemy: «Я вже зробив JOIN вручну для фільтрації, візьми ці дані з результату вихідного запиту та поклади у властивість зв'язку»:contains_eager_demo.pyfrom sqlalchemy.orm import contains_eager stmt = ( select(Project) .join(Project.tasks) # Ручний JOIN для фільтрації .where(Task.priority == TaskPriority.CRITICAL) # Умова фільтрації .options(contains_eager(Project.tasks)) # Завантажити ці ж дані в об'єкт ) # Виконається лише ОДИН SQL-запит із INNER JOIN, # і об'єкти project.tasks будуть повністю заповнені! projects = (await session.execute(stmt)).scalars().unique().all()
contains_eager() разом із join() на зв'язки типу "один-до-багатьох" (one-to-many) обов'язково викликайте метод .unique() на результаті виконання запиту перед .all(). Оскільки JOIN створює дублікати рядків для батьківської таблиці, .unique() гарантує, що SQLAlchemy згорне дублікати в унікальні об'єкти в пам'яті Python.Запити у SQLAlchemy 2.0: Core DML API
У SQLAlchemy 1.x робота з запитами на рівні Core (конструктор SQL) та на рівні ORM (робота з об'єктами) була розділена: Core використовував select(users_table), а ORM — session.query(User).
Починаючи з SQLAlchemy 2.0, цей бар'єр повністю стерто. Тепер для будь-яких запитів використовується єдиний уніфікований Core DML (Data Manipulation Language) API. Ми конструюємо запит як об'єкт абстрактного синтаксичного дерева (AST) за допомогою функцій select(), insert(), update() та delete(), а потім виконуємо його через сесію.
Компіляція та інспектування SQL-запитів
Коли ви викликаєте select(User), ви не робите запит до бази даних і навіть не створюєте SQL-рядок. Ви створюєте Python-об'єкт типу Select. SQLAlchemy компілює цей об'єкт у SQL безпосередньо перед виконанням.
Ви можете переглянути згенерований SQL у будь-який момент:
from sqlalchemy import select
from sqlalchemy.dialects import postgresql
stmt = select(User).where(User.username == "arakviel")
# 1. Швидкий перегляд загального SQL (буде використано базовий діалект)
print(stmt)
# Виведе: SELECT users.id, users.username, users.email, users.is_active, users.created_at FROM users WHERE users.username = :username_1
# 2. Перегляд SQL для конкретної СУБД (наприклад, PostgreSQL)
compiled = stmt.compile(dialect=postgresql.dialect())
print(compiled)
# Виведе те саме, але орієнтоване під синтаксис PG
# 3. Перегляд параметрів, які будуть передані з запитом
print(compiled.params)
# Виведе: {'username_1': 'arakviel'}
Завдяки параметризації (використанню плейсхолдерів типу :username_1), SQLAlchemy повністю захищає ваш додаток від SQL-ін'єкцій «з коробки».
Читання даних: select()
Функція select() будує SQL-запити вибірки (SELECT).
Сигнатура
def select(*entities: _ColumnsClauseArgument[Any]) -> Select
Аргументами entities можуть бути класи ORM-моделей (наприклад, User), окремі стовпці (User.username), або SQL-функції (func.count(User.id)).
Ключові методи об'єкта Select (Chaining API)
Кожен метод повертає новий об'єкт Select, що дозволяє будувати ланцюжки викликів:
WHERE до запиту. Приймає позиційні аргументи умов (*whereclauses). Кілька аргументів або послідовні виклики .where() об'єднуються через логічне AND.# Базовий: фільтрація за значенням (рівність або порівняння)
select(User).where(User.is_active == True)
# SQL: SELECT ... FROM users WHERE users.is_active = true
select(User).where(User.id > 10)
# SQL: SELECT ... FROM users WHERE users.id > 10
# Просунутий: комбінація AND, фільтрація NULL, пошук IN та LIKE
select(User).where(
User.is_active == True,
User.email.is_not(None),
User.id.in_([1, 5, 10]),
User.username.like("ara%")
)
# SQL: SELECT ... FROM users WHERE users.is_active = true AND users.email IS NOT NULL AND users.id IN (1, 5, 10) AND users.username LIKE 'ara%'
WHERE для порівняння на рівність за назвами полів за допомогою іменованих аргументів (**kwargs). Наприклад: .filter_by(is_active=True).# Базовий: фільтрація за одним полем
select(User).filter_by(is_active=True)
# SQL: SELECT ... FROM users WHERE users.is_active = true
# Просунутий: фільтрація за кількома полями одночасно (діє як AND)
select(User).filter_by(is_active=True, username="arakviel")
# SQL: SELECT ... FROM users WHERE users.is_active = true AND users.username = 'arakviel'
ORDER BY). Приймає стовпці для сортування (*clauses). Для зворотного сортування викликається метод .desc() на стовпці, наприклад User.created_at.desc().# Базовий: сортування за зростанням (за замовчуванням)
select(User).order_by(User.username)
# SQL: SELECT ... FROM users ORDER BY users.username ASC
# Просунутий: сортування за кількома полями, спадання (desc) та NULL в кінці
select(User).order_by(
User.is_active.desc(),
User.email.asc().nulls_last()
)
# SQL: SELECT ... FROM users ORDER BY users.is_active DESC, users.email ASC NULLS LAST
LIMIT). Приймає ціле число limit.# Базовий: обмеження кількості результатів (перші 10 записів)
select(User).limit(10)
# SQL: SELECT ... FROM users LIMIT 10
# Просунутий: ліміт у підзапиті для фільтрації пов'язаних таблиць
subq = select(User.id).order_by(User.created_at.desc()).limit(5).subquery()
select(Post).where(Post.author_id.in_(select(subq)))
# SQL: SELECT ... FROM posts WHERE posts.author_id IN (SELECT users.id FROM users ORDER BY users.created_at DESC LIMIT 5)
OFFSET). Приймає ціле число offset.# Базовий: пропуск перших 10 записів
select(User).offset(10)
# SQL: SELECT ... FROM users OFFSET 10
# Просунутий: використання разом з .limit() для класичної пагінації
page, limit = 3, 20
select(User).limit(limit).offset((page - 1) * limit)
# SQL: SELECT ... FROM users LIMIT 20 OFFSET 40
GROUP BY) за вказаними стовпцями (*clauses).# Базовий: групування постів за ID автора
select(Post.author_id, func.count(Post.id)).group_by(Post.author_id)
# SQL: SELECT posts.author_id, count(posts.id) FROM posts GROUP BY posts.author_id
# Просунутий: групування за кількома полями та виразами (екстракція року з дати)
select(
Post.author_id,
func.extract("year", Post.created_at),
func.count(Post.id)
).group_by(
Post.author_id,
func.extract("year", Post.created_at)
)
# SQL: SELECT posts.author_id, EXTRACT(year FROM posts.created_at), count(posts.id) FROM posts GROUP BY posts.author_id, EXTRACT(year FROM posts.created_at)
HAVING). Приймає умову фільтрації clause.# Базовий: фільтрація груп з кількістю елементів більше 5
select(Post.author_id).group_by(Post.author_id).having(func.count(Post.id) > 5)
# SQL: SELECT posts.author_id FROM posts GROUP BY posts.author_id HAVING count(posts.id) > 5
# Просунутий: фільтрація з перевіркою середньої довжини заголовків постів у групі
select(Post.author_id).group_by(Post.author_id).having(
func.avg(func.length(Post.title)) > 50
)
# SQL: SELECT posts.author_id FROM posts GROUP BY posts.author_id HAVING avg(length(posts.title)) > 50
INNER JOIN. Приймає цільову модель target та необов'язкову умову об'єднання onclause. Якщо зв'язок налаштовано через relationship(), SQLAlchemy автоматично виведе умову об'єднання.# Базовий: INNER JOIN через зв'язок (relationship)
select(User).join(User.posts)
# SQL: SELECT users.id, ... FROM users JOIN posts ON users.id = posts.author_id
# Просунутий: INNER JOIN за явною умовою ON та подвійний JOIN
select(User.username, Tag.name).join(
Post, User.id == Post.author_id
).join(
Post.tags
)
# SQL: SELECT users.username, tags.name FROM users JOIN posts ON users.id = posts.author_id JOIN post_tags ON posts.id = post_tags.post_id JOIN tags ON tags.id = post_tags.tag_id
LEFT OUTER JOIN (LEFT JOIN). Працює аналогічно до .join().# Базовий: LEFT JOIN користувачів та їхніх постів
select(User, Post).outerjoin(User.posts)
# SQL: SELECT users.id, ... posts.id, ... FROM users LEFT OUTER JOIN posts ON users.id = posts.author_id
# Просунутий: Anti-Join (знайти користувачів, у яких взагалі немає постів)
select(User).outerjoin(User.posts).where(Post.id.is_(None))
# SQL: SELECT users.id, ... FROM users LEFT OUTER JOIN posts ON users.id = posts.author_id WHERE posts.id IS NULL
Оператори та модифікатори стовпців (Column Operators & Modifiers)
Оскільки Python має власні зарезервовані ключові слова (наприклад, in, is, not, and, or), ми не можемо перевантажити їх безпосередньо у виразах на кшталт User.id in [1, 2]. Python-інтерпретатор просто видасть синтаксичну помилку.
Для обходу цього обмеження SQLAlchemy надає спеціальні методи об'єктів стовпців (Column Operators), які генерують відповідні SQL-конструкції:
IN. Перевіряє, чи належить значення стовпця до вказаного списку.- Коли використовувати: Для фільтрації за множиною значень (наприклад, статуси, списки ID).
- Коли НЕ використовувати: З великими списками (тисячі елементів), оскільки це сильно сповільнює розбір запиту базою даних. У таких випадках краще використовувати
JOINабо підзапит.
select(User).where(User.id.in_([1, 2, 3]))
# SQL: SELECT ... FROM users WHERE users.id IN (1, 2, 3)
IS та IS NOT. Найчастіше використовуються для роботи з NULL.- Коли використовувати: Для фільтрації
IS NULL(is_(None)) абоIS NOT NULL(is_not(None)). - Коли НЕ використовувати: Уникайте порівнянь
== Noneабо!= None. Хоча SQLAlchemy перевантажує оператори==та!=для генераціїIS NULL, використанняis_()таis_not()є стандартом PEP8 для порівняння зNoneта робить намір коду очевидним.
select(User).where(User.email.is_(None))
# SQL: SELECT ... FROM users WHERE users.email IS NULL
select(User).where(User.email.is_not(None))
# SQL: SELECT ... FROM users WHERE users.email IS NOT NULL
like() є регістрозалежним (SQL LIKE), тоді як ilike() — регістронезалежний (SQL ILIKE у PostgreSQL, або LOWER(col) LIKE LOWER(val) в інших СУБД).- Коли використовувати: Для пошуку підрядків за допомогою символу маски
%. - Коли НЕ використовувати: Для великих текстових полів (наприклад, статей) — замість цього краще використовувати повнотекстовий пошук (Full-Text Search) бази даних, оскільки
LIKE "%текст%"не використовує стандартні B-Tree індекси та виконує повне сканування таблиці (Full Table Scan).
select(User).where(User.username.like("ara%"))
# SQL: SELECT ... FROM users WHERE users.username LIKE 'ara%'
select(User).where(User.username.ilike("%doe%"))
# SQL: SELECT ... FROM users WHERE users.username ILIKE '%doe%'
like() / ilike(), які автоматично додають символ % у потрібні місця.- Коли використовувати: Для очевидного та простого текстового пошуку. За замовчуванням вони є регістрозалежними, але приймають параметр
autoescape=Trueабо можуть комбінуватися через.ilike()за потреби.
select(User).where(User.username.startswith("ara"))
# SQL: SELECT ... FROM users WHERE users.username LIKE 'ara%'
select(User).where(User.email.endswith(".com"))
# SQL: SELECT ... FROM users WHERE users.email LIKE '%.com'
select(User).where(User.username.contains("admin"))
# SQL: SELECT ... FROM users WHERE users.username LIKE '%admin%'
BETWEEN ... AND ....- Коли використовувати: Для фільтрації діапазонів чисел, дат або цін (включаючи межі).
- Коли НЕ використовувати: Для діапазонів дат, якщо ви не впевнені щодо часу (оскільки
BETWEENвключає кінцеву дату до 00:00:00, що може призвести до пропуску записів за останній день). Для дат часто надійніше використовувати явні порівняння>=та<.
select(User).where(User.id.between(10, 20))
# SQL: SELECT ... FROM users WHERE users.id BETWEEN 10 AND 20
.order_by().- Коли використовувати: Для явного контролю порядку сортування у звітах, списках та пагінації.
- Коли НЕ використовувати: Унікальні індекси вже відсортовані за зростанням за замовчуванням СУБД.
select(User).order_by(User.created_at.desc())
# SQL: SELECT ... FROM users ORDER BY users.created_at DESC
NULL) значення під час сортування — на початку чи в кінці результату. Працює як доповнення до desc() / asc().- Коли використовувати: Коли сортуєте за опціональними полями (наприклад, дата завершення завдання
due_date), щоб порожні значення не з'являлися на початку списку.
select(User).order_by(User.email.asc().nulls_last())
# SQL: SELECT ... FROM users ORDER BY users.email ASC NULLS LAST
AS).- Коли використовувати: При агрегаціях (
func.count(),func.sum()) або математичних операціях, щоб результат мав зручне ім'я у NamedTuple-рядках (Row.alias_name).
select(func.count(User.id).label("total")).scalar_one()
# SQL: SELECT count(users.id) AS total FROM users
CAST(col AS type)). Імпортується з головного модуля from sqlalchemy import cast.- Коли використовувати: Коли потрібно порівняти стовпець типу UUID з рядком, або перетворити ціле число на рядок для конкатенації чи форматування.
from sqlalchemy import cast, String
select(cast(User.id, String))
# SQL: SELECT CAST(users.id AS VARCHAR) FROM users
Як працює «магія» умов (Перевантаження операторів)
Для розробників, які вперше бачать код User.id > 10 у where(), це виглядає як магія. За логікою Python, вираз User.id > 10 мав би негайно обчислитися в логічне True або False (що передало б у where(True) або where(False) і зламало б запит).
Секрет криється в перевантаженні операторів (Operator Overloading) у Python:
- Стовпці — це не просто значення: Атрибут
User.idу вашому коді — це не ціле числоint. Це об'єкт класуInstrumentedAttribute(дескриптор, наданий SQLAlchemy). - Магічні методи: SQLAlchemy перевизначає стандартні магічні методи порівнянь Python на цих об'єктах:
__eq__(для==)__ne__(для!=)__gt__(для>)__lt__(для<)- тощо.
Коли ви пишете User.id > 10, Python під капотом викликає:
User.id.__gt__(10)
Замість повернення True або False, реалізація __gt__ в SQLAlchemy повертає об'єкт BinaryExpression (бінарний вираз).
Цей об'єкт є вузлом абстрактного синтаксичного дерева (AST) і містить інформацію:
- Ліва частина: посилання на стовпець
User.id - Оператор:
>(більше) - Права частина: константа
10
Коли ви передаєте цей BinaryExpression у метод .where(), SQLAlchemy додає його до списку фільтрів. Під час виконання запиту компилятор діалекту (наприклад, PostgreSQL) обходить це дерево і трансформує вузол у текстовий SQL: users.id > 10 (а точніше, у параметризований вигляд users.id > :id_1).
Зіставлення операторів Python та SQL у SQLAlchemy:
| Вираз у SQLAlchemy | Магічний метод Python | Результат у SQL |
|---|---|---|
User.username == "admin" | __eq__ | users.username = 'admin' |
User.id != 5 | __ne__ | users.id <> 5 |
User.id > 10 | __gt__ | users.id > 10 |
User.id >= 10 | __ge__ | users.id >= 10 |
User.id < 10 | __lt__ | users.id < 10 |
User.id <= 10 | __le__ | users.id <= 10 |
User.email == None | __eq__ з None | users.email IS NULL |
User.email != None | __ne__ з None | users.email IS NOT NULL |
Використання SQL-функцій: func
У багатьох SQL-запитах виникає потреба використовувати вбудовані функції СУБД: порахувати кількість рядків (COUNT), звести текст до нижнього регістру (LOWER), отримати поточний час (NOW()) або виконати складнішу агрегацію.
У SQLAlchemy для цього призначений спеціальний об'єкт func (імпортується як from sqlalchemy import func).
Як працює «динамічна магія» func?
На відміну від більшості бібліотек, у func немає жорстко прописаного списку методів у коді SQLAlchemy. Замість цього він використовує магічний метод Python __getattr__.
Коли ви звертаєтеся до будь-якого атрибута на func (наприклад, func.any_custom_function()), SQLAlchemy «на льоту» створює об'єкт функції з цим ім'ям. Це означає, що ви можете викликати будь-яку функцію, яку підтримує ваша конкретна база даних, навіть якщо розробники SQLAlchemy про неї не знали!
# Будь-яке ім'я динамічно трансформується в SQL-функцію:
stmt = select(func.my_custom_db_function(User.id))
# SQL: SELECT my_custom_db_function(users.id) FROM users
Найпопулярніші стандартні функції СУБД
COUNT(). Рахує кількість записів або не-NULL значень у стовпці.# Рахуємо загальну кількість користувачів
stmt = select(func.count(User.id))
# SQL: SELECT count(users.id) AS count_1 FROM users
server_default=func.now()).# Отримання поточного часу з серверу БД
stmt = select(func.now())
# SQL: SELECT now() AS now_1
COALESCE(...). Приймає кілька аргументів і повертає перший з них, який не є NULL.# Якщо bio порожнє (NULL), повернути рядок за замовчуванням
stmt = select(func.coalesce(User.bio, "Біографія відсутня"))
# SQL: SELECT coalesce(users.bio, 'Біографія відсутня') FROM users
ILIKE).# Порівняння email у нижньому регістрі
stmt = select(User).where(func.lower(User.email) == "user@example.com")
# SQL: SELECT ... WHERE lower(users.email) = 'user@example.com'
# Об'єднання імені та прізвища через пробіл
stmt = select(func.concat(User.first_name, " ", User.last_name))
# SQL: SELECT concat(users.first_name, ' ', users.last_name) FROM users
Специфічні функції СУБД (на прикладі PostgreSQL)
Оскільки func динамічний, ви можете викликати специфічні для PostgreSQL функції:
# date_trunc — округлення дати до дня (прибирає години/хвилини)
stmt = select(func.date_trunc("day", User.created_at))
# SQL: SELECT date_trunc('day', users.created_at) FROM users
# age — розрахунок віку на основі дати народження
stmt = select(func.age(User.birth_date))
# SQL: SELECT age(users.birth_date) FROM users
Явне зазначення типу повернення (type_)
Зазвичай SQLAlchemy намагається автоматично визначити тип даних, який поверне функція (наприклад, func.count() поверне ціле число Integer).
Проте для специфічних або кастомних СУБД-функцій SQLAlchemy не знає заздалегідь тип повернення. Через це вона не зможе виконати автоматичну конвертацію результату в об'єкти Python (наприклад, перетворити результат date_trunc у Python-об'єкт datetime.date).
Для вирішення цього використовується параметр type_:
from sqlalchemy import Date
# Явно вказуємо, що функція date_trunc повертає дату
stmt = select(func.date_trunc("day", User.created_at, type_=Date))
# Тепер результат буде автоматично розпарсено у datetime.date в Python:
day = session.execute(stmt).scalar_one() # поверне datetime.date(2026, 7, 4)
Виконання та отримання результатів
Для виконання запиту об'єкт Select передається в session.execute():
result = session.execute(stmt)
Метод execute() повертає об'єкт Result. Це ітератор, який повертає рядки типу Row (схожі на NamedTuple в Python). Оскільки при виборі повних ORM-моделей кожен рядок повертається як кортеж з одним елементом (наприклад, (User,)), SQLAlchemy надає допоміжні методи для розпакування цих даних:
Row (кортежів).Чому це потрібно?
База даних завжди повертає результати у вигляді таблиці (матриці). Навіть коли ви вибираєте всю модель select(User), SQLAlchemy отримує з бази набір рядків і загортає кожний результат у кортеж з одним елементом:- Без
.scalars()результат виконанняsession.execute(select(User)).all()виглядає так:[(User_1,), (User_2,), (User_3,)]— список кортежів довжиною 1. Щоб отримати користувача, довелося б писатиrow[0]. - З
.scalars()SQLAlchemy автоматично витягує перший елемент із кожного кортежу. Результат виконанняsession.execute(select(User)).scalars().all()виглядає так:[User_1, User_2, User_3]— чистий список об'єктів.
# Без scalars():
rows = session.execute(select(User)).all()
for row in rows:
user = row[0] # доводиться вручну брати перший елемент кортежу
print(user.username)
# З scalars():
users = session.execute(select(User)).scalars().all()
for user in users:
print(user.username) # працюємо безпосередньо з об'єктом User
.scalars(), повертає список чистих об'єктів (наприклад, list[User]).# 1. Без scalars() — список об'єктів Row ( NamedTuple )
rows = session.execute(select(User.id, User.username)).all()
# rows: list[Row(id=1, username="arakviel"), ...]
# 2. Зі scalars() — список чистих об'єктів User
users = session.execute(select(User)).scalars().all()
# users: list[User]
None, якщо результат порожній. Закриває з'єднання.# Отримання першого знайденого користувача або None
user = session.execute(
select(User).order_by(User.created_at.asc())
).scalars().first()
NoResultFound або MultipleResultsFound).# Очікуємо отримати рівно одного користувача за унікальним username
stmt = select(User).where(User.username == "admin")
admin = session.execute(stmt).scalars().one()
# Увага: якщо admin немає — кидає NoResultFound
None, якщо записів немає. Якщо знайдено більше 1 запису, кидає виняток MultipleResultsFound.# Безпечний пошук за унікальним email (поверне об'єкт або None)
stmt = select(User).where(User.email == "notfound@example.com")
user = session.execute(stmt).scalars().one_or_none()
.scalars().one(). Повертає значення першого стовпця першого рядка. Корисний для агрегацій типу COUNT.# Отримання кількості користувачів (повертає одне число int)
count = session.execute(
select(func.count(User.id))
).scalar_one()
.scalars().one_or_none(). Повертає одне скалярне значення або None.# Отримання email користувача за ID (поверне рядок або None, якщо ID не існує)
stmt = select(User.email).where(User.id == 999)
email = session.execute(stmt).scalar_one_or_none()
Практичні приклади з виводом результатів
Приклад 1: Отримання списку активних користувачів (ORM-стиль)
stmt = select(User).where(User.is_active == True).order_by(User.username.asc())
# Компіляція SQL:
# SELECT users.id, users.username, users.email, users.is_active, users.created_at
# FROM users
# WHERE users.is_active = true ORDER BY users.username ASC
users = session.execute(stmt).scalars().all()
for user in users:
print(f"User: {user.username} (ID: {user.id})")
Вивід у консоль:
User: alice (ID: 2)
User: arakviel (ID: 1)
User: bob (ID: 3)
Приклад 2: Вибірка конкретних стовпців (Tuple-стиль)
Коли нам потрібні лише окремі поля, завантажувати повні ORM-об'єкти неефективно (вони займають пам'ять та відстежуються сесією). Краще вибрати лише потрібні стовпці:
stmt = select(User.id, User.username).where(User.id > 1)
# Компіляція SQL:
# SELECT users.id, users.username FROM users WHERE users.id > 1
rows = session.execute(stmt).all()
# rows містить список об'єктів Row, які поводяться як кортежі та іменовані об'єкти
for row in rows:
# Доступ можливий двома шляхами:
print(f"ID: {row[0]}, Username: {row[1]}") # за індексом
print(f"ID: {row.id}, Username: {row.username}") # за атрибутом
Вивід у консоль:
ID: 2, Username: alice
ID: 3, Username: bob
Приклад 3: Складні логічні умови (AND, OR, NOT)
Умови AND поєднуються автоматично при передачі кількох аргументів у where(). Для OR та NOT використовуються спеціальні функції or_ та not_:
from sqlalchemy import or_, not_
stmt = (
select(User)
.where(
User.is_active == True,
or_(
User.email.like("%@gmail.com"),
not_(User.username == "admin")
)
)
)
# Компіляція SQL:
# SELECT users.id, users.username, users.email ...
# FROM users
# WHERE users.is_active = true AND (users.email LIKE '%@gmail.com' OR users.username != 'admin')
Приклад 4: INNER JOIN та агрегація (Групування)
Знайдемо кількість постів для кожного користувача, що має хоча б один пост, та відсортуємо за спаданням:
from sqlalchemy import func
stmt = (
select(User.username, func.count(Post.id).label("total_posts"))
.join(Post, User.id == Post.author_id)
.group_by(User.username)
.having(func.count(Post.id) > 0)
.order_by(func.count(Post.id).desc())
)
# Компіляція SQL:
# SELECT users.username, count(posts.id) AS total_posts
# FROM users JOIN posts ON users.id = posts.author_id
# GROUP BY users.username HAVING count(posts.id) > 0
# ORDER BY count(posts.id) DESC
results = session.execute(stmt).all()
for row in results:
print(f"{row.username}: {row.total_posts} posts")
Вивід у консоль:
arakviel: 12 posts
alice: 3 posts
Створення даних: insert()
Функція insert() генерує SQL-запити вставки записів (INSERT).
Сигнатура
def insert(table: _DMLTableArgument) -> Insert
Аргументом є ORM-клас моделі або об'єкт Table.
Методи об'єкта Insert
- **
.values(\*args, **kwargs)** — визначає дані для вставки. Може приймати словник, список словників (для масової вставки) або keyword-аргументи. .returning(*cols)— додає секціюRETURNINGдля отримання згенерованих базою даних значень (наприклад, ID або значень за замовчуванням) в результаті виконання запиту.
ORM vs Core Insertion
У SQLAlchemy є два шляхи створення записів:
ORM-стиль (Instance-based)
Ви створюєте екземпляр класу моделі, додаєте його до сесії через session.add() та робите коміт.
- Плюси: Об'єкт автоматично стає частиною Unit of Work, відстежуються його зв'язки, після вставки об'єкт готовий до подальшого використання.
- Мінуси: Повільний для великої кількості записів (створює накладні витрати на рівні Python-об'єктів).
Core-стиль (Bulk DML)
Ви викликаєте insert(Model).values(...) та виконуєте через session.execute().
- Плюси: Надзвичайно швидкий. Виконує пряму вставку сирих словників даних без створення об'єктів моделей в пам'яті.
- Мінуси: Не відстежує стан об'єктів у сесії, не синхронізує Identity Map автоматично.
Практичні приклади з виводом результатів
Приклад 1: Одиночна вставка з поверненням згенерованого ID
stmt = (
insert(User)
.values(username="new_user", email="new@example.com")
.returning(User.id, User.created_at)
)
# Компіляція SQL (PostgreSQL):
# INSERT INTO users (username, email) VALUES ('new_user', 'new@example.com')
# RETURNING users.id, users.created_at
result = session.execute(stmt)
row = result.fetchone() # Оскільки ми чекаємо один рядок з RETURNING
print(f"Inserted ID: {row.id}")
print(f"Created At: {row.created_at}")
session.commit()
Вивід у консоль:
Inserted ID: 42
Created At: 2026-07-04 18:00:00+00:00
Приклад 2: Масова вставка (Bulk Insert)
Для оптимізації продуктивності масової вставки передається список словників:
data = [
{"username": "user_a", "email": "a@example.com"},
{"username": "user_b", "email": "b@example.com"},
{"username": "user_c", "email": "c@example.com"},
]
# Створюємо стейтмент без виклику .values() — передамо дані під час виконання
stmt = insert(User)
# Компіляція SQL:
# INSERT INTO users (username, email) VALUES (:username, :email)
result = session.execute(stmt, data)
# SQLAlchemy автоматично групує це в один оптимізований батч-запит
print(f"Rows inserted: {result.rowcount}")
session.commit()
Вивід у консоль:
Rows inserted: 3
Оновлення даних: update()
Функція update() будує SQL-запити оновлення (UPDATE).
Сигнатура
def update(table: _DMLTableArgument) -> Update
Методи об'єкта Update
.where(*clauses)— обмежує вибірку рядків, які будуть оновлені (критично важливо, інакше оновляться всі рядки в таблиці!)..values(**kwargs)або **.values(dict)** — встановлює нові значення для стовпців..returning(*cols)— повертає оновлені значення.
ORM vs Core Update
- ORM-стиль: Отримуємо об'єкт через
session.get(User, id), міняємо його атрибут (user.email = "new@mail.com"), і робимоsession.commit(). SQLAlchemy автоматично згенеруєUPDATEпід час комміту завдяки Change Tracking (Unit of Work). - Core-стиль: Виконуємо
update(User).where(...).values(...). Застосовується для масових оновлень або коли потрібно оновити запис без попереднього завантаження його в пам'ять.
Практичні приклади з виводом результатів
Приклад 1: Масове оновлення з фільтрацією
Заблокуємо всіх користувачів, які не заходили на сайт дуже давно (наприклад, у яких пошта на старому домені):
stmt = (
update(User)
.where(User.email.like("%@old-domain.com"))
.values(is_active=False)
)
# Компіляція SQL:
# UPDATE users SET is_active = false WHERE users.email LIKE '%@old-domain.com'
result = session.execute(stmt)
print(f"Matched and updated rows: {result.rowcount}")
session.commit()
Вивід у консоль:
Matched and updated rows: 14
Приклад 2: Динамічне оновлення на основі існуючих значень стовпця
Оновимо лічильник переглядів постів, збільшивши його на 1 directly на рівні бази даних (без завантаження у Python):
stmt = (
update(Post)
.where(Post.id == 5)
.values(views_count=Post.views_count + 1)
.returning(Post.views_count)
)
# Компіляція SQL:
# UPDATE posts SET views_count = posts.views_count + 1 WHERE posts.id = 5 RETURNING posts.views_count
result = session.execute(stmt)
new_views = result.scalar_one()
print(f"Updated post views count. New value: {new_views}")
session.commit()
Вивід у консоль:
Updated post views count. New value: 101
Видалення даних: delete()
Функція delete() створює SQL-запити видалення записів (DELETE).
Сигнатура
def delete(table: _DMLTableArgument) -> Delete
Методи об'єкта Delete
.where(*clauses)— визначає умови фільтрації для видалення (якщо не вказати.where(), база даних очистить усю таблицю!)..returning(*cols)— повертає дані видалених рядків перед їх остаточним знищенням.
Практичні приклади з виводом результатів
Приклад 1: Видалення неактивних користувачів
stmt = (
delete(User)
.where(User.is_active == False)
)
# Компіляція SQL:
# DELETE FROM users WHERE users.is_active = false
result = session.execute(stmt)
print(f"Deleted users count: {result.rowcount}")
session.commit()
Вивід у консоль:
Deleted users count: 5
Приклад 2: Видалення з поверненням видалених даних (RETURNING)
Корисно, якщо перед видаленням нам потрібно залогувати або зберегти видалені дані:
stmt = (
delete(Post)
.where(Post.created_at < datetime(2020, 1, 1))
.returning(Post.id, Post.title)
)
# Компіляція SQL:
# DELETE FROM posts WHERE posts.created_at < '2020-01-01 00:00:00'
# RETURNING posts.id, posts.title
deleted_posts = session.execute(stmt).all()
for post in deleted_posts:
print(f"Logged deleted post: [ID: {post.id}] {post.title}")
session.commit()
Порівняння: LINQ (EF Core) ↔ SQLAlchemy Query API
// Фільтрація + сортування + пагінація
var users = await context.Users
.Where(u => u.IsActive && u.Username.StartsWith("ara"))
.OrderByDescending(u => u.CreatedAt)
.Skip((page - 1) * perPage)
.Take(perPage)
.Include(u => u.Posts)
.ToListAsync();
// Агрегація
var postCount = await context.Posts
.Where(p => p.AuthorId == userId)
.CountAsync();
# Фільтрація + сортування + пагінація
users = (await session.execute(
select(User)
.where(User.is_active == True, User.username.like("ara%"))
.order_by(User.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.options(selectinload(User.posts))
)).scalars().all()
# Агрегація
from sqlalchemy import func
post_count = (await session.execute(
select(func.count(Post.id))
.where(Post.author_id == user_id)
)).scalar_one()
Просунуті операції з даними в Session
Для ефективного написання репозиторіїв та оптимізації роботи з базою даних важливо розрізняти тонкощі поведінки внутрішніх методів Session:
session.get() проти select()
session.get(Model, pk_value)шукає об'єкт за його первинним ключем. Його головна перевага — використання Identity Map. Перед тим як генерувати SQL-запит і відправляти його в базу даних, SQLAlchemy перевіряє, чи цей об'єкт уже завантажений у поточну сесію. Якщо так — він одразу повертає об'єкт із пам'яті Python, повністю уникаючи мережевого запиту до БД.- Конструкція
select(Model).where(Model.id == pk_value)завжди генерує та виконуєSELECT-запит до бази даних, ігноруючи те, що об'єкт уже може бути завантажений. - Рекомендація: Для пошуку за ID завжди віддавайте перевагу
session.get()— це суттєво заощаджує ресурси бази даних.
session.add() проти session.merge()
session.add(obj)призначений для додавання нового (transient) об'єкта до сесії. Якщо ви спробуєте додати об'єкт, який має первинний ключ, що вже існує в базі даних, SQLAlchemy може видати помилку при збереженні (через обмеження унікальності PK).session.merge(obj)використовується для роботи з від'єднаними (detached) об'єктами (наприклад, об'єкт прийшов із кешу Redis або був десеріалізований, і він уже маєid, але поточна сесія про него нічого не знає).- Метод
merge()робить наступне:- Перевіряє Identity Map та базу даних на наявність об'єкта з таким самим ID.
- Якщо об'єкт знайдено у БД, він завантажує його в сесію та копіює стан вашого від'єднаного об'єкта в цей завантажений екземпляр.
- Якщо об'єкта немає в БД, він створює новий об'єкт.
- Повертає новий відстежуваний екземпляр. Важливо пам'ятати: сам переданий в
merge(obj)об'єкт залишається невідстежуваним (detached), роботу слід продовжувати з об'єктом, який повернув метод:tracked_obj = await session.merge(obj).
flush() проти commit() проти refresh()
session.flush()— відправляє накопичені в пам'яті операції (INSERT, UPDATE, DELETE) до бази даних у межах поточної транзакції. СУБД виконує ці команди та повертає згенеровані значення (як-от автоінкрементніid), але ці зміни можна відкотити (rollback()). Вони не стають видимими іншим підключенням до завершення транзакції.session.commit()— фіксує поточну транзакцію бази даних, роблячи всі зміни перманентними та видимими для всіх. Під капотомcommit()завжди автоматично викликаєflush()перед збереженням.session.refresh(obj)— виконує примусовийSELECT-запит до бази даних, щоб оновити стан конкретного об'єктаobj(наприклад, для отримання значень, розрахованих на стороні бази даних). Також це анулює локальний кеш Identity Map для даного об'єкта.
Вкладені транзакції: begin_nested() (Savepoints)
- У багатьох реляційних СУБД (зокрема, PostgreSQL) будь-яка помилка під час транзакції (наприклад, спроба вставити дублікат унікального ключа) робить усю транзакцію недійсною. Подальші запити у ній будуть викидати помилку
InFailedSqlTransactionError. - Якщо ви імпортуєте дані у циклі, один некоректний запис за замовчуванням завалить весь імпорт.
- Для обходу цього використовується
session.begin_nested(), який створює SQLSAVEPOINT. Якщо операція всередині вкладеної транзакції падає з помилкою, вона відкочується лише до цієї контрольної точки, дозволяючи продовжити роботу з основною транзакцією.
# Масовий імпорт з ігноруванням помилкових записів
for user_data in users_to_import:
# begin_nested() створює Savepoint і автоматично відкочує його у разі винятку
async with session.begin_nested():
try:
new_user = User(**user_data)
session.add(new_user)
await session.flush() # Спроба вставки конкретного запису
except Exception as e:
# Цей запис відкотиться, але сесія залишиться робочою
print(f"Помилка імпорту {user_data['username']}: {e}")
continue
await session.commit() # Зберігаємо всі успішно імпортовані записи
execute() проти scalars() та розпаковка кортежів
- Виклик
session.execute(stmt)повертає об'єкт класуResult. За своєю суттю це ітератор над рядками результату, де кожен рядок є об'єктомRow(поводиться якNamedTuple). - Якщо ви робите
select(User), коженRowмістить кортеж із одного елемента:(UserObject,). - Метод
.scalars()перетворює цей результат, витягуючи перший елемент із кожного кортежу. Тобтоexecute(select(User)).scalars().all()повертаєlist[User]. - Якщо ви робите мульти-вибір (наприклад,
select(User, Post)абоselect(User.username, Post.title)), вам потрібен повнийexecute(), і ви можете розпакувати кортежі в циклі:
result = await session.execute(select(User, Post).join(User.posts))
for user, post in result: # Автоматична розпаковка кортежу Row
print(f"Користувач {user.username} написав пост {post.title}")
Схематичне порівняння методів Session:
| Метод | Об'єкт до виклику | Стан після виклику | Чи робить запит до БД? |
|---|---|---|---|
session.add(obj) | Transient (новий) | Persistent (відстежується) | Ні (запит буде при flush/commit) |
session.get(Model, id) | Не існує у сесії | Persistent (завантажений) | Лише якщо об'єкта немає в Identity Map |
session.merge(obj) | Detached (ззовні) | Detached (оригінал), повернутий копіюється у Persistent | Так, робить SELECT для перевірки стану в БД |
session.delete(obj) | Persistent | Pending Delete (видалення при commit) | Ні (запит буде при flush/commit) |
session.flush() | Persistent/Dirty | Persistent (зміни відправлені до БД) | Так, виконує накопичені операції запису |
session.commit() | Persistent/Dirty | Persistent (або Expired / Detached при закритті) | Так, фіксує поточну транзакцію |
session.refresh(obj) | Persistent | Persistent (дані оновлено з БД) | Так, виконує SELECT для оновлення полів |
Під капотом: Unit of Work, Identity Map та Change Tracking
Щоб дійсно розуміти SQLAlchemy і передбачати її поведінку, недостатньо просто знати синтаксис. Потрібно зрозуміти три ключові патерни, що лежать в основі Session: Unit of Work, Identity Map та Change Tracking.
Патерн Unit of Work (Одиниця роботи)
Unit of Work — це архітектурний патерн (описаний Мартіном Фаулером у книзі «Patterns of Enterprise Application Architecture»), який відстежує всі зміни, що відбуваються з об'єктами під час однієї «одиниці роботи» (зазвичай — одного HTTP-запиту), і потім атомарно відправляє їх до бази даних у вигляді однієї транзакції.
Аналогія: уявіть, що Session — це кошик для покупок. Ви кладете товари (об'єкти) у кошик, берете звідти, змінюєте вміст — і лише коли ви підходите до каси (session.commit()), всі операції виконуються разом як єдина атомарна дія.
Session відстежує об'єкти через чотири можливі стани:
- Transient (перехідний) — об'єкт створений (
User()), але ще не доданий до сесії. - Pending (очікуваний) — об'єкт доданий через
session.add(), але ще не збережений до БД (INSERTще не виконано). - Persistent (постійний) — об'єкт має відповідний запис у БД та відстежується сесією.
- Detached (від'єднаний) — об'єкт був persistent, але сесія закрита або він був видалений зі сесії.
Identity Map: Гарантія унікальності
Identity Map (карта ідентичностей) — це внутрішній словник у Session, що відображає первинний ключ бази даних на Python-об'єкт.
Ключова гарантія Identity Map: у межах однієї сесії session.get(User, 1) завжди повертає той самий Python-об'єкт, незалежно від кількості викликів.
with Session(engine) as session:
# Перший виклик: SELECT * FROM users WHERE id = 1
user_a = session.get(User, 1)
# Другий виклик: SQL-запиту НЕ виконується!
# SQLAlchemy знаходить об'єкт у Identity Map і повертає той самий
user_b = session.get(User, 1)
print(user_a is user_b) # True — це буквально один і той самий об'єкт!
# Якщо ми змінимо user_a — user_b теж «зміниться»
user_a.is_active = False
print(user_b.is_active) # False
Це принципово відрізняється від простого кешування запитів. Identity Map гарантує, що ваша програма завжди працює з консистентним станом об'єктів протягом однієї транзакції.
Порівняння з EF Core: DbContext.ChangeTracker є прямим аналогом Identity Map + Change Tracking у SQLAlchemy. Якщо ви двічі запитуєте один і той самий запис через context.Users.Find(1) в EF Core — ви також отримуєте той самий об'єкт з кешу.
Change Tracking: Як SQLAlchemy «знає», що змінилося
Change Tracking (відстеження змін) — це механізм, завдяки якому Session автоматично знає, які саме атрибути об'єктів були змінені і генерує мінімально необхідний UPDATE-запит.
Цей механізм реалізований через Python-дескриптори (instrumented attributes) — при реєстрації моделі SQLAlchemy замінює звичайні атрибути класу (username, email тощо) на спеціальні об'єкти-дескриптори, що перехоплюють операції __get__ і __set__.
from sqlalchemy import inspect
with Session(engine) as session:
user = session.get(User, 1) # Завантажуємо об'єкт
# Змінюємо кілька атрибутів:
user.username = "new_username"
user.bio = "Updated bio"
# Можна перевірити стан об'єкта ПЕРЕД commit():
inspector = inspect(user)
for attr in inspector.attrs:
history = attr.history
if history.has_changes():
print(f" {attr.key}: {history.deleted} → {history.added}")
# Виведе:
# username: ['old_username'] → ['new_username']
# bio: [None] → ['Updated bio']
session.commit()
# SQLAlchemy генерує МІНІМАЛЬНИЙ запит:
# UPDATE users SET username='new_username', bio='Updated bio' WHERE id=1
# (лише змінені поля, не всі!)
Саме завдяки Change Tracking SQLAlchemy генерує ефективні UPDATE-запити — оновлюються лише ті стовпці, що дійсно змінилися, а не всі поля об'єкта.
Інтеграція з FastAPI: get_async_session через Dependency Injection
Тепер об'єднаємо все, що ми вивчили, у повноцінний production-ready файл конфігурації бази даних для FastAPI-додатку. Ключовим питанням є: як правильно управляти життєвим циклом AsyncSession — коли її створювати і коли закривати?
Відповідь стандартна для FastAPI: через Dependency Injection (детально розібраний у статті 20).
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
from sqlalchemy.orm import DeclarativeBase
# 1. Базовий клас для всіх моделей
class Base(DeclarativeBase):
pass
# 2. Async Engine — singleton, живе весь час роботи додатку
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/taskforge_db"
async_engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
echo=False,
)
# 3. Фабрика AsyncSession
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
expire_on_commit=False,
)
# 4. Dependency для FastAPI — генератор сесії
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI Dependency: надає AsyncSession для одного HTTP-запиту.
Сесія автоматично закривається після завершення запиту.
У разі помилки — автоматичний rollback.
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Тепер використаємо цей dependency у FastAPI-роутері:
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_async_session
from app.models.user import User
# Схеми Pydantic для користувачів
from app.schemas.user import UserCreate, UserRead
router = APIRouter(prefix="/users", tags=["Users"])
# Зручний alias для типізованого параметра залежності
SessionDep = Annotated[AsyncSession, Depends(get_async_session)]
@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, session: SessionDep):
"""Отримати користувача за ID."""
user = await session.get(User, user_id)
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id={user_id} not found"
)
return user
@router.get("/", response_model=list[UserRead])
async def list_users(
session: SessionDep,
skip: int = 0,
limit: int = 20,
):
"""Отримати список користувачів з пагінацією."""
users = (await session.execute(
select(User)
.offset(skip)
.limit(limit)
.order_by(User.created_at.desc())
)).scalars().all()
return users
@router.post("/", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(user_in: UserCreate, session: SessionDep):
"""Створити нового користувача."""
# Перевірка унікальності username
existing = (await session.execute(
select(User).where(User.username == user_in.username)
)).scalar_one_or_none()
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Username '{user_in.username}' already taken"
)
user = User(**user_in.model_dump())
session.add(user)
await session.flush() # Виконуємо INSERT, щоб отримати згенерований id
return user
await session.flush() перед поверненням user. Виклик flush() відправляє INSERT до PostgreSQL у межах поточної транзакції (але не комітить її), що дозволяє отримати згенерований автоінкрементний id. Сам commit() відбудеться автоматично у dependency-генераторі get_async_session() при успішному завершенні запиту.Ініціалізація таблиць при старті додатку
Для розробки та тестування зручно автоматично створювати всі таблиці при старті FastAPI:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.database import async_engine, Base
# Імпортуємо всі моделі, щоб Base.metadata їх «знала»
from app.models import user, post, project # noqa: F401
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle-хук FastAPI: виконується при старті та зупинці."""
# Старт: створюємо всі таблиці (лише для розробки!)
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# Зупинка: закриваємо пул підключень
await async_engine.dispose()
app = FastAPI(title="TaskForge API", lifespan=lifespan)
Base.metadata.create_all() — зручний інструмент для локальної розробки та тестів, але він категорично не підходить для production. Він лише створює таблиці, що не існують, але не вміє змінювати існуючі (додавати стовпці, змінювати типи, видаляти поля). Для управління схемою бази даних у production використовується Alembic — інструмент міграцій, якому присвячена наступна стаття 23.Практичний приклад від А до Я: Самодостатній скрипт
Для того щоб ви могли швидко запустити та протестувати всі концепції async SQLAlchemy 2.0 в одному місці без потреби встановлювати PostgreSQL чи конфігурувати FastAPI, нижче наведено повністю самодостатній, працездатний асинхронний скрипт.
Він використовує вбудовану в Python базу даних SQLite в оперативній пам'яті (:memory:), створює таблиці, виконує CRUD-операції, оптимізує завантаження зв'язків та виводить результати у консоль.
Для запуску вам знадобляться встановлені бібліотеки sqlalchemy та асинхронний драйвер aiosqlite:
pip install sqlalchemy aiosqlite
main.py
import asyncio
from datetime import datetime
from sqlalchemy import String, ForeignKey, select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, selectinload
# 1. Створюємо базовий клас
class Base(DeclarativeBase):
pass
# 2. Визначаємо моделі (зв'язок Один до Багатьох: User -> Post)
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
posts: Mapped[list["Post"]] = relationship(
back_populates="author", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User id={self.id} username={self.username!r}>"
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(String)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
def __repr__(self) -> str:
return f"<Post id={self.id} title={self.title!r}>"
# 3. Налаштовуємо Async Engine та Session local фабрику
# Використовуємо SQLite в пам'яті з асинхронним драйвером aiosqlite
DATABASE_URL = "sqlite+aiosqlite:///:memory:"
async_engine = create_async_engine(DATABASE_URL, echo=True) # echo=True покаже згенерований SQL
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, class_=AsyncSession, expire_on_commit=False
)
async def main():
# 4. Створюємо таблиці у базі даних
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with AsyncSessionLocal() as session:
print("\n--- 1. Створення записів (INSERT) ---")
# Створюємо користувача та його пости
user = User(
username="arakviel",
email="arakviel@example.com",
posts=[
Post(title="Перший async пост", content="Вміст першого асинхронного посту"),
Post(title="Другий async пост", content="Вміст другого асинхронного посту")
]
)
session.add(user)
await session.commit()
print(f"Збережено користувача: {user} (ID: {user.id})")
print("\n--- 2. Читання записів з Eager Loading (SELECT з selectinload) ---")
# selectinload(User.posts) запобігає N+1 проблемі для завантаження списку постів
stmt = select(User).where(User.username == "arakviel").options(selectinload(User.posts))
db_user = (await session.execute(stmt)).scalars().one()
print(f"Знайдено користувача: {db_user.username}")
for post in db_user.posts:
print(f" - Пост: {post.title}")
print("\n--- 3. Оновлення запису (UPDATE) ---")
# Оновимо заголовок першого посту
first_post = db_user.posts[0]
first_post.title = "Оновлена назва async посту"
await session.commit()
print(f"Нова назва посту в БД: {first_post.title}")
print("\n--- 4. Видалення запису (DELETE) ---")
# Видалення користувача призведе до каскадного видалення всіх його постів
await session.delete(db_user)
await session.commit()
# Перевіряємо, чи залишились пости у базі
posts_stmt = select(Post)
remaining_posts = (await session.execute(posts_stmt)).scalars().all()
print(f"Пости в базі даних після видалення користувача: {remaining_posts}")
# Закриваємо з'єднання з БД
await async_engine.dispose()
if __name__ == "__main__":
asyncio.run(main())
Практика: Будуємо шар моделей для TaskForge
На основі всього вивченого побудуємо повноцінний шар моделей для проєкту TaskForge. Усі моделі розміщуватимуться у директорії models/ і успадковуватимуться від спільного Base.
Структура проєкту
Крок 4: Фіксація змін у репозиторії
Збережіть усі внесені зміни та виконайте фіксацію у вашому сховищі Git:
git add .
git commit -m "feat: add PostgreSQL with SQLAlchemy 2.0 async models"
Практичні вправи
Виконайте завдання для перевірки та закріплення матеріалу статті.
Вправа 1: Отримання об'єкта та перевірка на None (Базовий рівень)
Завдання: Напишіть асинхронний роут GET /api/v1/projects/{project_id}, який:
- Приймає
project_id: intтаsession: SessionDep. - Використовує асинхронний метод
session.get(Project, project_id). - Якщо проєкт не знайдено в базі даних (метод повернув
None), викидаєHTTPException(status_code=404, detail="Project not found"). - Якщо проєкт знайдено, повертає його.
Вправа 2: Каскадне створення сутностей в одній транзакції (Середній рівень)
Завдання: Напишіть асинхронний роут POST /api/v1/projects, який створює новий проєкт та автоматично робить його творця учасником проєкту:
- Приймає
project_in: ProjectCreateтаsession: SessionDep. - Припускається, що
owner_idдорівнює1(для спрощення, поки немає аутентифікації). - Створює об'єкт
Projectта додає його до сесії. - Створює об'єкт
ProjectMemberіз роллюMemberRole.OWNERдля цього проєкту та користувача зid=1. - Зберігає обидві сутності в базу даних за допомогою
await session.commit().
Вправа 3: Агрегація та оптимізація N+1 запитів (Професійний рівень)
Завдання: Напишіть асинхронний запит для отримання списку користувачів разом із кількістю їхніх завдань:
- Напишіть запит
select(User, func.count(Task.id))із використаннямouterjoin(User.tasks)таgroup_by(User.id). - Виконайте запит у сесії та отримайте результати.
- Переконайтеся за допомогою логів (
echo=True), що виконується лише один SQL-запит до бази даних.
Підсумок
У цій статті ми пройшли від перших принципів до побудови повноцінного шару моделей для FastAPI-додатку. Виокремимо ключові ідеї:
Дворівнева архітектура
select()-based API.Новий Python-first синтаксис
DeclarativeBase, Mapped[T] і mapped_column() дають повну type-safety. Mapped[int] — NOT NULL, Mapped[int | None] — NULLABLE. IDE тепер розуміє типи атрибутів моделей.Engine та Session
Engine управляє пулом підключень (singleton). Session / AsyncSession — короткоживучий «workspace» для роботи з об'єктами. expire_on_commit=False — обов'язково для async.N+1 проблема та Eager Loading
selectinload() для колекцій і joinedload() для одиничних зв'язків. Увімкніть echo=True для аудиту запитів.Unit of Work під капотом
Session відстежує об'єкти (Transient → Pending → Persistent → Detached), гарантує унікальність через Identity Map і генерує мінімальні UPDATE-запити через Change Tracking.Інтеграція з FastAPI
get_async_session() — dependency-генератор: один HTTP-запит = одна AsyncSession. Автоматичний commit() при успіху та rollback() при помилці. create_all() — лише для dev/тестів.