Python

Multiprocessing — справжній паралелізм для CPU-bound задач

Вичерпний розбір модуля multiprocessing у Python — від Process і методів запуску до Pool, ProcessPoolExecutor, міжпроцесної комунікації (Queue, Pipe, shared_memory), проблеми pickling та реальних прикладів паралельної обробки даних.

Multiprocessing — справжній паралелізм для CPU-bound задач

Проблема: чому потоки не рятують від GIL при обчисленнях

У попередній статті ми побачили, що threading чудово прискорює I/O-bound задачі. Але що відбувається, коли потрібно обробити величезний масив даних, порахувати хеш мільйона файлів або натренувати просту модель машинного навчання?

# Спроба прискорити обчислення потоками — вже відома пастка
import threading
import time


def cpu_crunch(n: int) -> int:
    """Чисті обчислення без жодного I/O."""
    return sum(i * i for i in range(n))


N = 20_000_000

# Однопотоково
t0 = time.perf_counter()
cpu_crunch(N)
cpu_crunch(N)
single = time.perf_counter() - t0
print(f"Однопотоково: {single:.2f}s")

# Два потоки — очікуємо 2x прискорення...
t0 = time.perf_counter()
t1 = threading.Thread(target=cpu_crunch, args=(N,))
t2 = threading.Thread(target=cpu_crunch, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - t0
print(f"Два потоки:   {threaded:.2f}s  ← майже те саме!")
print(f"Прискорення:  {single / threaded:.2f}x  (очікувалось 2x)")
python threading_cpu_fail.py
$ python threading_cpu_fail.py
Однопотоково: 3.84s
Два потоки: 4.02s ← майже те саме!
Прискорення: 0.96x (очікувалось 2x)

GIL не дозволяє двом потокам виконувати Python-байткод одночасно. Для CPU-bound коду єдиний вихід — кілька окремих процесів, кожен зі своїм інтерпретатором і своїм GIL.

multiprocessing — правильний вибір для CPU-bound задач, що добре паралелізуються. Кожен процес має власний Python-інтерпретатор, власну пам'ять і власний GIL — тобто справжній паралелізм на кількох ядрах процесора. Але за це доводиться платити: накладні витрати на запуск процесу і серіалізацію даних між ними.

Частина I: multiprocessing.Process — основи

Від потоку до процесу: одна зміна, інша природа

Поверхнево API multiprocessing.Process нагадує threading.Thread. Але під капотом — принципова різниця: кожен процес є окремою програмою з власним адресним простором.

# process_basics.py
import multiprocessing
import os
import time


def worker(name: str, delay: float) -> None:
    """Функція, що виконується в окремому процесі."""
    pid = os.getpid()
    print(f"[{name}] Процес PID={pid}, починаю роботу")
    time.sleep(delay)
    print(f"[{name}] Процес PID={pid}, завершено")


if __name__ == "__main__":
    main_pid = os.getpid()
    print(f"Головний процес PID={main_pid}")

    p1 = multiprocessing.Process(target=worker, args=("Worker-A", 1.0))
    p2 = multiprocessing.Process(target=worker, args=("Worker-B", 1.5))

    p1.start()
    p2.start()

    print(f"Запущено дочірні процеси: {p1.pid}, {p2.pid}")

    p1.join()
    p2.join()
    print("Обидва процеси завершились")
python process_basics.py
$ python process_basics.py
Головний процес PID=12345
Запущено дочірні процеси: 12346, 12347
[Worker-A] Процес PID=12346, починаю роботу
[Worker-B] Процес PID=12347, починаю роботу
[Worker-A] Процес PID=12346, завершено
[Worker-B] Процес PID=12347, завершено
Обидва процеси завершились

Зверніть на різні PID — кожен Process є окремим системним процесом, що запущений операційною системою. У threading всі потоки мали б однаковий PID.

Обов'язковий if __name__ == "__main__":

На Windows і macOS (метод запуску spawn) код модуля виконується заново у кожному дочірньому процесі при його старті. Без захисту if __name__ == "__main__": кожен дочірній процес запускатиме нових дочірніх — рекурсивно, до краху системи. Завжди загортайте код запуску процесів у цей guard.
# ПРАВИЛЬНО
if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

# НЕПРАВИЛЬНО — на Windows призведе до безкінечного рекурсивного запуску!
p = multiprocessing.Process(target=worker)
p.start()
p.join()

Методи запуску процесів: spawn, fork, forkserver

Python надає три способи створення дочірнього процесу, і вибір між ними впливає на швидкість, безпеку і сумісність:

spawn
Windows default, macOS default (Python 3.8+)
Запускає чистий Python-інтерпретатор і імпортує модуль заново. Найповільніший старт (~100-500мс), але найбезпечніший — дочірній процес не успадковує нічого зайвого від батьківського. Єдиний метод, що працює на Windows.
fork
Linux default
Використовує системний виклик fork()клонує весь батьківський процес разом з усіма відкритими файлами, сокетами і пам'яттю. Найшвидший старт, але небезпечний при використанні бібліотек з глобальним станом (наприклад, багатопотокові C-extensions). Не підтримується на Windows.
forkserver
Linux, macOS
Запускає окремий сервер-процес, через який замовляються нові процеси. Компроміс між spawn і fork: чисте середовище і відносно швидкий старт.
# start_method_demo.py
import multiprocessing


def show_pid(label: str) -> None:
    import os
    print(f"[{label}] PID: {os.getpid()}")


if __name__ == "__main__":
    # Дивимося поточний метод
    print(f"Поточний метод: {multiprocessing.get_start_method()}")

    # Змінюємо метод (один раз, до запуску будь-яких процесів)
    # multiprocessing.set_start_method("spawn")  # або "fork", "forkserver"

    # Або через контекст:
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(target=show_pid, args=("spawn-child",))
    p.start()
    p.join()

Частина II: Демонстрація реального прискорення

Benchmark: один процес vs пул процесів

# multiprocessing_benchmark.py
import multiprocessing
import time


def is_prime(n: int) -> bool:
    """Перевіряє, чи є число простим — CPU-bound операція."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n ** 0.5) + 1, 2):
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start: int, end: int) -> int:
    """Рахує прості числа у діапазоні [start, end)."""
    return sum(1 for n in range(start, end) if is_prime(n))


def split_range(total: int, chunks: int) -> list[tuple[int, int]]:
    """Ділить діапазон [0, total) на рівні частини."""
    size = total // chunks
    return [(i * size, (i + 1) * size) for i in range(chunks)]


if __name__ == "__main__":
    LIMIT = 2_000_000
    cpu_count = multiprocessing.cpu_count()
    print(f"Рахуємо прості числа до {LIMIT:,}")
    print(f"Доступно ядер: {cpu_count}\n")

    # ── Однопотоково (baseline) ───────────────────────────────────────────────
    t0 = time.perf_counter()
    result_single = count_primes_in_range(0, LIMIT)
    single_time = time.perf_counter() - t0
    print(f"Однопроцесно:    {result_single:,} простих за {single_time:.2f}s")

    # ── Пул процесів ──────────────────────────────────────────────────────────
    for num_workers in [2, 4, cpu_count]:
        ranges = split_range(LIMIT, num_workers)
        t0 = time.perf_counter()
        with multiprocessing.Pool(processes=num_workers) as pool:
            results = pool.starmap(count_primes_in_range, ranges)
        elapsed = time.perf_counter() - t0
        total = sum(results)
        speedup = single_time / elapsed
        print(
            f"Pool({num_workers} workers): {total:,} простих за {elapsed:.2f}s  "
            f"→ {speedup:.2f}x прискорення"
        )
python multiprocessing_benchmark.py
$ python multiprocessing_benchmark.py
Рахуємо прості числа до 2,000,000
Доступно ядер: 8
Однопроцесно: 148,933 простих за 1.84s
Pool(2 workers): 148,933 простих за 0.98s1.88x прискорення
Pool(4 workers): 148,933 простих за 0.51s3.61x прискорення
Pool(8 workers): 148,933 простих за 0.28s6.57x прискорення

Справжнє лінійне прискорення! Майже 7x на 8 ядрах. Різниця від ідеального 8x — це накладні витрати на запуск пулу і комунікацію між процесами.


Частина III: Pool — пул процесів для масових задач

Pool — найзручніший спосіб розподілити тисячі незалежних задач між процесами. Пул підтримує чотири основних методи:

Pool.map() — паралельний аналог map()

# pool_map.py
import multiprocessing


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    data = list(range(10))

    with multiprocessing.Pool(processes=4) as pool:
        # map() — блокується, повертає результати в порядку вхідних даних
        results = pool.map(square, data)
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        # map() з розміром чанку (для великих даних ефективніше)
        big_data = list(range(100_000))
        results = pool.map(square, big_data, chunksize=1000)
        print(f"Оброблено {len(results)} елементів")

Pool.starmap() — для функцій з кількома аргументами

# pool_starmap.py
import multiprocessing


def power(base: int, exp: int) -> int:
    return base ** exp


if __name__ == "__main__":
    # starmap розпаковує кожен елемент як аргументи функції
    tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]

    with multiprocessing.Pool() as pool:
        results = pool.starmap(power, tasks)
        print(results)  # [1024, 243, 625, 1000]

Pool.apply_async() — асинхронний запуск з колбеком

# pool_async.py
import multiprocessing
import time


def slow_computation(task_id: int, n: int) -> tuple[int, int]:
    """Довге обчислення — повертає (task_id, результат)."""
    result = sum(i ** 2 for i in range(n))
    return task_id, result


def on_success(result: tuple[int, int]) -> None:
    task_id, value = result
    print(f"  ✓ Задача {task_id} завершена: {value:,}")


def on_error(exc: Exception) -> None:
    print(f"  ✗ Помилка: {exc}")


if __name__ == "__main__":
    tasks = [(i, 1_000_000) for i in range(8)]

    with multiprocessing.Pool(processes=4) as pool:
        # apply_async повертає AsyncResult негайно (не блокується)
        async_results = [
            pool.apply_async(
                slow_computation,
                args=task,
                callback=on_success,
                error_callback=on_error,
            )
            for task in tasks
        ]

        # Чекаємо всі результати
        pool.close()   # забороняємо нові задачі
        pool.join()    # чекаємо завершення всіх
        print("Усі задачі виконані")

Pool.imap() — ліниве відображення (для великих даних)

# pool_imap.py
import multiprocessing


def process_line(line: str) -> str:
    """Обробка одного рядка файлу."""
    return line.strip().upper()


if __name__ == "__main__":
    # imap() повертає ітератор — результати не зберігаються всі в пам'яті
    # Ідеально для обробки великих файлів построково
    lines = [f"рядок {i}\n" for i in range(1_000_000)]

    with multiprocessing.Pool() as pool:
        # Без imap(): всі результати в пам'яті → Out of Memory для великих даних
        # З imap(): обробляємо потоково, результати генеруються поступово
        for result in pool.imap(process_line, lines, chunksize=1000):
            pass  # Обробляємо результат одразу, не зберігаємо всі
    print("Готово (побудково, без навантаження на пам'ять)")

Частина IV: ProcessPoolExecutor — сучасний API через concurrent.futures

ProcessPoolExecutor з модуля concurrent.futures надає той самий уніфікований інтерфейс, що і ThreadPoolExecutor. Це рекомендований спосіб для більшості нових проектів:

# process_pool_executor.py
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math


def heavy_math(n: int) -> float:
    """CPU-bound задача: обчислення великої суми."""
    return sum(math.sqrt(i) * math.log(i + 1) for i in range(1, n + 1))


if __name__ == "__main__":
    tasks = [500_000] * 8   # 8 важких задач

    # ── ProcessPoolExecutor з submit() ────────────────────────────────────────
    print("Запускаємо 8 CPU-bound задач...")
    t0 = time.perf_counter()

    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = {pool.submit(heavy_math, n): i for i, n in enumerate(tasks)}

        for future in as_completed(futures):
            task_id = futures[future]
            try:
                result = future.result()
                print(f"  ✓ Задача {task_id}: {result:,.2f}")
            except Exception as e:
                print(f"  ✗ Задача {task_id}: {e}")

    elapsed = time.perf_counter() - t0
    print(f"Усього: {elapsed:.2f}s")


    # ── ProcessPoolExecutor з map() ───────────────────────────────────────────
    t0 = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(heavy_math, tasks))
    print(f"map(): {time.perf_counter() - t0:.2f}s, {len(results)} результатів")

Pool vs ProcessPoolExecutor: коли що вибирати

Критерійmultiprocessing.PoolProcessPoolExecutor
APImap, starmap, apply_async, imapsubmit, map, as_completed
starmap підтримка✅ Так❌ Ні (треба lambda або functools.partial)
imap (потокове)✅ Так❌ Ні
callback для async✅ ТакЧерез Future.add_done_callback()
Уніфікований API з threading✅ (той самий інтерфейс, що ThreadPoolExecutor)
Context manager
Рекомендований для нових проектівДля специфічних потреб✅ За замовчуванням

Частина V: Міжпроцесна комунікація (IPC)

Проблема: процеси не бачать пам'ять один одного

На відміну від потоків, кожен процес живе у власному адресному просторі. Зміна змінної в дочірньому процесі ніяк не впливає на батьківський:

# no_shared_memory.py
import multiprocessing

shared_list = [1, 2, 3]  # У кожного процесу своя копія!


def modify_list() -> None:
    shared_list.append(99)
    print(f"  У дочірньому: {shared_list}")  # [1, 2, 3, 99]


if __name__ == "__main__":
    p = multiprocessing.Process(target=modify_list)
    p.start()
    p.join()
    print(f"У батьківському: {shared_list}")  # [1, 2, 3] — незмінено!

Для обміну даними між процесами є кілька механізмів IPC.

multiprocessing.Queue — потокобезпечна черга між процесами

# mp_queue.py
import multiprocessing
import time
from typing import Any


def producer(queue: multiprocessing.Queue, items: list[Any]) -> None:
    for item in items:
        queue.put(item)
        print(f"  [Producer] → {item}")
        time.sleep(0.1)
    queue.put(None)  # Sentinel
    print("  [Producer] Завершено")


def consumer(queue: multiprocessing.Queue, results: multiprocessing.Queue) -> None:
    while True:
        item = queue.get()
        if item is None:
            break
        processed = item * 2
        print(f"  [Consumer] {item}{processed}")
        results.put(processed)
    print("  [Consumer] Завершено")


if __name__ == "__main__":
    task_queue: multiprocessing.Queue = multiprocessing.Queue()
    result_queue: multiprocessing.Queue = multiprocessing.Queue()

    prod = multiprocessing.Process(target=producer, args=(task_queue, list(range(5))))
    cons = multiprocessing.Process(target=consumer, args=(task_queue, result_queue))

    prod.start(); cons.start()
    prod.join();  cons.join()

    # Збираємо результати
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(f"Результати: {results}")

multiprocessing.Pipe — двосторонній канал

Pipe — простіший і швидший за Queue механізм для передачі даних між двома процесами:

# mp_pipe.py
import multiprocessing
import time


def worker(conn: multiprocessing.connection.Connection) -> None:
    """Отримує завдання, обробляє і відправляє відповідь."""
    while True:
        data = conn.recv()     # Блокується, чекає дані
        if data is None:
            break
        result = data ** 2
        conn.send(result)      # Відправляємо відповідь
        print(f"  [Worker] {data}² = {result}")
    conn.close()


if __name__ == "__main__":
    # Pipe() повертає два з'єднання: parent_conn і child_conn
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    child_conn.close()  # Батько закриває свій кінець child_conn

    for n in range(5):
        parent_conn.send(n)         # Відправляємо завдання
        result = parent_conn.recv() # Отримуємо відповідь
        print(f"[Main] {n}² = {result}")

    parent_conn.send(None)  # Sentinel — завершити worker
    p.join()
    parent_conn.close()

Value і Array — спільна пам'ять для простих типів

Для числових лічильників і масивів є механізм спільної пам'яті без копіювання:

# shared_value.py
import multiprocessing
import ctypes


def increment_counter(counter: multiprocessing.Value, lock: multiprocessing.Lock, n: int) -> None:
    for _ in range(n):
        with lock:
            counter.value += 1


def fill_array(arr: multiprocessing.Array, start: int, end: int) -> None:
    for i in range(start, end):
        arr[i] = i * i


if __name__ == "__main__":
    ITERATIONS = 100_000

    # Value — один числовий тип зі спільною пам'яттю
    counter = multiprocessing.Value(ctypes.c_int, 0)
    lock = multiprocessing.Lock()

    processes = [
        multiprocessing.Process(target=increment_counter, args=(counter, lock, ITERATIONS))
        for _ in range(4)
    ]
    for p in processes: p.start()
    for p in processes: p.join()
    print(f"Лічильник: {counter.value}")  # 400_000 ✅

    # Array — масив фіксованого розміру зі спільною пам'яттю
    arr = multiprocessing.Array(ctypes.c_int, 100)
    p1 = multiprocessing.Process(target=fill_array, args=(arr, 0, 50))
    p2 = multiprocessing.Process(target=fill_array, args=(arr, 50, 100))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"arr[0..5]: {list(arr[:6])}")    # [0, 1, 4, 9, 16, 25]
    print(f"arr[95..]: {list(arr[95:])}")   # [9025, 9216, 9409, 9604, 9801]

shared_memory — швидка спільна пам'ять для масивів (Python 3.8+)

Для роботи з великими масивами даних (наприклад, NumPy arrays) shared_memory дозволяє уникнути будь-якого копіювання між процесами:

# shared_memory_demo.py
from multiprocessing import shared_memory, Process
import numpy as np


def worker_process(shm_name: str, shape: tuple, dtype: str, start: int, end: int) -> None:
    """Дочірній процес отримує доступ до спільної пам'яті без копіювання."""
    # Підключаємось до вже створеного блоку пам'яті
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # Модифікуємо частину масиву — зміни видимі у батьківському процесі!
    arr[start:end] = arr[start:end] * 2
    print(f"  [Worker] Оброблено рядки {start}:{end}")

    existing_shm.close()  # Від'єднуємось (не знищуємо!)


if __name__ == "__main__":
    # Створюємо великий масив у батьківському процесі
    data = np.arange(1000, dtype=np.float64).reshape(100, 10)
    print(f"До обробки: data[0] = {data[0]}")
    print(f"До обробки: data[50] = {data[50]}")

    # Створюємо блок спільної пам'яті
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    # Прив'язуємо NumPy-масив до спільної пам'яті
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]  # Копіюємо дані один раз

    # Запускаємо процеси, що будуть читати/писати спільну пам'ять
    processes = [
        Process(
            target=worker_process,
            args=(shm.name, data.shape, str(data.dtype), i * 25, (i + 1) * 25)
        )
        for i in range(4)
    ]
    for p in processes: p.start()
    for p in processes: p.join()

    # Результати вже у shared_array (у тій самій пам'яті)
    print(f"\nПісля обробки: data[0] = {shared_array[0]}")
    print(f"Після обробки: data[50] = {shared_array[50]}")

    # ОБОВ'ЯЗКОВО: звільняємо і знищуємо блок пам'яті
    shm.close()
    shm.unlink()  # Видаляємо з системи (інакше витік ресурсів!)
