Python

Threading — конкурентність для I/O-bound задач

Вичерпний розбір модуля threading у Python — від Thread і daemon-потоків до Race Condition, Lock, RLock, Semaphore, Event, Barrier, потокобезпечних черг та ThreadPoolExecutor. Реальні патерни та антипатерни з прикладами.

Threading — конкурентність для I/O-bound задач

Проблема: веб-скрапер, що чекає цілу вічність

Уявіть, що вам потрібно зібрати дані з 50 веб-сторінок. Кожен запит займає в середньому 1.5 секунди. Послідовний код завершиться за 75 секунд. З 10 потоками — за 7–8 секунд. Різниця в 10 разів без жодної зміни логіки обробки даних.

# Послідовний підхід — чекаємо кожен запит
import time
import urllib.request

urls = [f"https://example.com/page/{i}" for i in range(50)]

start = time.perf_counter()
for url in urls:
    with urllib.request.urlopen(url) as r:
        data = r.read()    # чекаємо... чекаємо... чекаємо...
print(f"Готово за {time.perf_counter() - start:.1f}s")  # ~75s 😩

Поки перший запит чекає відповіді від сервера, процесор нічого не робить. Це марнотратство — і саме тут threading вирішує проблему.

threading — правильний вибір для I/O-bound задач: мережеві запити, звернення до бази даних, читання файлів. Для CPU-bound задач (математика, обробка зображень) використовуйте multiprocessing — через GIL потоки не дадуть прискорення на обчислювальному коді.

Частина I: threading.Thread — основи

Створення та запуск потоку

Модуль threading надає клас Thread. Є два способи його використання: передати функцію через аргумент target або успадкуватись і перевизначити метод run().

# thread_basics.py
import threading
import time


def worker(name: str, delay: float) -> None:
    """Функція, що виконується у потоці."""
    print(f"[{name}] Починаю роботу (затримка {delay}s)")
    time.sleep(delay)  # Симулюємо I/O-очікування
    print(f"[{name}] Завершено")


# ── Спосіб 1: передаємо target-функцію ───────────────────────────────────────
t1 = threading.Thread(target=worker, args=("Потік-А", 1.0))
t2 = threading.Thread(target=worker, args=("Потік-Б", 1.5))

# start() запускає потік — повертається негайно (не чекає завершення)
t1.start()
t2.start()

print("Основний потік продовжує виконання...")

# join() чекає завершення потоку перед тим, як продовжити
t1.join()
t2.join()
print("Обидва потоки завершено")


# ── Спосіб 2: успадкування від Thread ────────────────────────────────────────
class DataFetcher(threading.Thread):
    """Потік, що завантажує дані і зберігає результат у self.result."""

    def __init__(self, url: str):
        super().__init__(daemon=True)  # daemon=True — про це далі
        self.url = url
        self.result: bytes | None = None
        self.error: Exception | None = None

    def run(self) -> None:
        """Перевизначаємо run() — тут виконується логіка потоку."""
        import urllib.request
        try:
            with urllib.request.urlopen(self.url, timeout=10) as r:
                self.result = r.read()
        except Exception as e:
            self.error = e


fetcher = DataFetcher("https://httpbin.org/get")
fetcher.start()
fetcher.join(timeout=15)  # чекаємо не більше 15 секунд

if fetcher.result:
    print(f"Отримано {len(fetcher.result)} байт")
elif fetcher.error:
    print(f"Помилка: {fetcher.error}")
python thread_basics.py
$ python thread_basics.py
[Потік-А] Починаю роботу (затримка 1.0s)
[Потік-Б] Починаю роботу (затримка 1.5s)
Основний потік продовжує виконання...
[Потік-А] Завершено
[Потік-Б] Завершено
Обидва потоки завершено

Параметри Thread

target
Callable | None
Функція, що викликається у потоці. Якщо перевизначаєте run() у підкласі — не потрібний.
args
tuple = ()
Позиційні аргументи для target. Обов'язково кортеж — навіть для одного аргументу: args=(value,).
kwargs
dict = {}
Іменовані аргументи для target.
name
str | None
Ім'я потоку (для налагодження). Якщо не задано — генерується автоматично (Thread-1, Thread-2, ...).
daemon
bool | None
Якщо True — потік є демоном. Daemon-потоки автоматично завершуються, коли завершується головний потік програми, не чекаючи виконання join(). Ідеально для фонових завдань (моніторинг, логування), що не повинні утримувати програму від завершення.

