Threading — конкурентність для I/O-bound задач
Threading — конкурентність для I/O-bound задач
Проблема: веб-скрапер, що чекає цілу вічність
Уявіть, що вам потрібно зібрати дані з 50 веб-сторінок. Кожен запит займає в середньому 1.5 секунди. Послідовний код завершиться за 75 секунд. З 10 потоками — за 7–8 секунд. Різниця в 10 разів без жодної зміни логіки обробки даних.
# Послідовний підхід — чекаємо кожен запит
import time
import urllib.request
urls = [f"https://example.com/page/{i}" for i in range(50)]
start = time.perf_counter()
for url in urls:
with urllib.request.urlopen(url) as r:
data = r.read() # чекаємо... чекаємо... чекаємо...
print(f"Готово за {time.perf_counter() - start:.1f}s") # ~75s 😩
Поки перший запит чекає відповіді від сервера, процесор нічого не робить. Це марнотратство — і саме тут threading вирішує проблему.
threading — правильний вибір для I/O-bound задач: мережеві запити, звернення до бази даних, читання файлів. Для CPU-bound задач (математика, обробка зображень) використовуйте multiprocessing — через GIL потоки не дадуть прискорення на обчислювальному коді.Частина I: threading.Thread — основи
Створення та запуск потоку
Модуль threading надає клас Thread. Є два способи його використання: передати функцію через аргумент target або успадкуватись і перевизначити метод run().
# thread_basics.py
import threading
import time
def worker(name: str, delay: float) -> None:
"""Функція, що виконується у потоці."""
print(f"[{name}] Починаю роботу (затримка {delay}s)")
time.sleep(delay) # Симулюємо I/O-очікування
print(f"[{name}] Завершено")
# ── Спосіб 1: передаємо target-функцію ───────────────────────────────────────
t1 = threading.Thread(target=worker, args=("Потік-А", 1.0))
t2 = threading.Thread(target=worker, args=("Потік-Б", 1.5))
# start() запускає потік — повертається негайно (не чекає завершення)
t1.start()
t2.start()
print("Основний потік продовжує виконання...")
# join() чекає завершення потоку перед тим, як продовжити
t1.join()
t2.join()
print("Обидва потоки завершено")
# ── Спосіб 2: успадкування від Thread ────────────────────────────────────────
class DataFetcher(threading.Thread):
"""Потік, що завантажує дані і зберігає результат у self.result."""
def __init__(self, url: str):
super().__init__(daemon=True) # daemon=True — про це далі
self.url = url
self.result: bytes | None = None
self.error: Exception | None = None
def run(self) -> None:
"""Перевизначаємо run() — тут виконується логіка потоку."""
import urllib.request
try:
with urllib.request.urlopen(self.url, timeout=10) as r:
self.result = r.read()
except Exception as e:
self.error = e
fetcher = DataFetcher("https://httpbin.org/get")
fetcher.start()
fetcher.join(timeout=15) # чекаємо не більше 15 секунд
if fetcher.result:
print(f"Отримано {len(fetcher.result)} байт")
elif fetcher.error:
print(f"Помилка: {fetcher.error}")
Параметри Thread
run() у підкласі — не потрібний.target. Обов'язково кортеж — навіть для одного аргументу: args=(value,).target.Thread-1, Thread-2, ...).True — потік є демоном. Daemon-потоки автоматично завершуються, коли завершується головний потік програми, не чекаючи виконання join(). Ідеально для фонових завдань (моніторинг, логування), що не повинні утримувати програму від завершення.join() і is_alive()
import threading
import time
def slow_task(duration: float) -> None:
time.sleep(duration)
t = threading.Thread(target=slow_task, args=(3.0,))
t.start()
# is_alive() → True, поки потік виконується
print(f"Потік живий: {t.is_alive()}") # True
# join(timeout) — чекає не більше timeout секунд
t.join(timeout=1.0)
print(f"Після join(1.0): {t.is_alive()}") # True — потік ще не завершив 3s
t.join() # чекаємо до кінця
print(f"Після join(): {t.is_alive()}") # False — завершено
Daemon-потоки: фонові завдання
# daemon_thread.py
import threading
import time
def heartbeat() -> None:
"""Фоновий моніторинг — відправляє 'пульс' кожну секунду."""
while True:
print(f" [♥] Heartbeat: {time.strftime('%H:%M:%S')}")
time.sleep(1)
# Daemon-потік: завершиться автоматично разом з main
monitor = threading.Thread(target=heartbeat, daemon=True)
monitor.start()
# Основна програма
print("Програма стартувала")
time.sleep(3.5)
print("Програма завершується — daemon-потік теж зупиниться")
# join() НЕ потрібен для daemon — він зупиниться автоматично
Частина II: Race Condition — стан гонки
Чому поділ стану між потоками небезпечний
Усі потоки в одному процесі поділяють одну пам'ять. Це зручно (легко передавати дані), але небезпечно: кілька потоків можуть одночасно читати і змінювати один об'єкт. Результат — недетермінований і залежить від порядку перемикання потоків, який визначає операційна система.
Класичний приклад: лічильник транзакцій у банківській системі.
# race_condition.py
import threading
# Спільний стан — банківський рахунок
balance = 0
def deposit(amount: int, times: int) -> None:
global balance
for _ in range(times):
# УВАГА: це НЕ атомарна операція!
# Під капотом: READ balance → ADD amount → WRITE balance
# Між READ і WRITE GIL може переключити потік!
balance += amount
def withdraw(amount: int, times: int) -> None:
global balance
for _ in range(times):
balance -= amount
# Запускаємо два потоки: один кладе, інший знімає по 1000 разів
ITERATIONS = 100_000
t1 = threading.Thread(target=deposit, args=(1, ITERATIONS))
t2 = threading.Thread(target=withdraw, args=(1, ITERATIONS))
t1.start()
t2.start()
t1.join()
t2.join()
# Очікуємо 0 (поклали і зняли однакову суму)
# Але отримуємо щось інше кожного разу!
print(f"Баланс: {balance}") # -3421? 7892? Завжди різне!
Чому так? Операція balance += 1 насправді складається з трьох байткод-інструкцій:
LOAD_GLOBAL balance— прочитати поточне значенняLOAD_CONST 1— завантажити 1BINARY_ADD— скластиSTORE_GLOBAL balance— записати результат
GIL може перемкнути потік між будь-якими двома інструкціями. Якщо обидва потоки прочитали balance = 100, перший записав 101, а другий теж записав 101 (замість 100) — одне додавання втрачено назавжди.
Частина III: Примітиви синхронізації
Lock — базовий мютекс
Lock (mutual exclusion lock) гарантує, що тільки один потік може виконувати захищений блок коду в момент часу. Потік, що входить у захищений блок, захоплює (acquire) замок. Інші потоки, що намагаються захопити вже захоплений замок, блокуються і чекають, поки перший потік звільнить (release) його.
# lock_demo.py
import threading
balance = 0
lock = threading.Lock() # Один замок — один захищений ресурс
def deposit(amount: int, times: int) -> None:
global balance
for _ in range(times):
with lock: # acquire() при вході, release() при виході
balance += amount # тепер атомарна секція
def withdraw(amount: int, times: int) -> None:
global balance
for _ in range(times):
with lock:
balance -= amount
ITERATIONS = 100_000
t1 = threading.Thread(target=deposit, args=(1, ITERATIONS))
t2 = threading.Thread(target=withdraw, args=(1, ITERATIONS))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Баланс: {balance}") # Завжди 0 ✅
with lock: замість явних lock.acquire() / lock.release(). Конструкція with гарантує звільнення замку навіть при виключенні всередині блоку. Явні виклики легко забути або пропустити при рефакторингу.RLock — реєнтерабельний замок
Звичайний Lock не можна захопити двічі з одного потоку — потік заблокує сам себе (deadlock). RLock (reentrant lock) вирішує цю проблему: один потік може захопити його кілька разів, і замок звільняється лише після відповідної кількості release().
# rlock_demo.py
import threading
rlock = threading.RLock()
def outer_operation() -> None:
with rlock: # Захоплюємо RLock (count=1)
print("Зовнішня операція")
inner_operation() # Викликаємо функцію, що теж захоплює
print("Зовнішня завершена")
def inner_operation() -> None:
with rlock: # Той самий потік захоплює вдруге (count=2)
print(" Внутрішня операція")
# Звільняємо: count → 1
# Звільняємо outer_operation: count → 0 → замок реально звільнено
# З звичайним Lock тут був би deadlock!
t = threading.Thread(target=outer_operation)
t.start()
t.join()
RLock типовий у рекурсивних алгоритмах або у класах, де публічний метод викликає інший публічний метод, і обидва захищені одним замком. Якщо ваш код не рекурсивний — використовуйте звичайний Lock (він ефективніший).Semaphore — обмеження конкурентності
Semaphore — замок з лічильником. На відміну від Lock (дозволяє лише 1 потік), Semaphore(N) дозволяє N потокам одночасно виконувати захищену секцію. Ідеально для обмеження навантаження на зовнішні ресурси.
# semaphore_demo.py
import threading
import time
import urllib.request
from typing import Any
# Обмежуємо одночасні HTTP-запити до 3 (щоб не перевантажити сервер)
MAX_CONCURRENT = 3
semaphore = threading.Semaphore(MAX_CONCURRENT)
results: list[tuple[str, int]] = []
results_lock = threading.Lock()
def fetch_with_limit(url: str) -> None:
with semaphore: # Блокує, якщо вже 3 потоки виконують запит
print(f" → Запит: {url[-20:]}")
try:
with urllib.request.urlopen(url, timeout=10) as r:
data = r.read()
with results_lock:
results.append((url, len(data)))
except Exception as e:
print(f" ✗ Помилка: {e}")
print(f" ← Завершено: {url[-20:]}")
urls = [f"https://httpbin.org/delay/0.5?id={i}" for i in range(10)]
threads = [threading.Thread(target=fetch_with_limit, args=(u,)) for u in urls]
start = time.perf_counter()
for t in threads: t.start()
for t in threads: t.join()
elapsed = time.perf_counter() - start
print(f"\nОброблено {len(results)} URL за {elapsed:.2f}s")
print(f"(Без обмеження було б ~0.5s, з Semaphore(3) — ~{0.5 * (len(urls) // MAX_CONCURRENT):.1f}s)")
Event — сигнал між потоками
Event — простий механізм сигналізації. Один потік встановлює подію (set()), інші чекають на неї (wait()). Після set() усі потоки, що чекали, розблоковуються.
# event_demo.py
import threading
import time
# Сценарій: сервер ініціалізується, клієнти чекають готовності
ready_event = threading.Event()
def server_startup() -> None:
print("[Сервер] Ініціалізація...")
time.sleep(2) # довга ініціалізація
print("[Сервер] Готовий до з'єднань!")
ready_event.set() # Сигналізуємо всім клієнтам
def client(client_id: int) -> None:
print(f"[Клієнт-{client_id}] Чекаю готовності сервера...")
ready_event.wait() # Блокується до set()
print(f"[Клієнт-{client_id}] З'єднуюсь!")
# Запускаємо сервер і 5 клієнтів
server = threading.Thread(target=server_startup)
clients = [threading.Thread(target=client, args=(i,)) for i in range(1, 6)]
for c in clients: c.start()
server.start()
server.join()
for c in clients: c.join()
Condition — очікування умови
Condition — просунутий примітив для сценаріїв producer-consumer, де один потік чекає, поки інший підготує дані. Об'єднує Lock і Event в один зручний API.
# condition_demo.py
import threading
import time
from collections import deque
class BoundedBuffer:
"""Кільцевий буфер з обмеженою місткістю."""
def __init__(self, capacity: int):
self.capacity = capacity
self.buffer: deque = deque()
self.condition = threading.Condition()
def put(self, item: int) -> None:
with self.condition:
# Чекаємо, поки є місце
while len(self.buffer) >= self.capacity:
print(f" [Producer] Буфер повний ({self.capacity}), чекаю...")
self.condition.wait()
self.buffer.append(item)
print(f" [Producer] Поклав {item!r}. У буфері: {len(self.buffer)}")
self.condition.notify_all() # Сповіщаємо Consumer(s)
def get(self) -> int:
with self.condition:
# Чекаємо, поки є що брати
while not self.buffer:
print(f" [Consumer] Буфер порожній, чекаю...")
self.condition.wait()
item = self.buffer.popleft()
print(f" [Consumer] Взяв {item!r}. У буфері: {len(self.buffer)}")
self.condition.notify_all() # Сповіщаємо Producer(s)
return item
buf = BoundedBuffer(capacity=3)
def producer():
for i in range(8):
buf.put(i)
time.sleep(0.1)
def consumer():
for _ in range(8):
item = buf.get()
time.sleep(0.3) # Consumer повільніший за Producer
t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_prod.start(); t_cons.start()
t_prod.join(); t_cons.join()
Barrier — точка зустрічі потоків
Barrier(N) блокує потоки, поки не зберуться всі N. Потім усі одночасно продовжують виконання. Корисний для фазових обчислень, де всі потоки мають завершити поточний крок перед початком наступного.
# barrier_demo.py
import threading
import time
import random
def worker(worker_id: int, barrier: threading.Barrier) -> None:
# Фаза 1: підготовка (різний час у кожного)
prep_time = random.uniform(0.5, 2.0)
print(f"[Worker-{worker_id}] Підготовка ({prep_time:.1f}s)...")
time.sleep(prep_time)
print(f"[Worker-{worker_id}] Готовий, чекаю інших...")
barrier.wait() # Чекаємо всіх учасників
print(f"[Worker-{worker_id}] Всі готові! Починаємо фазу 2")
NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)
threads = [threading.Thread(target=worker, args=(i, barrier))
for i in range(NUM_WORKERS)]
for t in threads: t.start()
for t in threads: t.join()
Частина IV: Потокобезпечні черги — queue.Queue
Чому queue.Queue є кращим за список зі Lock
Модуль queue надає потокобезпечні черги, що реалізують внутрішню синхронізацію. Замість того, щоб вручну захищати список Lock-ом, можна використовувати Queue — вона вже потокобезпечна «з коробки» і додає підтримку блокуючих операцій.
# queue_producer_consumer.py
import queue
import threading
import time
from typing import Any
# Sentinel (сигнальний об'єкт) — позначає кінець роботи
STOP_SIGNAL = object()
def producer(q: queue.Queue, items: list[Any]) -> None:
"""Виробник: кладе завдання у чергу."""
for item in items:
q.put(item) # Блокується, якщо черга повна (maxsize)
print(f"[Producer] → {item}")
time.sleep(0.1)
q.put(STOP_SIGNAL) # Сигналізуємо кінець
print("[Producer] Усі завдання передані")
def consumer(q: queue.Queue, worker_id: int) -> None:
"""Споживач: бере і обробляє завдання."""
while True:
item = q.get() # Блокується, якщо черга порожня
if item is STOP_SIGNAL:
q.put(STOP_SIGNAL) # Передаємо сигнал наступному consumer
break
try:
print(f" [Consumer-{worker_id}] Обробляю {item}")
time.sleep(0.3) # Симулюємо обробку
finally:
q.task_done() # Повідомляємо queue про завершення обробки
# Черга з обмеженим розміром (backpressure)
task_queue: queue.Queue = queue.Queue(maxsize=5)
prod = threading.Thread(target=producer, args=(task_queue, list(range(10))))
consumers = [
threading.Thread(target=consumer, args=(task_queue, i))
for i in range(3)
]
prod.start()
for c in consumers: c.start()
prod.join()
for c in consumers: c.join()
# Альтернатива: q.join() чекає, поки всі task_done() будуть викликані
queue.Queue vs queue.LifoQueue vs queue.PriorityQueue
import queue
# FIFO (First In, First Out) — стандартна черга
fifo: queue.Queue[int] = queue.Queue()
fifo.put(1); fifo.put(2); fifo.put(3)
print(fifo.get(), fifo.get(), fifo.get()) # 1 2 3
# LIFO (Last In, First Out) — стек
lifo: queue.LifoQueue[int] = queue.LifoQueue()
lifo.put(1); lifo.put(2); lifo.put(3)
print(lifo.get(), lifo.get(), lifo.get()) # 3 2 1
# Пріоритетна черга — менше число = вищий пріоритет
pq: queue.PriorityQueue[tuple[int, str]] = queue.PriorityQueue()
pq.put((3, "низький пріоритет"))
pq.put((1, "критичний"))
pq.put((2, "середній"))
print(pq.get(), pq.get(), pq.get()) # (1, 'критичний') (2, 'середній') (3, 'низький...')
Частина V: ThreadPoolExecutor — пул потоків
Навіщо пул потоків
Створення потоку — це системний виклик з певними накладними витратами. Якщо потрібно виконати 10 000 коротких завдань — створювати по потоку для кожного неефективно. Пул потоків вирішує це: N потоків (workers) існують весь час і по черзі беруть завдання з черги.
ThreadPoolExecutor з модуля concurrent.futures — сучасний, зручний і типобезпечний спосіб працювати з пулом потоків.
submit() і Future
# thread_pool_submit.py
from concurrent.futures import ThreadPoolExecutor, Future
import time
import urllib.request
def fetch(url: str) -> tuple[str, int]:
"""Повертає (url, розмір відповіді)."""
with urllib.request.urlopen(url, timeout=10) as r:
return url, len(r.read())
urls = [
"https://httpbin.org/get",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
]
with ThreadPoolExecutor(max_workers=4) as pool:
# submit() негайно повертає Future — не чекає результату!
futures: list[Future[tuple[str, int]]] = [
pool.submit(fetch, url) for url in urls
]
# Результати доступні через future.result() — блокується до завершення
for future in futures:
url, size = future.result()
print(f" {url}: {size} байт")
# Після виходу з блоку with — pool.shutdown(wait=True) викликається автоматично
map() — паралельний аналог вбудованого map()
# thread_pool_map.py
from concurrent.futures import ThreadPoolExecutor
import time
def process_item(item: int) -> int:
"""CPU-проста задача для демонстрації."""
time.sleep(0.1) # Симулюємо I/O
return item ** 2
items = list(range(20))
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
# map() повертає ітератор результатів у тому ж порядку, що й вхідні дані
results = list(pool.map(process_item, items))
elapsed = time.perf_counter() - start
print(f"Результати: {results}")
print(f"Час: {elapsed:.2f}s (послідовно було б ~2.0s)")
Обробка виключень у Future
# future_exceptions.py
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
import random
def unreliable_task(task_id: int) -> str:
"""Задача, що іноді кидає виключення."""
if random.random() < 0.3: # 30% шанс помилки
raise ValueError(f"Задача {task_id} зазнала збою!")
return f"Задача {task_id} успішна"
with ThreadPoolExecutor(max_workers=5) as pool:
futures = {pool.submit(unreliable_task, i): i for i in range(10)}
# as_completed() повертає futures в порядку ЗАВЕРШЕННЯ (не запуску!)
for future in as_completed(futures):
task_id = futures[future]
try:
result = future.result() # Кидає виключення, якщо задача провалилась
print(f" ✓ {result}")
except ValueError as e:
print(f" ✗ Задача {task_id}: {e}")
as_completed() — зручний спосіб обробляти результати в міру їхньої готовності, а не чекати найповільнішого. Корисний у сценаріях де перші результати потрібні якомога швидше (наприклад, пошук у кількох джерелах).Частина VI: threading.local() — ізоляція стану між потоками
Іноді потрібно зберігати різний стан для кожного потоку — наприклад, з'єднання з базою даних або контекст поточного запиту. threading.local() створює об'єкт, у якому кожен потік бачить власну копію атрибутів.
# thread_local_demo.py
import threading
import time
# thread-local storage — кожен потік має власний контекст
_ctx = threading.local()
def set_request_context(user_id: int, request_id: str) -> None:
"""Встановлює контекст поточного запиту для цього потоку."""
_ctx.user_id = user_id
_ctx.request_id = request_id
def get_current_user() -> int | None:
"""Повертає user_id поточного потоку (або None якщо не встановлено)."""
return getattr(_ctx, "user_id", None)
def handle_request(user_id: int, request_id: str) -> None:
"""Симулює обробку HTTP-запиту у потоці."""
set_request_context(user_id, request_id)
time.sleep(0.1) # Симулюємо роботу
# Будь-яка функція у цьому потоці може отримати контекст
user = get_current_user()
print(f"[{request_id}] Обробляю запит користувача {user}")
time.sleep(0.1)
print(f"[{request_id}] Завершено для user={user}")
threads = [
threading.Thread(target=handle_request, args=(uid, f"REQ-{uid:03d}"))
for uid in range(1, 6)
]
for t in threads: t.start()
for t in threads: t.join()
Кожен потік бачить свій user_id, хоча всі звертаються до одного об'єкта _ctx. Flask використовує threading.local() (через werkzeug.local.Local) саме для цього — щоб flask.g, flask.request та flask.session були різними для різних потоків-запитів.
Частина VII: Deadlock — взаємне блокування
Що таке deadlock і як він виникає
Deadlock — ситуація, коли два або більше потоки заблоковані назавжди, бо кожен чекає ресурс, що захоплений іншим. Класичний приклад — «проблема обідаючих філософів»:
# deadlock_demo.py
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1() -> None:
print("[T1] Захоплюю lock_a...")
with lock_a:
print("[T1] lock_a захоплено. Чекаю lock_b...")
time.sleep(0.1) # дозволяємо T2 захопити lock_b
with lock_b: # T1 чекає lock_b, але він захоплений T2
print("[T1] lock_b захоплено. Готово!")
def thread_2() -> None:
print("[T2] Захоплюю lock_b...")
with lock_b:
print("[T2] lock_b захоплено. Чекаю lock_a...")
time.sleep(0.1)
with lock_a: # T2 чекає lock_a, але він захоплений T1
print("[T2] lock_a захоплено. Готово!")
# DEADLOCK: T1 тримає lock_a і чекає lock_b
# T2 тримає lock_b і чекає lock_a
# Обидва чекають вічно!
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start(); t2.start()
# t1.join(); t2.join() # ← ніколи не завершиться!
Стратегії уникнення deadlock
Стратегія 1: завжди захоплювати замки в одному порядку
# deadlock_fix_1.py — послідовний порядок захоплення
def thread_1_safe() -> None:
with lock_a: # ← завжди A, потім B
with lock_b:
print("[T1] Виконуємо роботу")
def thread_2_safe() -> None:
with lock_a: # ← і тут теж A, потім B (не B, потім A!)
with lock_b:
print("[T2] Виконуємо роботу")
Стратегія 2: lock.acquire(timeout=...)
# deadlock_fix_2.py — timeout при захопленні замку
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def safe_thread() -> None:
acquired_a = lock_a.acquire(timeout=1.0)
if not acquired_a:
print("Не вдалось захопити lock_a, повторимо пізніше")
return
try:
acquired_b = lock_b.acquire(timeout=1.0)
if not acquired_b:
print("Не вдалось захопити lock_b, відпускаємо lock_a")
return
try:
print("Обидва замки захоплено, виконуємо роботу")
finally:
lock_b.release()
finally:
lock_a.release()
Стратегія 3: замінити Lock на queue.Queue — найкраща стратегія. Якщо замість спільного стану з замками використовувати передачу повідомлень через чергу, deadlock стає структурно неможливим.
Частина VIII: Реальний патерн — паралельний завантажувач
Об'єднаємо все вивчене в production-ready паралельний завантажувач файлів з обмеженням конкурентності, обробкою помилок і прогрес-баром:
# parallel_downloader.py
from __future__ import annotations
import threading
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Callable
@dataclass
class DownloadResult:
url: str
success: bool
size_bytes: int = 0
error: str = ""
elapsed: float = 0.0
@dataclass
class DownloadStats:
total: int = 0
completed: int = 0
failed: int = 0
total_bytes: int = 0
_lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
def update(self, result: DownloadResult) -> None:
with self._lock:
self.completed += 1
if result.success:
self.total_bytes += result.size_bytes
else:
self.failed += 1
def download_url(
url: str,
timeout: int = 15,
on_progress: Callable[[str, int], None] | None = None,
) -> DownloadResult:
"""Завантажує URL і повертає результат."""
start = time.perf_counter()
try:
with urllib.request.urlopen(url, timeout=timeout) as response:
data = response.read()
elapsed = time.perf_counter() - start
result = DownloadResult(
url=url, success=True,
size_bytes=len(data), elapsed=elapsed,
)
if on_progress:
on_progress(url, len(data))
return result
except Exception as e:
elapsed = time.perf_counter() - start
return DownloadResult(url=url, success=False, error=str(e), elapsed=elapsed)
def parallel_download(
urls: list[str],
max_workers: int = 10,
timeout: int = 15,
verbose: bool = True,
) -> list[DownloadResult]:
"""
Паралельне завантаження списку URL.
Args:
urls: список URL для завантаження
max_workers: максимальна кількість паралельних потоків
timeout: таймаут на один запит у секундах
verbose: виводити прогрес у консоль
"""
stats = DownloadStats(total=len(urls))
results: list[DownloadResult] = []
results_lock = threading.Lock()
def progress_callback(url: str, size: int) -> None:
if verbose:
short_url = url.split("?")[0][-40:]
print(f" ✓ {short_url} ({size:,} байт)")
start_total = time.perf_counter()
with ThreadPoolExecutor(max_workers=max_workers) as pool:
future_to_url = {
pool.submit(download_url, url, timeout, progress_callback): url
for url in urls
}
for future in as_completed(future_to_url):
result = future.result()
stats.update(result)
with results_lock:
results.append(result)
if verbose and not result.success:
print(f" ✗ ПОМИЛКА {result.url[-40:]}: {result.error}")
total_elapsed = time.perf_counter() - start_total
if verbose:
print(f"\n{'─'*50}")
print(f"Завершено: {stats.completed}/{stats.total}")
print(f"Успішно: {stats.completed - stats.failed}")
print(f"Помилок: {stats.failed}")
print(f"Отримано: {stats.total_bytes:,} байт")
print(f"Час: {total_elapsed:.2f}s")
avg_sequential = sum(r.elapsed for r in results)
print(f"Прискорення: {avg_sequential / total_elapsed:.1f}x")
return results
# ─── Демонстрація ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
test_urls = [
f"https://httpbin.org/delay/0.3?n={i}" for i in range(12)
]
print(f"Завантажуємо {len(test_urls)} URL (по ~0.3s кожен)\n")
results = parallel_download(test_urls, max_workers=6)
Підсумок: ключові принципи threading
Lock — захист спільного стану
with lock: — це гарантує звільнення навіть при виключенні.Queue — передача замість поділу
queue.Queue. Це спрощує код і унеможливлює deadlock.ThreadPoolExecutor — не ThreadPoolExecutor
ThreadPoolExecutor, а не threading.Thread вручну. Він управляє пулом, обробляє виключення через Future, надає as_completed().daemon=True для фонових задач
Таблиця примітивів синхронізації
| Примітив | Використання | Ключові методи |
|---|---|---|
Lock | Захист спільного ресурсу (один потік) | acquire(), release(), with lock: |
RLock | Рекурсивний/реєнтерабельний доступ | Ті самі, що Lock |
Semaphore(N) | Обмеження N потоків одночасно | acquire(), release() |
Event | Сигнал між потоками (one-shot або reset) | set(), clear(), wait(), is_set() |
Condition | Producer-consumer, wait for condition | wait(), notify(), notify_all() |
Barrier(N) | Синхронізація N потоків у точці | wait() |
queue.Queue | Потокобезпечний обмін даними | put(), get(), task_done(), join() |
threading.local() | Ізольований стан per-thread | Звичайне присвоєння атрибутів |
Антипатерни, яких слід уникати
| Антипатерн | Чому небезпечний | Рішення |
|---|---|---|
Спільний список без Lock | Race condition | Lock або queue.Queue |
lock.acquire() без try/finally | Витік замку при виключенні | with lock: |
| Захоплення замків у різному порядку | Deadlock | Фіксований порядок або Queue |
| Занадто багато потоків (>100) | Накладні витрати ОС | ThreadPoolExecutor(max_workers=N) |
time.sleep() всередині with lock: | Тримаємо замок надто довго | Мінімізуйте критичну секцію |
| Non-daemon потоки, про які «забули» | Програма не завершується | join() або daemon=True |
GIL та модель конкурентності CPython — фундамент перед потоками і процесами
Глибокий розбір Global Interpreter Lock у CPython, різниці між concurrency та parallelism, I/O-bound та CPU-bound задачами. Benchmarks та шпаргалка вибору між threading, multiprocessing та asyncio.
Multiprocessing — справжній паралелізм для CPU-bound задач
Вичерпний розбір модуля multiprocessing у Python — від Process і методів запуску до Pool, ProcessPoolExecutor, міжпроцесної комунікації (Queue, Pipe, shared_memory), проблеми pickling та реальних прикладів паралельної обробки даних.