shared_memory особливо корисний при роботі з NumPy у машинному навчанні. Замість того, щоб pickle-серіалізувати гігабайтний масив між процесами, можна один раз розмістити його у спільній пам'яті і надати доступ усім воркерам. Це зменшує накладні витрати IPC з хвилин до мілісекунд.

Частина VI: Проблема pickling — що можна передавати між процесами

Як процеси обмінюються даними

Коли ви передаєте аргументи у Pool.map() або через Queue — дані серіалізуються (pickle) у байтовий потік, передаються через IPC-механізм операційної системи і десеріалізуються у дочірньому процесі. Це дорога операція, і не все можна серіалізувати.

# pickle_test.py
import pickle
import multiprocessing
import threading


# ✅ Можна pickle-серіалізувати:
can_pickle = [
    42,                        # int
    "рядок",                   # str
    [1, 2, 3],                 # list
    {"key": "value"},          # dict
    (1, 2),                    # tuple
    lambda x: x,               # lambda (тільки у pickle protocol 5+, обережно!)
]

# ❌ НЕ можна pickle-серіалізувати:
class NotPicklable:
    def __init__(self):
        self.lock = threading.Lock()  # Lock не серіалізується!
        self.file = open("/dev/null", "w")  # Відкрита файлова дескриптор