join() і is_alive()

import threading
import time


def slow_task(duration: float) -> None:
    time.sleep(duration)


t = threading.Thread(target=slow_task, args=(3.0,))
t.start()

# is_alive() → True, поки потік виконується
print(f"Потік живий: {t.is_alive()}")    # True

# join(timeout) — чекає не більше timeout секунд
t.join(timeout=1.0)
print(f"Після join(1.0): {t.is_alive()}")  # True — потік ще не завершив 3s

t.join()  # чекаємо до кінця
print(f"Після join():    {t.is_alive()}")  # False — завершено

Daemon-потоки: фонові завдання

# daemon_thread.py
import threading
import time


def heartbeat() -> None:
    """Фоновий моніторинг — відправляє 'пульс' кожну секунду."""
    while True:
        print(f"  [♥] Heartbeat: {time.strftime('%H:%M:%S')}")
        time.sleep(1)


# Daemon-потік: завершиться автоматично разом з main
monitor = threading.Thread(target=heartbeat, daemon=True)
monitor.start()

# Основна програма
print("Програма стартувала")
time.sleep(3.5)
print("Програма завершується — daemon-потік теж зупиниться")
# join() НЕ потрібен для daemon — він зупиниться автоматично
python daemon_thread.py
$ python daemon_thread.py
Програма стартувала
[♥] Heartbeat: 14:11:01
[♥] Heartbeat: 14:11:02
[♥] Heartbeat: 14:11:03
Програма завершується — daemon-потік теж зупиниться

Частина II: Race Condition — стан гонки

Чому поділ стану між потоками небезпечний

Усі потоки в одному процесі поділяють одну пам'ять. Це зручно (легко передавати дані), але небезпечно: кілька потоків можуть одночасно читати і змінювати один об'єкт. Результат — недетермінований і залежить від порядку перемикання потоків, який визначає операційна система.

Класичний приклад: лічильник транзакцій у банківській системі.

# race_condition.py
import threading

# Спільний стан — банківський рахунок
balance = 0


def deposit(amount: int, times: int) -> None:
    global balance
    for _ in range(times):
        # УВАГА: це НЕ атомарна операція!
        # Під капотом: READ balance → ADD amount → WRITE balance
        # Між READ і WRITE GIL може переключити потік!
        balance += amount


def withdraw(amount: int, times: int) -> None:
    global balance
    for _ in range(times):
        balance -= amount


# Запускаємо два потоки: один кладе, інший знімає по 1000 разів
ITERATIONS = 100_000

t1 = threading.Thread(target=deposit, args=(1, ITERATIONS))
t2 = threading.Thread(target=withdraw, args=(1, ITERATIONS))

t1.start()
t2.start()
t1.join()
t2.join()

# Очікуємо 0 (поклали і зняли однакову суму)
# Але отримуємо щось інше кожного разу!
print(f"Баланс: {balance}")  # -3421? 7892? Завжди різне!
python race_condition.py (запускаємо 3 рази)
$ python race_condition.py
Баланс: -3421
$ python race_condition.py
Баланс: 7892
$ python race_condition.py
Баланс: -156

Чому так? Операція balance += 1 насправді складається з трьох байткод-інструкцій:

  1. LOAD_GLOBAL balance — прочитати поточне значення
  2. LOAD_CONST 1 — завантажити 1
  3. BINARY_ADD — скласти
  4. STORE_GLOBAL balance — записати результат

GIL може перемкнути потік між будь-якими двома інструкціями. Якщо обидва потоки прочитали balance = 100, перший записав 101, а другий теж записав 101 (замість 100) — одне додавання втрачено назавжди.


Частина III: Примітиви синхронізації

Lock — базовий мютекс

Lock (mutual exclusion lock) гарантує, що тільки один потік може виконувати захищений блок коду в момент часу. Потік, що входить у захищений блок, захоплює (acquire) замок. Інші потоки, що намагаються захопити вже захоплений замок, блокуються і чекають, поки перший потік звільнить (release) його.

