Multiprocessing — справжній паралелізм для CPU-bound задач
Multiprocessing — справжній паралелізм для CPU-bound задач
Проблема: чому потоки не рятують від GIL при обчисленнях
У попередній статті ми побачили, що threading чудово прискорює I/O-bound задачі. Але що відбувається, коли потрібно обробити величезний масив даних, порахувати хеш мільйона файлів або натренувати просту модель машинного навчання?
# Спроба прискорити обчислення потоками — вже відома пастка
import threading
import time
def cpu_crunch(n: int) -> int:
"""Чисті обчислення без жодного I/O."""
return sum(i * i for i in range(n))
N = 20_000_000
# Однопотоково
t0 = time.perf_counter()
cpu_crunch(N)
cpu_crunch(N)
single = time.perf_counter() - t0
print(f"Однопотоково: {single:.2f}s")
# Два потоки — очікуємо 2x прискорення...
t0 = time.perf_counter()
t1 = threading.Thread(target=cpu_crunch, args=(N,))
t2 = threading.Thread(target=cpu_crunch, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - t0
print(f"Два потоки: {threaded:.2f}s ← майже те саме!")
print(f"Прискорення: {single / threaded:.2f}x (очікувалось 2x)")
GIL не дозволяє двом потокам виконувати Python-байткод одночасно. Для CPU-bound коду єдиний вихід — кілька окремих процесів, кожен зі своїм інтерпретатором і своїм GIL.
multiprocessing — правильний вибір для CPU-bound задач, що добре паралелізуються. Кожен процес має власний Python-інтерпретатор, власну пам'ять і власний GIL — тобто справжній паралелізм на кількох ядрах процесора. Але за це доводиться платити: накладні витрати на запуск процесу і серіалізацію даних між ними.Частина I: multiprocessing.Process — основи
Від потоку до процесу: одна зміна, інша природа
Поверхнево API multiprocessing.Process нагадує threading.Thread. Але під капотом — принципова різниця: кожен процес є окремою програмою з власним адресним простором.
# process_basics.py
import multiprocessing
import os
import time
def worker(name: str, delay: float) -> None:
"""Функція, що виконується в окремому процесі."""
pid = os.getpid()
print(f"[{name}] Процес PID={pid}, починаю роботу")
time.sleep(delay)
print(f"[{name}] Процес PID={pid}, завершено")
if __name__ == "__main__":
main_pid = os.getpid()
print(f"Головний процес PID={main_pid}")
p1 = multiprocessing.Process(target=worker, args=("Worker-A", 1.0))
p2 = multiprocessing.Process(target=worker, args=("Worker-B", 1.5))
p1.start()
p2.start()
print(f"Запущено дочірні процеси: {p1.pid}, {p2.pid}")
p1.join()
p2.join()
print("Обидва процеси завершились")
Зверніть на різні PID — кожен Process є окремим системним процесом, що запущений операційною системою. У threading всі потоки мали б однаковий PID.
Обов'язковий if __name__ == "__main__":
spawn) код модуля виконується заново у кожному дочірньому процесі при його старті. Без захисту if __name__ == "__main__": кожен дочірній процес запускатиме нових дочірніх — рекурсивно, до краху системи. Завжди загортайте код запуску процесів у цей guard.# ПРАВИЛЬНО
if __name__ == "__main__":
p = multiprocessing.Process(target=worker)
p.start()
p.join()
# НЕПРАВИЛЬНО — на Windows призведе до безкінечного рекурсивного запуску!
p = multiprocessing.Process(target=worker)
p.start()
p.join()
Методи запуску процесів: spawn, fork, forkserver
Python надає три способи створення дочірнього процесу, і вибір між ними впливає на швидкість, безпеку і сумісність:
fork() — клонує весь батьківський процес разом з усіма відкритими файлами, сокетами і пам'яттю. Найшвидший старт, але небезпечний при використанні бібліотек з глобальним станом (наприклад, багатопотокові C-extensions). Не підтримується на Windows.spawn і fork: чисте середовище і відносно швидкий старт.# start_method_demo.py
import multiprocessing
def show_pid(label: str) -> None:
import os
print(f"[{label}] PID: {os.getpid()}")
if __name__ == "__main__":
# Дивимося поточний метод
print(f"Поточний метод: {multiprocessing.get_start_method()}")
# Змінюємо метод (один раз, до запуску будь-яких процесів)
# multiprocessing.set_start_method("spawn") # або "fork", "forkserver"
# Або через контекст:
ctx = multiprocessing.get_context("spawn")
p = ctx.Process(target=show_pid, args=("spawn-child",))
p.start()
p.join()
Частина II: Демонстрація реального прискорення
Benchmark: один процес vs пул процесів
# multiprocessing_benchmark.py
import multiprocessing
import time
def is_prime(n: int) -> bool:
"""Перевіряє, чи є число простим — CPU-bound операція."""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(n ** 0.5) + 1, 2):
if n % i == 0:
return False
return True
def count_primes_in_range(start: int, end: int) -> int:
"""Рахує прості числа у діапазоні [start, end)."""
return sum(1 for n in range(start, end) if is_prime(n))
def split_range(total: int, chunks: int) -> list[tuple[int, int]]:
"""Ділить діапазон [0, total) на рівні частини."""
size = total // chunks
return [(i * size, (i + 1) * size) for i in range(chunks)]
if __name__ == "__main__":
LIMIT = 2_000_000
cpu_count = multiprocessing.cpu_count()
print(f"Рахуємо прості числа до {LIMIT:,}")
print(f"Доступно ядер: {cpu_count}\n")
# ── Однопотоково (baseline) ───────────────────────────────────────────────
t0 = time.perf_counter()
result_single = count_primes_in_range(0, LIMIT)
single_time = time.perf_counter() - t0
print(f"Однопроцесно: {result_single:,} простих за {single_time:.2f}s")
# ── Пул процесів ──────────────────────────────────────────────────────────
for num_workers in [2, 4, cpu_count]:
ranges = split_range(LIMIT, num_workers)
t0 = time.perf_counter()
with multiprocessing.Pool(processes=num_workers) as pool:
results = pool.starmap(count_primes_in_range, ranges)
elapsed = time.perf_counter() - t0
total = sum(results)
speedup = single_time / elapsed
print(
f"Pool({num_workers} workers): {total:,} простих за {elapsed:.2f}s "
f"→ {speedup:.2f}x прискорення"
)
Справжнє лінійне прискорення! Майже 7x на 8 ядрах. Різниця від ідеального 8x — це накладні витрати на запуск пулу і комунікацію між процесами.
Частина III: Pool — пул процесів для масових задач
Pool — найзручніший спосіб розподілити тисячі незалежних задач між процесами. Пул підтримує чотири основних методи:
Pool.map() — паралельний аналог map()
# pool_map.py
import multiprocessing
def square(x: int) -> int:
return x * x
if __name__ == "__main__":
data = list(range(10))
with multiprocessing.Pool(processes=4) as pool:
# map() — блокується, повертає результати в порядку вхідних даних
results = pool.map(square, data)
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# map() з розміром чанку (для великих даних ефективніше)
big_data = list(range(100_000))
results = pool.map(square, big_data, chunksize=1000)
print(f"Оброблено {len(results)} елементів")
Pool.starmap() — для функцій з кількома аргументами
# pool_starmap.py
import multiprocessing
def power(base: int, exp: int) -> int:
return base ** exp
if __name__ == "__main__":
# starmap розпаковує кожен елемент як аргументи функції
tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]
with multiprocessing.Pool() as pool:
results = pool.starmap(power, tasks)
print(results) # [1024, 243, 625, 1000]
Pool.apply_async() — асинхронний запуск з колбеком
# pool_async.py
import multiprocessing
import time
def slow_computation(task_id: int, n: int) -> tuple[int, int]:
"""Довге обчислення — повертає (task_id, результат)."""
result = sum(i ** 2 for i in range(n))
return task_id, result
def on_success(result: tuple[int, int]) -> None:
task_id, value = result
print(f" ✓ Задача {task_id} завершена: {value:,}")
def on_error(exc: Exception) -> None:
print(f" ✗ Помилка: {exc}")
if __name__ == "__main__":
tasks = [(i, 1_000_000) for i in range(8)]
with multiprocessing.Pool(processes=4) as pool:
# apply_async повертає AsyncResult негайно (не блокується)
async_results = [
pool.apply_async(
slow_computation,
args=task,
callback=on_success,
error_callback=on_error,
)
for task in tasks
]
# Чекаємо всі результати
pool.close() # забороняємо нові задачі
pool.join() # чекаємо завершення всіх
print("Усі задачі виконані")
Pool.imap() — ліниве відображення (для великих даних)
# pool_imap.py
import multiprocessing
def process_line(line: str) -> str:
"""Обробка одного рядка файлу."""
return line.strip().upper()
if __name__ == "__main__":
# imap() повертає ітератор — результати не зберігаються всі в пам'яті
# Ідеально для обробки великих файлів построково
lines = [f"рядок {i}\n" for i in range(1_000_000)]
with multiprocessing.Pool() as pool:
# Без imap(): всі результати в пам'яті → Out of Memory для великих даних
# З imap(): обробляємо потоково, результати генеруються поступово
for result in pool.imap(process_line, lines, chunksize=1000):
pass # Обробляємо результат одразу, не зберігаємо всі
print("Готово (побудково, без навантаження на пам'ять)")
Частина IV: ProcessPoolExecutor — сучасний API через concurrent.futures
ProcessPoolExecutor з модуля concurrent.futures надає той самий уніфікований інтерфейс, що і ThreadPoolExecutor. Це рекомендований спосіб для більшості нових проектів:
# process_pool_executor.py
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math
def heavy_math(n: int) -> float:
"""CPU-bound задача: обчислення великої суми."""
return sum(math.sqrt(i) * math.log(i + 1) for i in range(1, n + 1))
if __name__ == "__main__":
tasks = [500_000] * 8 # 8 важких задач
# ── ProcessPoolExecutor з submit() ────────────────────────────────────────
print("Запускаємо 8 CPU-bound задач...")
t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as pool:
futures = {pool.submit(heavy_math, n): i for i, n in enumerate(tasks)}
for future in as_completed(futures):
task_id = futures[future]
try:
result = future.result()
print(f" ✓ Задача {task_id}: {result:,.2f}")
except Exception as e:
print(f" ✗ Задача {task_id}: {e}")
elapsed = time.perf_counter() - t0
print(f"Усього: {elapsed:.2f}s")
# ── ProcessPoolExecutor з map() ───────────────────────────────────────────
t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as pool:
results = list(pool.map(heavy_math, tasks))
print(f"map(): {time.perf_counter() - t0:.2f}s, {len(results)} результатів")
Pool vs ProcessPoolExecutor: коли що вибирати
| Критерій | multiprocessing.Pool | ProcessPoolExecutor |
|---|---|---|
| API | map, starmap, apply_async, imap | submit, map, as_completed |
starmap підтримка | ✅ Так | ❌ Ні (треба lambda або functools.partial) |
imap (потокове) | ✅ Так | ❌ Ні |
callback для async | ✅ Так | Через Future.add_done_callback() |
| Уніфікований API з threading | ❌ | ✅ (той самий інтерфейс, що ThreadPoolExecutor) |
| Context manager | ✅ | ✅ |
| Рекомендований для нових проектів | Для специфічних потреб | ✅ За замовчуванням |
Частина V: Міжпроцесна комунікація (IPC)
Проблема: процеси не бачать пам'ять один одного
На відміну від потоків, кожен процес живе у власному адресному просторі. Зміна змінної в дочірньому процесі ніяк не впливає на батьківський:
# no_shared_memory.py
import multiprocessing
shared_list = [1, 2, 3] # У кожного процесу своя копія!
def modify_list() -> None:
shared_list.append(99)
print(f" У дочірньому: {shared_list}") # [1, 2, 3, 99]
if __name__ == "__main__":
p = multiprocessing.Process(target=modify_list)
p.start()
p.join()
print(f"У батьківському: {shared_list}") # [1, 2, 3] — незмінено!
Для обміну даними між процесами є кілька механізмів IPC.
multiprocessing.Queue — потокобезпечна черга між процесами
# mp_queue.py
import multiprocessing
import time
from typing import Any
def producer(queue: multiprocessing.Queue, items: list[Any]) -> None:
for item in items:
queue.put(item)
print(f" [Producer] → {item}")
time.sleep(0.1)
queue.put(None) # Sentinel
print(" [Producer] Завершено")
def consumer(queue: multiprocessing.Queue, results: multiprocessing.Queue) -> None:
while True:
item = queue.get()
if item is None:
break
processed = item * 2
print(f" [Consumer] {item} → {processed}")
results.put(processed)
print(" [Consumer] Завершено")
if __name__ == "__main__":
task_queue: multiprocessing.Queue = multiprocessing.Queue()
result_queue: multiprocessing.Queue = multiprocessing.Queue()
prod = multiprocessing.Process(target=producer, args=(task_queue, list(range(5))))
cons = multiprocessing.Process(target=consumer, args=(task_queue, result_queue))
prod.start(); cons.start()
prod.join(); cons.join()
# Збираємо результати
results = []
while not result_queue.empty():
results.append(result_queue.get())
print(f"Результати: {results}")
multiprocessing.Pipe — двосторонній канал
Pipe — простіший і швидший за Queue механізм для передачі даних між двома процесами:
# mp_pipe.py
import multiprocessing
import time
def worker(conn: multiprocessing.connection.Connection) -> None:
"""Отримує завдання, обробляє і відправляє відповідь."""
while True:
data = conn.recv() # Блокується, чекає дані
if data is None:
break
result = data ** 2
conn.send(result) # Відправляємо відповідь
print(f" [Worker] {data}² = {result}")
conn.close()
if __name__ == "__main__":
# Pipe() повертає два з'єднання: parent_conn і child_conn
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=worker, args=(child_conn,))
p.start()
child_conn.close() # Батько закриває свій кінець child_conn
for n in range(5):
parent_conn.send(n) # Відправляємо завдання
result = parent_conn.recv() # Отримуємо відповідь
print(f"[Main] {n}² = {result}")
parent_conn.send(None) # Sentinel — завершити worker
p.join()
parent_conn.close()
Value і Array — спільна пам'ять для простих типів
Для числових лічильників і масивів є механізм спільної пам'яті без копіювання:
# shared_value.py
import multiprocessing
import ctypes
def increment_counter(counter: multiprocessing.Value, lock: multiprocessing.Lock, n: int) -> None:
for _ in range(n):
with lock:
counter.value += 1
def fill_array(arr: multiprocessing.Array, start: int, end: int) -> None:
for i in range(start, end):
arr[i] = i * i
if __name__ == "__main__":
ITERATIONS = 100_000
# Value — один числовий тип зі спільною пам'яттю
counter = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.Lock()
processes = [
multiprocessing.Process(target=increment_counter, args=(counter, lock, ITERATIONS))
for _ in range(4)
]
for p in processes: p.start()
for p in processes: p.join()
print(f"Лічильник: {counter.value}") # 400_000 ✅
# Array — масив фіксованого розміру зі спільною пам'яттю
arr = multiprocessing.Array(ctypes.c_int, 100)
p1 = multiprocessing.Process(target=fill_array, args=(arr, 0, 50))
p2 = multiprocessing.Process(target=fill_array, args=(arr, 50, 100))
p1.start(); p2.start()
p1.join(); p2.join()
print(f"arr[0..5]: {list(arr[:6])}") # [0, 1, 4, 9, 16, 25]
print(f"arr[95..]: {list(arr[95:])}") # [9025, 9216, 9409, 9604, 9801]
shared_memory — швидка спільна пам'ять для масивів (Python 3.8+)
Для роботи з великими масивами даних (наприклад, NumPy arrays) shared_memory дозволяє уникнути будь-якого копіювання між процесами:
# shared_memory_demo.py
from multiprocessing import shared_memory, Process
import numpy as np
def worker_process(shm_name: str, shape: tuple, dtype: str, start: int, end: int) -> None:
"""Дочірній процес отримує доступ до спільної пам'яті без копіювання."""
# Підключаємось до вже створеного блоку пам'яті
existing_shm = shared_memory.SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# Модифікуємо частину масиву — зміни видимі у батьківському процесі!
arr[start:end] = arr[start:end] * 2
print(f" [Worker] Оброблено рядки {start}:{end}")
existing_shm.close() # Від'єднуємось (не знищуємо!)
if __name__ == "__main__":
# Створюємо великий масив у батьківському процесі
data = np.arange(1000, dtype=np.float64).reshape(100, 10)
print(f"До обробки: data[0] = {data[0]}")
print(f"До обробки: data[50] = {data[50]}")
# Створюємо блок спільної пам'яті
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
# Прив'язуємо NumPy-масив до спільної пам'яті
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:] # Копіюємо дані один раз
# Запускаємо процеси, що будуть читати/писати спільну пам'ять
processes = [
Process(
target=worker_process,
args=(shm.name, data.shape, str(data.dtype), i * 25, (i + 1) * 25)
)
for i in range(4)
]
for p in processes: p.start()
for p in processes: p.join()
# Результати вже у shared_array (у тій самій пам'яті)
print(f"\nПісля обробки: data[0] = {shared_array[0]}")
print(f"Після обробки: data[50] = {shared_array[50]}")
# ОБОВ'ЯЗКОВО: звільняємо і знищуємо блок пам'яті
shm.close()
shm.unlink() # Видаляємо з системи (інакше витік ресурсів!)
shared_memory особливо корисний при роботі з NumPy у машинному навчанні. Замість того, щоб pickle-серіалізувати гігабайтний масив між процесами, можна один раз розмістити його у спільній пам'яті і надати доступ усім воркерам. Це зменшує накладні витрати IPC з хвилин до мілісекунд.Частина VI: Проблема pickling — що можна передавати між процесами
Як процеси обмінюються даними
Коли ви передаєте аргументи у Pool.map() або через Queue — дані серіалізуються (pickle) у байтовий потік, передаються через IPC-механізм операційної системи і десеріалізуються у дочірньому процесі. Це дорога операція, і не все можна серіалізувати.
# pickle_test.py
import pickle
import multiprocessing
import threading
# ✅ Можна pickle-серіалізувати:
can_pickle = [
42, # int
"рядок", # str
[1, 2, 3], # list
{"key": "value"}, # dict
(1, 2), # tuple
lambda x: x, # lambda (тільки у pickle protocol 5+, обережно!)
]
# ❌ НЕ можна pickle-серіалізувати:
class NotPicklable:
def __init__(self):
self.lock = threading.Lock() # Lock не серіалізується!
self.file = open("/dev/null", "w") # Відкрита файлова дескриптор
def check_pickling() -> None:
for obj in can_pickle:
try:
pickle.dumps(obj)
print(f" ✅ {type(obj).__name__}: OK")
except Exception as e:
print(f" ❌ {type(obj).__name__}: {e}")
# Типові об'єкти, що НЕ pickle:
not_picklable = [
threading.Lock(), # Lock
lambda x: x * 2, # lambda (в деяких контекстах)
]
for obj in not_picklable:
try:
pickle.dumps(obj)
print(f" ✅ {type(obj).__name__}: OK (несподівано)")
except Exception as e:
print(f" ❌ {type(obj).__name__}: {e}")
if __name__ == "__main__":
check_pickling()
Типова помилка: lambda у Pool.map()
# pickle_lambda_problem.py
import multiprocessing
if __name__ == "__main__":
data = [1, 2, 3, 4, 5]
# ❌ Не працює — lambda не можна pickle в контексті Pool
try:
with multiprocessing.Pool() as pool:
results = pool.map(lambda x: x ** 2, data)
except AttributeError as e:
print(f"Помилка: {e}")
# ✅ Рішення 1: іменована функція
def square(x): return x ** 2
with multiprocessing.Pool() as pool:
results = pool.map(square, data)
print(f"Іменована функція: {results}")
# ✅ Рішення 2: functools.partial для функцій з фіксованим аргументом
from functools import partial
def power(base, exp): return base ** exp
square_fn = partial(power, exp=2)
with multiprocessing.Pool() as pool:
results = pool.map(square_fn, data)
print(f"functools.partial: {results}")
Частина VII: Реальний патерн — паралельна обробка зображень
Об'єднаємо всі концепції у production-ready обробник: паралельно застосовуємо фільтри до колекції зображень з прогрес-трекінгом та обробкою помилок.
# parallel_image_processor.py
from __future__ import annotations
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
import math
# Симулюємо важку обробку зображення (у реальності — PIL/Pillow операції)
@dataclass
class ImageTask:
image_id: int
width: int
height: int
filter_name: str
@dataclass
class ProcessingResult:
image_id: int
success: bool
elapsed: float
output_path: str = ""
error: str = ""
def apply_gaussian_blur(task: ImageTask) -> ProcessingResult:
"""
Симулює CPU-важку обробку зображення (Gaussian blur).
У реальному коді тут були б PIL/cv2 операції.
"""
start = time.perf_counter()
try:
# Симулюємо обчислювальну складність, пропорційну розміру зображення
pixels = task.width * task.height
# Гаусів фільтр: для кожного пікселя — обчислення з ядром 5x5
kernel_size = 5
_ = sum(
math.exp(-(dx**2 + dy**2) / (2 * 1.0**2))
for _ in range(pixels // 100) # Зменшено для демо
for dx in range(-kernel_size, kernel_size + 1)
for dy in range(-kernel_size, kernel_size + 1)
)
elapsed = time.perf_counter() - start
return ProcessingResult(
image_id=task.image_id,
success=True,
elapsed=elapsed,
output_path=f"/output/img_{task.image_id:04d}_{task.filter_name}.jpg",
)
except Exception as e:
return ProcessingResult(
image_id=task.image_id,
success=False,
elapsed=time.perf_counter() - start,
error=str(e),
)
def process_batch(
tasks: list[ImageTask],
max_workers: int | None = None,
verbose: bool = True,
) -> list[ProcessingResult]:
"""
Паралельно обробляє батч зображень.
Args:
tasks: список завдань обробки
max_workers: кількість процесів (None = кількість ядер CPU)
verbose: виводити прогрес
"""
if max_workers is None:
max_workers = multiprocessing.cpu_count()
results: list[ProcessingResult] = []
completed = 0
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=max_workers) as pool:
future_to_task = {pool.submit(apply_gaussian_blur, task): task for task in tasks}
for future in as_completed(future_to_task):
result = future.result()
results.append(result)
completed += 1
if verbose:
status = "✓" if result.success else "✗"
print(
f" [{status}] Image #{result.image_id:04d} "
f"({result.elapsed:.3f}s) "
f"[{completed}/{len(tasks)}]"
)
total_elapsed = time.perf_counter() - start
total_cpu_time = sum(r.elapsed for r in results)
speedup = total_cpu_time / total_elapsed if total_elapsed > 0 else 0
if verbose:
successes = sum(1 for r in results if r.success)
print(f"\n{'─' * 55}")
print(f"Оброблено: {successes}/{len(tasks)} зображень")
print(f"CPU-час: {total_cpu_time:.2f}s (сума по всіх процесах)")
print(f"Реальний час: {total_elapsed:.2f}s")
print(f"Прискорення: {speedup:.1f}x (на {max_workers} ядрах)")
return sorted(results, key=lambda r: r.image_id)
if __name__ == "__main__":
# Генеруємо батч з 16 завдань (різні розміри зображень)
import random
random.seed(42)
tasks = [
ImageTask(
image_id=i,
width=random.choice([800, 1200, 1920]),
height=random.choice([600, 800, 1080]),
filter_name="gaussian_blur",
)
for i in range(16)
]
print(f"Паралельна обробка {len(tasks)} зображень")
print(f"Доступно ядер: {multiprocessing.cpu_count()}\n")
results = process_batch(tasks, max_workers=4)
print(f"\nПерші 3 результати:")
for r in results[:3]:
print(f" Image #{r.image_id}: {r.output_path}")
Підсумок: ключові принципи multiprocessing
if __name__ == '__main__': — завжди
Pool / ProcessPoolExecutor — не Process
Process. Пул управляє воркерами, обробляє виключення і масштабується автоматично.Pickle — єдиний міст між процесами
shared_memory для великих масивів
shared_memory — уникнете pickle-overhead при передачі гігабайтів даних між процесами.Таблиця механізмів IPC
| Механізм | Між процесами | Швидкість | Типи даних | Напрямок |
|---|---|---|---|---|
Queue | ✅ | Помірна (pickle) | Будь-які pickle | Один або кілька |
Pipe | 2 процеси | Швидша (pickle) | Будь-які pickle | Двосторонній |
Value | ✅ | Дуже швидка | ctypes типи | Спільна пам'ять |
Array | ✅ | Дуже швидка | ctypes масиви | Спільна пам'ять |
shared_memory | ✅ | Максимальна | bytes (numpy) | Спільна пам'ять |
Manager | ✅ | Повільна (proxy) | list, dict, ... | Спільна пам'ять |
Антипатерни, яких слід уникати
| Антипатерн | Чому небезпечний | Рішення |
|---|---|---|
Відсутність if __name__ == "__main__": | Рекурсивний запуск на Windows | Завжди додавати |
| Передача lambda у Pool | PicklingError | Іменована функція або partial |
| Надто великі аргументи через Queue | Повільна серіалізація | shared_memory або файл |
Забути shm.unlink() | Витік ресурсів системи | Завжди cleanup у finally |
| Pool для I/O-bound задач | Накладні витрати процесів даремні | ThreadPoolExecutor для I/O |
| Занадто дрібні задачі | Overhead запуску > час задачі | Batch задачі або chunksize |
Ігнорувати p.exitcode | Тихі збої процесів | Перевіряйте exitcode != 0 |
Threading — конкурентність для I/O-bound задач
Вичерпний розбір модуля threading у Python — від Thread і daemon-потоків до Race Condition, Lock, RLock, Semaphore, Event, Barrier, потокобезпечних черг та ThreadPoolExecutor. Реальні патерни та антипатерни з прикладами.
asyncio — кооперативна конкурентність та event loop
Вичерпний розбір asyncio у Python — від моделі event loop і корутин до Task, Future, примітивів синхронізації, таймаутів та інтеграції з синхронним кодом. Практичні патерни для побудови масштабованих асинхронних програм.