def check_pickling() -> None:
    for obj in can_pickle:
        try:
            pickle.dumps(obj)
            print(f"  ✅ {type(obj).__name__}: OK")
        except Exception as e:
            print(f"  ❌ {type(obj).__name__}: {e}")

    # Типові об'єкти, що НЕ pickle:
    not_picklable = [
        threading.Lock(),           # Lock
        lambda x: x * 2,           # lambda (в деяких контекстах)
    ]
    for obj in not_picklable:
        try:
            pickle.dumps(obj)
            print(f"  ✅ {type(obj).__name__}: OK (несподівано)")
        except Exception as e:
            print(f"  ❌ {type(obj).__name__}: {e}")


if __name__ == "__main__":
    check_pickling()

Типова помилка: lambda у Pool.map()

# pickle_lambda_problem.py
import multiprocessing


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5]

    # ❌ Не працює — lambda не можна pickle в контексті Pool
    try:
        with multiprocessing.Pool() as pool:
            results = pool.map(lambda x: x ** 2, data)
    except AttributeError as e:
        print(f"Помилка: {e}")

    # ✅ Рішення 1: іменована функція
    def square(x): return x ** 2
    with multiprocessing.Pool() as pool:
        results = pool.map(square, data)
    print(f"Іменована функція: {results}")

    # ✅ Рішення 2: functools.partial для функцій з фіксованим аргументом
    from functools import partial

    def power(base, exp): return base ** exp
    square_fn = partial(power, exp=2)

    with multiprocessing.Pool() as pool:
        results = pool.map(square_fn, data)
    print(f"functools.partial: {results}")