# lock_demo.py
import threading

balance = 0
lock = threading.Lock()  # Один замок — один захищений ресурс


def deposit(amount: int, times: int) -> None:
    global balance
    for _ in range(times):
        with lock:        # acquire() при вході, release() при виході
            balance += amount  # тепер атомарна секція


def withdraw(amount: int, times: int) -> None:
    global balance
    for _ in range(times):
        with lock:
            balance -= amount


ITERATIONS = 100_000
t1 = threading.Thread(target=deposit, args=(1, ITERATIONS))
t2 = threading.Thread(target=withdraw, args=(1, ITERATIONS))
t1.start(); t2.start()
t1.join(); t2.join()

print(f"Баланс: {balance}")  # Завжди 0 ✅
Завжди використовуйте with lock: замість явних lock.acquire() / lock.release(). Конструкція with гарантує звільнення замку навіть при виключенні всередині блоку. Явні виклики легко забути або пропустити при рефакторингу.

RLock — реєнтерабельний замок

Звичайний Lock не можна захопити двічі з одного потоку — потік заблокує сам себе (deadlock). RLock (reentrant lock) вирішує цю проблему: один потік може захопити його кілька разів, і замок звільняється лише після відповідної кількості release().

# rlock_demo.py
import threading

rlock = threading.RLock()


def outer_operation() -> None:
    with rlock:                   # Захоплюємо RLock (count=1)
        print("Зовнішня операція")
        inner_operation()         # Викликаємо функцію, що теж захоплює
        print("Зовнішня завершена")


def inner_operation() -> None:
    with rlock:                   # Той самий потік захоплює вдруге (count=2)
        print("  Внутрішня операція")
    # Звільняємо: count → 1
# Звільняємо outer_operation: count → 0 → замок реально звільнено


# З звичайним Lock тут був би deadlock!
t = threading.Thread(target=outer_operation)
t.start()
t.join()
RLock типовий у рекурсивних алгоритмах або у класах, де публічний метод викликає інший публічний метод, і обидва захищені одним замком. Якщо ваш код не рекурсивний — використовуйте звичайний Lock (він ефективніший).

Semaphore — обмеження конкурентності

Semaphore — замок з лічильником. На відміну від Lock (дозволяє лише 1 потік), Semaphore(N) дозволяє N потокам одночасно виконувати захищену секцію. Ідеально для обмеження навантаження на зовнішні ресурси.

# semaphore_demo.py
import threading
import time
import urllib.request
from typing import Any


# Обмежуємо одночасні HTTP-запити до 3 (щоб не перевантажити сервер)
MAX_CONCURRENT = 3
semaphore = threading.Semaphore(MAX_CONCURRENT)
results: list[tuple[str, int]] = []
results_lock = threading.Lock()


def fetch_with_limit(url: str) -> None:
    with semaphore:  # Блокує, якщо вже 3 потоки виконують запит
        print(f"  → Запит: {url[-20:]}")
        try:
            with urllib.request.urlopen(url, timeout=10) as r:
                data = r.read()
            with results_lock:
                results.append((url, len(data)))
        except Exception as e:
            print(f"  ✗ Помилка: {e}")
        print(f"  ← Завершено: {url[-20:]}")


urls = [f"https://httpbin.org/delay/0.5?id={i}" for i in range(10)]
threads = [threading.Thread(target=fetch_with_limit, args=(u,)) for u in urls]

start = time.perf_counter()
for t in threads: t.start()
for t in threads: t.join()
elapsed = time.perf_counter() - start

print(f"\nОброблено {len(results)} URL за {elapsed:.2f}s")
print(f"(Без обмеження було б ~0.5s, з Semaphore(3) — ~{0.5 * (len(urls) // MAX_CONCURRENT):.1f}s)")

Event — сигнал між потоками

Event — простий механізм сигналізації. Один потік встановлює подію (set()), інші чекають на неї (wait()). Після set() усі потоки, що чекали, розблоковуються.

# event_demo.py
import threading
import time


# Сценарій: сервер ініціалізується, клієнти чекають готовності
ready_event = threading.Event()


def server_startup() -> None:
    print("[Сервер] Ініціалізація...")
    time.sleep(2)  # довга ініціалізація
    print("[Сервер] Готовий до з'єднань!")
    ready_event.set()  # Сигналізуємо всім клієнтам


def client(client_id: int) -> None:
    print(f"[Клієнт-{client_id}] Чекаю готовності сервера...")
    ready_event.wait()  # Блокується до set()
    print(f"[Клієнт-{client_id}] З'єднуюсь!")


# Запускаємо сервер і 5 клієнтів
server = threading.Thread(target=server_startup)
clients = [threading.Thread(target=client, args=(i,)) for i in range(1, 6)]

for c in clients: c.start()
server.start()

server.join()
for c in clients: c.join()
python event_demo.py
$ python event_demo.py
[Клієнт-1] Чекаю готовності сервера...
[Клієнт-2] Чекаю готовності сервера...
[Клієнт-3] Чекаю готовності сервера...
[Клієнт-4] Чекаю готовності сервера...
[Клієнт-5] Чекаю готовності сервера...
[Сервер] Ініціалізація...
[Сервер] Готовий до з'єднань!
[Клієнт-1] З'єднуюсь!
[Клієнт-2] З'єднуюсь!
[Клієнт-3] З'єднуюсь!
[Клієнт-4] З'єднуюсь!
[Клієнт-5] З'єднуюсь!

Condition — очікування умови

Condition — просунутий примітив для сценаріїв producer-consumer, де один потік чекає, поки інший підготує дані. Об'єднує Lock і Event в один зручний API.

# condition_demo.py
import threading
import time
from collections import deque