Частина VII: Реальний патерн — паралельна обробка зображень

Об'єднаємо всі концепції у production-ready обробник: паралельно застосовуємо фільтри до колекції зображень з прогрес-трекінгом та обробкою помилок.

# parallel_image_processor.py
from __future__ import annotations

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
import math


# Симулюємо важку обробку зображення (у реальності — PIL/Pillow операції)
@dataclass
class ImageTask:
    image_id: int
    width: int
    height: int
    filter_name: str


@dataclass
class ProcessingResult:
    image_id: int
    success: bool
    elapsed: float
    output_path: str = ""
    error: str = ""


def apply_gaussian_blur(task: ImageTask) -> ProcessingResult:
    """
    Симулює CPU-важку обробку зображення (Gaussian blur).
    У реальному коді тут були б PIL/cv2 операції.
    """
    start = time.perf_counter()
    try:
        # Симулюємо обчислювальну складність, пропорційну розміру зображення
        pixels = task.width * task.height
        # Гаусів фільтр: для кожного пікселя — обчислення з ядром 5x5
        kernel_size = 5
        _ = sum(
            math.exp(-(dx**2 + dy**2) / (2 * 1.0**2))
            for _ in range(pixels // 100)   # Зменшено для демо
            for dx in range(-kernel_size, kernel_size + 1)
            for dy in range(-kernel_size, kernel_size + 1)
        )
        elapsed = time.perf_counter() - start
        return ProcessingResult(
            image_id=task.image_id,
            success=True,
            elapsed=elapsed,
            output_path=f"/output/img_{task.image_id:04d}_{task.filter_name}.jpg",
        )
    except Exception as e:
        return ProcessingResult(
            image_id=task.image_id,
            success=False,
            elapsed=time.perf_counter() - start,
            error=str(e),
        )


def process_batch(
    tasks: list[ImageTask],
    max_workers: int | None = None,
    verbose: bool = True,
) -> list[ProcessingResult]:
    """
    Паралельно обробляє батч зображень.

    Args:
        tasks: список завдань обробки
        max_workers: кількість процесів (None = кількість ядер CPU)
        verbose: виводити прогрес
    """
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    results: list[ProcessingResult] = []
    completed = 0

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        future_to_task = {pool.submit(apply_gaussian_blur, task): task for task in tasks}

        for future in as_completed(future_to_task):
            result = future.result()
            results.append(result)
            completed += 1

            if verbose:
                status = "✓" if result.success else "✗"
                print(
                    f"  [{status}] Image #{result.image_id:04d} "
                    f"({result.elapsed:.3f}s)  "
                    f"[{completed}/{len(tasks)}]"
                )

    total_elapsed = time.perf_counter() - start
    total_cpu_time = sum(r.elapsed for r in results)
    speedup = total_cpu_time / total_elapsed if total_elapsed > 0 else 0

    if verbose:
        successes = sum(1 for r in results if r.success)
        print(f"\n{'─' * 55}")
        print(f"Оброблено:    {successes}/{len(tasks)} зображень")
        print(f"CPU-час:      {total_cpu_time:.2f}s (сума по всіх процесах)")
        print(f"Реальний час: {total_elapsed:.2f}s")
        print(f"Прискорення:  {speedup:.1f}x  (на {max_workers} ядрах)")

    return sorted(results, key=lambda r: r.image_id)


if __name__ == "__main__":
    # Генеруємо батч з 16 завдань (різні розміри зображень)
    import random
    random.seed(42)

    tasks = [
        ImageTask(
            image_id=i,
            width=random.choice([800, 1200, 1920]),
            height=random.choice([600, 800, 1080]),
            filter_name="gaussian_blur",
        )
        for i in range(16)
    ]

    print(f"Паралельна обробка {len(tasks)} зображень")
    print(f"Доступно ядер: {multiprocessing.cpu_count()}\n")

    results = process_batch(tasks, max_workers=4)
    print(f"\nПерші 3 результати:")
    for r in results[:3]:
        print(f"  Image #{r.image_id}: {r.output_path}")

Підсумок: ключові принципи multiprocessing

if __name__ == '__main__': — завжди

Обов'язковий guard для будь-якого коду, що запускає процеси. Без нього на Windows і macOS виникне рекурсивний запуск процесів.

Pool / ProcessPoolExecutor — не Process

Для більшості задач використовуйте пул, а не вручну керовані Process. Пул управляє воркерами, обробляє виключення і масштабується автоматично.

Pickle — єдиний міст між процесами

Все, що передається між процесами, серіалізується. Lambda, відкриті файли, locks — не pickle. Тримайте аргументи простими: числа, рядки, dataclass, numpy arrays.

shared_memory для великих масивів

При роботі з великими NumPy-масивами використовуйте shared_memory — уникнете pickle-overhead при передачі гігабайтів даних між процесами.

Таблиця механізмів IPC

МеханізмМіж процесамиШвидкістьТипи данихНапрямок
QueueПомірна (pickle)Будь-які pickleОдин або кілька
Pipe2 процесиШвидша (pickle)Будь-які pickleДвосторонній
ValueДуже швидкаctypes типиСпільна пам'ять
ArrayДуже швидкаctypes масивиСпільна пам'ять
shared_memoryМаксимальнаbytes (numpy)Спільна пам'ять
ManagerПовільна (proxy)list, dict, ...Спільна пам'ять

Антипатерни, яких слід уникати

АнтипатернЧому небезпечнийРішення
Відсутність if __name__ == "__main__":Рекурсивний запуск на WindowsЗавжди додавати
Передача lambda у PoolPicklingErrorІменована функція або partial
Надто великі аргументи через QueueПовільна серіалізаціяshared_memory або файл
Забути shm.unlink()Витік ресурсів системиЗавжди cleanup у finally
Pool для I/O-bound задачНакладні витрати процесів даремніThreadPoolExecutor для I/O
Занадто дрібні задачіOverhead запуску > час задачіBatch задачі або chunksize
Ігнорувати p.exitcodeТихі збої процесівПеревіряйте exitcode != 0
Copyright © 2026