class BoundedBuffer:
    """Кільцевий буфер з обмеженою місткістю."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer: deque = deque()
        self.condition = threading.Condition()

    def put(self, item: int) -> None:
        with self.condition:
            # Чекаємо, поки є місце
            while len(self.buffer) >= self.capacity:
                print(f"  [Producer] Буфер повний ({self.capacity}), чекаю...")
                self.condition.wait()
            self.buffer.append(item)
            print(f"  [Producer] Поклав {item!r}. У буфері: {len(self.buffer)}")
            self.condition.notify_all()  # Сповіщаємо Consumer(s)

    def get(self) -> int:
        with self.condition:
            # Чекаємо, поки є що брати
            while not self.buffer:
                print(f"  [Consumer] Буфер порожній, чекаю...")
                self.condition.wait()
            item = self.buffer.popleft()
            print(f"  [Consumer] Взяв {item!r}. У буфері: {len(self.buffer)}")
            self.condition.notify_all()  # Сповіщаємо Producer(s)
            return item


buf = BoundedBuffer(capacity=3)


def producer():
    for i in range(8):
        buf.put(i)
        time.sleep(0.1)


def consumer():
    for _ in range(8):
        item = buf.get()
        time.sleep(0.3)  # Consumer повільніший за Producer


t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_prod.start(); t_cons.start()
t_prod.join();  t_cons.join()

Barrier — точка зустрічі потоків

Barrier(N) блокує потоки, поки не зберуться всі N. Потім усі одночасно продовжують виконання. Корисний для фазових обчислень, де всі потоки мають завершити поточний крок перед початком наступного.

# barrier_demo.py
import threading
import time
import random


def worker(worker_id: int, barrier: threading.Barrier) -> None:
    # Фаза 1: підготовка (різний час у кожного)
    prep_time = random.uniform(0.5, 2.0)
    print(f"[Worker-{worker_id}] Підготовка ({prep_time:.1f}s)...")
    time.sleep(prep_time)
    print(f"[Worker-{worker_id}] Готовий, чекаю інших...")

    barrier.wait()  # Чекаємо всіх учасників
    print(f"[Worker-{worker_id}] Всі готові! Починаємо фазу 2")


NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)

threads = [threading.Thread(target=worker, args=(i, barrier))
           for i in range(NUM_WORKERS)]
for t in threads: t.start()
for t in threads: t.join()
python barrier_demo.py
$ python barrier_demo.py
[Worker-0] Підготовка (0.6s)...
[Worker-1] Підготовка (1.3s)...
[Worker-2] Підготовка (0.9s)...
[Worker-3] Підготовка (1.8s)...
[Worker-0] Готовий, чекаю інших...
[Worker-2] Готовий, чекаю інших...
[Worker-1] Готовий, чекаю інших...
[Worker-3] Готовий, чекаю інших...
[Worker-0] Всі готові! Починаємо фазу 2
[Worker-1] Всі готові! Починаємо фазу 2
[Worker-2] Всі готові! Починаємо фазу 2
[Worker-3] Всі готові! Починаємо фазу 2

Частина IV: Потокобезпечні черги — queue.Queue

Чому queue.Queue є кращим за список зі Lock

Модуль queue надає потокобезпечні черги, що реалізують внутрішню синхронізацію. Замість того, щоб вручну захищати список Lock-ом, можна використовувати Queue — вона вже потокобезпечна «з коробки» і додає підтримку блокуючих операцій.

# queue_producer_consumer.py
import queue
import threading
import time
from typing import Any

# Sentinel (сигнальний об'єкт) — позначає кінець роботи
STOP_SIGNAL = object()


def producer(q: queue.Queue, items: list[Any]) -> None:
    """Виробник: кладе завдання у чергу."""
    for item in items:
        q.put(item)           # Блокується, якщо черга повна (maxsize)
        print(f"[Producer] → {item}")
        time.sleep(0.1)

    q.put(STOP_SIGNAL)        # Сигналізуємо кінець
    print("[Producer] Усі завдання передані")


def consumer(q: queue.Queue, worker_id: int) -> None:
    """Споживач: бере і обробляє завдання."""
    while True:
        item = q.get()        # Блокується, якщо черга порожня
        if item is STOP_SIGNAL:
            q.put(STOP_SIGNAL)  # Передаємо сигнал наступному consumer
            break
        try:
            print(f"  [Consumer-{worker_id}] Обробляю {item}")
            time.sleep(0.3)   # Симулюємо обробку
        finally:
            q.task_done()     # Повідомляємо queue про завершення обробки


# Черга з обмеженим розміром (backpressure)
task_queue: queue.Queue = queue.Queue(maxsize=5)

prod = threading.Thread(target=producer, args=(task_queue, list(range(10))))
consumers = [
    threading.Thread(target=consumer, args=(task_queue, i))
    for i in range(3)
]

prod.start()
for c in consumers: c.start()
prod.join()
for c in consumers: c.join()

# Альтернатива: q.join() чекає, поки всі task_done() будуть викликані

queue.Queue vs queue.LifoQueue vs queue.PriorityQueue

import queue

# FIFO (First In, First Out) — стандартна черга
fifo: queue.Queue[int] = queue.Queue()
fifo.put(1); fifo.put(2); fifo.put(3)
print(fifo.get(), fifo.get(), fifo.get())  # 1 2 3

# LIFO (Last In, First Out) — стек
lifo: queue.LifoQueue[int] = queue.LifoQueue()
lifo.put(1); lifo.put(2); lifo.put(3)
print(lifo.get(), lifo.get(), lifo.get())  # 3 2 1

# Пріоритетна черга — менше число = вищий пріоритет
pq: queue.PriorityQueue[tuple[int, str]] = queue.PriorityQueue()
pq.put((3, "низький пріоритет"))
pq.put((1, "критичний"))
pq.put((2, "середній"))
print(pq.get(), pq.get(), pq.get())  # (1, 'критичний') (2, 'середній') (3, 'низький...')

Частина V: ThreadPoolExecutor — пул потоків

Навіщо пул потоків

Створення потоку — це системний виклик з певними накладними витратами. Якщо потрібно виконати 10 000 коротких завдань — створювати по потоку для кожного неефективно. Пул потоків вирішує це: N потоків (workers) існують весь час і по черзі беруть завдання з черги.

ThreadPoolExecutor з модуля concurrent.futures — сучасний, зручний і типобезпечний спосіб працювати з пулом потоків.

submit() і Future

# thread_pool_submit.py
from concurrent.futures import ThreadPoolExecutor, Future
import time
import urllib.request


def fetch(url: str) -> tuple[str, int]:
    """Повертає (url, розмір відповіді)."""
    with urllib.request.urlopen(url, timeout=10) as r:
        return url, len(r.read())


urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
]

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() негайно повертає Future — не чекає результату!
    futures: list[Future[tuple[str, int]]] = [
        pool.submit(fetch, url) for url in urls
    ]

    # Результати доступні через future.result() — блокується до завершення
    for future in futures:
        url, size = future.result()
        print(f"  {url}: {size} байт")

# Після виходу з блоку with — pool.shutdown(wait=True) викликається автоматично

map() — паралельний аналог вбудованого map()

# thread_pool_map.py
from concurrent.futures import ThreadPoolExecutor
import time


def process_item(item: int) -> int:
    """CPU-проста задача для демонстрації."""
    time.sleep(0.1)  # Симулюємо I/O
    return item ** 2


items = list(range(20))

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    # map() повертає ітератор результатів у тому ж порядку, що й вхідні дані
    results = list(pool.map(process_item, items))
elapsed = time.perf_counter() - start

print(f"Результати: {results}")
print(f"Час: {elapsed:.2f}s  (послідовно було б ~2.0s)")

Обробка виключень у Future

# future_exceptions.py
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
import random


def unreliable_task(task_id: int) -> str:
    """Задача, що іноді кидає виключення."""
    if random.random() < 0.3:  # 30% шанс помилки
        raise ValueError(f"Задача {task_id} зазнала збою!")
    return f"Задача {task_id} успішна"


with ThreadPoolExecutor(max_workers=5) as pool:
    futures = {pool.submit(unreliable_task, i): i for i in range(10)}

    # as_completed() повертає futures в порядку ЗАВЕРШЕННЯ (не запуску!)
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result()  # Кидає виключення, якщо задача провалилась
            print(f"  ✓ {result}")
        except ValueError as e:
            print(f"  ✗ Задача {task_id}: {e}")
as_completed() — зручний спосіб обробляти результати в міру їхньої готовності, а не чекати найповільнішого. Корисний у сценаріях де перші результати потрібні якомога швидше (наприклад, пошук у кількох джерелах).

Частина VI: threading.local() — ізоляція стану між потоками

Іноді потрібно зберігати різний стан для кожного потоку — наприклад, з'єднання з базою даних або контекст поточного запиту. threading.local() створює об'єкт, у якому кожен потік бачить власну копію атрибутів.

# thread_local_demo.py
import threading
import time


# thread-local storage — кожен потік має власний контекст
_ctx = threading.local()


def set_request_context(user_id: int, request_id: str) -> None:
    """Встановлює контекст поточного запиту для цього потоку."""
    _ctx.user_id = user_id
    _ctx.request_id = request_id


def get_current_user() -> int | None:
    """Повертає user_id поточного потоку (або None якщо не встановлено)."""
    return getattr(_ctx, "user_id", None)


def handle_request(user_id: int, request_id: str) -> None:
    """Симулює обробку HTTP-запиту у потоці."""
    set_request_context(user_id, request_id)
    time.sleep(0.1)  # Симулюємо роботу

    # Будь-яка функція у цьому потоці може отримати контекст
    user = get_current_user()
    print(f"[{request_id}] Обробляю запит користувача {user}")
    time.sleep(0.1)
    print(f"[{request_id}] Завершено для user={user}")


threads = [
    threading.Thread(target=handle_request, args=(uid, f"REQ-{uid:03d}"))
    for uid in range(1, 6)
]
for t in threads: t.start()
for t in threads: t.join()
python thread_local_demo.py
$ python thread_local_demo.py
[REQ-001] Обробляю запит користувача 1
[REQ-002] Обробляю запит користувача 2
[REQ-003] Обробляю запит користувача 3
[REQ-001] Завершено для user=1
[REQ-004] Обробляю запит користувача 4
[REQ-002] Завершено для user=2

Кожен потік бачить свій user_id, хоча всі звертаються до одного об'єкта _ctx. Flask використовує threading.local() (через werkzeug.local.Local) саме для цього — щоб flask.g, flask.request та flask.session були різними для різних потоків-запитів.


Частина VII: Deadlock — взаємне блокування

Що таке deadlock і як він виникає

Deadlock — ситуація, коли два або більше потоки заблоковані назавжди, бо кожен чекає ресурс, що захоплений іншим. Класичний приклад — «проблема обідаючих філософів»:

# deadlock_demo.py
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()


def thread_1() -> None:
    print("[T1] Захоплюю lock_a...")
    with lock_a:
        print("[T1] lock_a захоплено. Чекаю lock_b...")
        time.sleep(0.1)  # дозволяємо T2 захопити lock_b
        with lock_b:     # T1 чекає lock_b, але він захоплений T2
            print("[T1] lock_b захоплено. Готово!")


def thread_2() -> None:
    print("[T2] Захоплюю lock_b...")
    with lock_b:
        print("[T2] lock_b захоплено. Чекаю lock_a...")
        time.sleep(0.1)
        with lock_a:     # T2 чекає lock_a, але він захоплений T1
            print("[T2] lock_a захоплено. Готово!")


# DEADLOCK: T1 тримає lock_a і чекає lock_b
#            T2 тримає lock_b і чекає lock_a
#            Обидва чекають вічно!
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start(); t2.start()
# t1.join(); t2.join()  # ← ніколи не завершиться!

Стратегії уникнення deadlock

Стратегія 1: завжди захоплювати замки в одному порядку

# deadlock_fix_1.py — послідовний порядок захоплення
def thread_1_safe() -> None:
    with lock_a:       # ← завжди A, потім B
        with lock_b:
            print("[T1] Виконуємо роботу")


def thread_2_safe() -> None:
    with lock_a:       # ← і тут теж A, потім B (не B, потім A!)
        with lock_b:
            print("[T2] Виконуємо роботу")

Стратегія 2: lock.acquire(timeout=...)

# deadlock_fix_2.py — timeout при захопленні замку
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()


def safe_thread() -> None:
    acquired_a = lock_a.acquire(timeout=1.0)
    if not acquired_a:
        print("Не вдалось захопити lock_a, повторимо пізніше")
        return
    try:
        acquired_b = lock_b.acquire(timeout=1.0)
        if not acquired_b:
            print("Не вдалось захопити lock_b, відпускаємо lock_a")
            return
        try:
            print("Обидва замки захоплено, виконуємо роботу")
        finally:
            lock_b.release()
    finally:
        lock_a.release()

Стратегія 3: замінити Lock на queue.Queue — найкраща стратегія. Якщо замість спільного стану з замками використовувати передачу повідомлень через чергу, deadlock стає структурно неможливим.


Частина VIII: Реальний патерн — паралельний завантажувач

Об'єднаємо все вивчене в production-ready паралельний завантажувач файлів з обмеженням конкурентності, обробкою помилок і прогрес-баром:

# parallel_downloader.py
from __future__ import annotations

import threading
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class DownloadResult:
    url: str
    success: bool
    size_bytes: int = 0
    error: str = ""
    elapsed: float = 0.0


@dataclass
class DownloadStats:
    total: int = 0
    completed: int = 0
    failed: int = 0
    total_bytes: int = 0
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def update(self, result: DownloadResult) -> None:
        with self._lock:
            self.completed += 1
            if result.success:
                self.total_bytes += result.size_bytes
            else:
                self.failed += 1


def download_url(
    url: str,
    timeout: int = 15,
    on_progress: Callable[[str, int], None] | None = None,
) -> DownloadResult:
    """Завантажує URL і повертає результат."""
    start = time.perf_counter()
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            data = response.read()
        elapsed = time.perf_counter() - start
        result = DownloadResult(
            url=url, success=True,
            size_bytes=len(data), elapsed=elapsed,
        )
        if on_progress:
            on_progress(url, len(data))
        return result
    except Exception as e:
        elapsed = time.perf_counter() - start
        return DownloadResult(url=url, success=False, error=str(e), elapsed=elapsed)


def parallel_download(
    urls: list[str],
    max_workers: int = 10,
    timeout: int = 15,
    verbose: bool = True,
) -> list[DownloadResult]:
    """
    Паралельне завантаження списку URL.

    Args:
        urls: список URL для завантаження
        max_workers: максимальна кількість паралельних потоків
        timeout: таймаут на один запит у секундах
        verbose: виводити прогрес у консоль
    """
    stats = DownloadStats(total=len(urls))
    results: list[DownloadResult] = []
    results_lock = threading.Lock()

    def progress_callback(url: str, size: int) -> None:
        if verbose:
            short_url = url.split("?")[0][-40:]
            print(f"  ✓ {short_url} ({size:,} байт)")

    start_total = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        future_to_url = {
            pool.submit(download_url, url, timeout, progress_callback): url
            for url in urls
        }

        for future in as_completed(future_to_url):
            result = future.result()
            stats.update(result)
            with results_lock:
                results.append(result)

            if verbose and not result.success:
                print(f"  ✗ ПОМИЛКА {result.url[-40:]}: {result.error}")

    total_elapsed = time.perf_counter() - start_total

    if verbose:
        print(f"\n{'─'*50}")
        print(f"Завершено: {stats.completed}/{stats.total}")
        print(f"Успішно:   {stats.completed - stats.failed}")
        print(f"Помилок:   {stats.failed}")
        print(f"Отримано:  {stats.total_bytes:,} байт")
        print(f"Час:       {total_elapsed:.2f}s")
        avg_sequential = sum(r.elapsed for r in results)
        print(f"Прискорення: {avg_sequential / total_elapsed:.1f}x")

    return results


# ─── Демонстрація ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    test_urls = [
        f"https://httpbin.org/delay/0.3?n={i}" for i in range(12)
    ]

    print(f"Завантажуємо {len(test_urls)} URL (по ~0.3s кожен)\n")
    results = parallel_download(test_urls, max_workers=6)
python parallel_downloader.py
$ python parallel_downloader.py
Завантажуємо 12 URL (по ~0.3s кожен)
✓ httpbin.org/delay/0.3?n=2 (220 байт)
✓ httpbin.org/delay/0.3?n=0 (220 байт)
✓ httpbin.org/delay/0.3?n=4 (220 байт)
✓ httpbin.org/delay/0.3?n=1 (220 байт)
✓ httpbin.org/delay/0.3?n=3 (220 байт)
✓ httpbin.org/delay/0.3?n=5 (220 байт)
✓ httpbin.org/delay/0.3?n=6 (220 байт)
... (решта)
──────────────────────────────────────────────────
Завершено: 12/12
Успішно: 12
Помилок: 0
Отримано: 2,640 байт
Час: 0.68s
Прискорення: 5.3x

Підсумок: ключові принципи threading

Lock — захист спільного стану

Будь-який спільний змінюваний стан між потоками потребує синхронізації. Завжди використовуйте with lock: — це гарантує звільнення навіть при виключенні.

Queue — передача замість поділу

Замість спільного стану зі складною синхронізацією — передавайте повідомлення через queue.Queue. Це спрощує код і унеможливлює deadlock.

ThreadPoolExecutor — не ThreadPoolExecutor

Для більшості задач використовуйте ThreadPoolExecutor, а не threading.Thread вручну. Він управляє пулом, обробляє виключення через Future, надає as_completed().

daemon=True для фонових задач

Потоки-монітори, heartbeat, log-flusher — оголошуйте daemon. Вони не утримують програму від завершення.

Таблиця примітивів синхронізації

ПримітивВикористанняКлючові методи
LockЗахист спільного ресурсу (один потік)acquire(), release(), with lock:
RLockРекурсивний/реєнтерабельний доступТі самі, що Lock
Semaphore(N)Обмеження N потоків одночасноacquire(), release()
EventСигнал між потоками (one-shot або reset)set(), clear(), wait(), is_set()
ConditionProducer-consumer, wait for conditionwait(), notify(), notify_all()
Barrier(N)Синхронізація N потоків у точціwait()
queue.QueueПотокобезпечний обмін данимиput(), get(), task_done(), join()
threading.local()Ізольований стан per-threadЗвичайне присвоєння атрибутів

Антипатерни, яких слід уникати

АнтипатернЧому небезпечнийРішення
Спільний список без LockRace conditionLock або queue.Queue
lock.acquire() без try/finallyВитік замку при виключенніwith lock:
Захоплення замків у різному порядкуDeadlockФіксований порядок або Queue
Занадто багато потоків (>100)Накладні витрати ОСThreadPoolExecutor(max_workers=N)
time.sleep() всередині with lock:Тримаємо замок надто довгоМінімізуйте критичну секцію
Non-daemon потоки, про які «забули»Програма не завершуєтьсяjoin() або daemon=True
Copyright © 2026