Параллельная работа в Python

Параллельное выполнение — это способ организации работы программы, при котором задачи выполняются одновременно, в то время как в последовательном выполнении они выполняются одна за другой.

В программировании — это означает, что различные части программы выполняются одновременно, чтобы повысить эффективность работы.

Разница между последовательным и параллельным выполнением:

  • Последовательное выполнение: Задачи выполняются одна за другой, последовательно. Каждая задача должна дождаться завершения предыдущей, прежде чем начать свою работу.
  • Параллельное выполнение: Задачи выполняются одновременно, независимо друг от друга. Это позволяет ускорить выполнение программы, особенно при наличии множества ядер процессора.

Примеры:

  • Последовательное выполнение: Когда вы готовите завтрак, вы сначала варите яйца, потом жарите бекон, и только после этого готовите кофе. Каждый этап выполняется поочередно.
  • Параллельное выполнение: Представьте, что у вас есть помощники, каждый из которых занимается отдельной задачей при приготовлении завтрака. Один варит яйца, второй жарит бекон, третий готовит кофе — все это происходит одновременно.

Для чего нужно использовать параллельное выполнение?

Использование параллельного выполнения важно для улучшения производительности программ, особенно в современных компьютерах с многозадачностью и многоядерными процессорами. Это позволяет более эффективно использовать ресурсы компьютера и ускоряет выполнение задач, таких как обработка данных, взаимодействие с сетью или выполнение сложных вычислений.

Существует множество сценариев применения параллельного выполнения в программировании, где эффективное использование ресурсов компьютера и ускорение выполнения задач становятся критически важными. Некоторые типичные сценарии включают:

  1. Обработка данных:
    • Пример: Обработка больших наборов данных, таких как анализ больших файлов логов или обработка больших баз данных.
  2. Сетевые операции:
    • Пример: Параллельные запросы к различным серверам или обработка множества сетевых запросов одновременно, например, веб-скрапинг или асинхронные API вызовы.
  3. Веб-разработка:
    • Пример: Обработка множества запросов от клиентов одновременно в веб-приложении, особенно в асинхронных веб-серверах.
  4. Научные вычисления:
    • Пример: Выполнение сложных математических или научных вычислений, где каждая часть задачи может быть распределена на различные ядра процессора.
  5. Графический интерфейс:
    • Пример: Параллельная обработка событий в графических интерфейсах для отзывчивости приложений.
  6. Искусственный интеллект и машинное обучение:
    • Пример: Обучение моделей машинного обучения на больших объемах данных или выполнение предсказаний для множества входных данных.
  7. Вычислительные задачи:
    • Пример: Параллельные вычисления для распределенной обработки задач, такие как распределенные вычисления в облаке.
  8. Игровая разработка:
    • Пример: Обработка событий, физики, искусственного интеллекта и отрисовки в играх, где несколько аспектов могут выполняться параллельно.
  9. Визуализация данных:
    • Пример: Параллельная обработка данных для построения графиков и визуализации больших объемов информации.
  10. Тестирование и отладка:
    • Пример: Параллельное выполнение тестовых сценариев для ускорения процесса тестирования и отладки приложений.
Содержание

Асинхронное программирование

Асинхронное программирование в Python — это способ написания кода так, чтобы ваша программа могла продолжать работу, не ожидая завершения каких-то долгих операций.

Вместо того, чтобы ждать, когда что-то произойдет, программа может заниматься другими задачами.

Основные понятия асинхронного программирования:

  1. Асинхронные функции:
    • Это функции, которые могут выполняться параллельно с другим кодом, не блокируя выполнение остальной программы. Они помогают избегать ожидания долгих операций.
      async def some_async_function():
          # код асинхронной функции
      
  2. Ключевые слова async и await:
    • Ключевое слово async используется для определения асинхронной функции, а await используется внутри таких функций для вызова других асинхронных операций без блокировки.
      async def example():
          result = await some_async_function()
      
  3. Event Loop (Цикл Событий):
    • Это своего рода «диспетчер», который следит за выполнением асинхронных задач и определяет, какие задачи должны выполняться в данный момент.
      import asyncio
      
      async def main():
          # создаем event loop
          loop = asyncio.get_event_loop()
      
          # запускаем асинхронную функцию
          await loop.create_task(some_async_function())
      
      # запускаем цикл событий
      asyncio.run(main())
      
  4. Coroutine (Корутина):
    • Это специальный вид асинхронной функции, которая может приостанавливать свое выполнение, чтобы другие задачи могли выполняться между ее шагами.
      async def coroutine_example():
          print("Шаг 1")
          await asyncio.sleep(1)
          print("Шаг 2")
      
  5. asyncio:
    • Модуль в Python, предоставляющий инструменты для работы с асинхронным кодом, включая event loop и различные вспомогательные функции.
      import asyncio
      
      async def main():
          await asyncio.gather(coroutine1(), coroutine2())
      
      asyncio.run(main())
      

Асинхронные функции и корутины

Асинхронные функции:

Асинхронные функции — это функции, которые могут выполняться асинхронно, благодаря использованию ключевых слов async и await. Они позволяют программе продолжать выполнение других задач, пока выполняется долгая операция, не блокируя остальной код.

import asyncio

async def async_function():
    print("Начало выполнения асинхронной функции")
    await asyncio.sleep(2)
    print("Окончание выполнения асинхронной функции")

# Запуск асинхронной функции
asyncio.run(async_function())

В приведенном примере asyncio.sleep(2) представляет собой асинхронную операцию ожидания, которая приостанавливает выполнение асинхронной функции на 2 секунды без блокировки других задач.

2. Корутины:

Корутины — это специфический вид асинхронных функций, которые могут приостанавливать свое выполнение, чтобы другие задачи могли выполнить свою работу. Они используют ключевое слово yield для передачи управления другим корутинам или событийному циклу.

import asyncio

async def coroutine_with_yield():
    print("Шаг 1")
    await asyncio.sleep(1)
    print("Шаг 2")

    # Пример использования yield
    yield "Промежуточный результат"

    print("Шаг 3")
    await asyncio.sleep(1)
    print("Шаг 4")

# Запуск корутины
asyncio.run(coroutine_with_yield())

В этой корутине с yield, при вызове корутины происходит выполнение первых двух шагов, затем выполнение приостанавливается, и значение "Промежуточный результат" возвращается как результат вызова yield. После этого выполнение корутины продолжается с шага 3.

Пример использования с yield полезен, когда вам нужно передать промежуточные результаты между различными этапами выполнения корутины, и она может быть особенно полезна в сценариях, где асинхронные шаги выполняются не в строгой последовательности.

Отличие между Асинхронной функцией и Корутиной:

  • Асинхронная функция: Обычная асинхронная функция, которая может содержать вызовы асинхронных операций с использованием await, но не обязана приостанавливать свое выполнение.
  • Корутина: Специальный вид асинхронной функции, которая использует yield или await для явного указания точек приостановки и передачи управления другим задачам.

Вызов других инструкций асинхронно и нет:

В асинхронной функции можно вызывать как синхронные, так и асинхронные операции. Вызов синхронной операции не блокирует выполнение остальной программы, но может приостановить текущую асинхронную функцию.

Пример вызова синхронной операции:

async def async_function():
    print("Начало выполнения асинхронной функции")
    result = sync_operation()  # Вызов синхронной операции
    print("Окончание выполнения асинхронной функции")

def sync_operation():
    print("Выполнение синхронной операции")
    return 42

Асинхронные функции могут также вызывать другие асинхронные функции или корутины с использованием ключевого слова await.

Пример вызова другой асинхронной функции:

async def async_function():
    print("Начало выполнения асинхронной функции")
    await another_async_function()  # Вызов другой асинхронной функции
    print("Окончание выполнения асинхронной функции")

async def another_async_function():
    print("Выполнение другой асинхронной функции")
    await asyncio.sleep(1)
    print("Окончание другой асинхронной функции")

В этом примере await another_async_function() приостанавливает текущую асинхронную функцию до тех пор, пока не завершится выполнение another_async_function().

Если функция объявлена как асинхронная, и внутри нее отсутствует использование ключевых слов yield или await, то все инструкции внутри функции будут выполняться последовательно. Однако это не означает, что они будут выполняться синхронно, блокируя выполнение других задач.

Асинхронные функции могут содержать синхронный код, который не использует await, и он будет выполняться как обычно, но с другими асинхронными задачами в event loop.

Пример асинхронной функции без использования await или yield:

import asyncio

async def async_function_without_await():
    print("Начало выполнения асинхронной функции")

    # Синхронная операция без await
    result = sync_operation()

    print("Окончание выполнения асинхронной функции")

def sync_operation():
    print("Выполнение синхронной операции")
    return 42

# Запуск асинхронной функции
asyncio.run(async_function_without_await())

Модуль asyncio и его возможности в асинхронном программировании

Модуль asyncio предоставляет инструменты для асинхронного программирования в Python. Он включает в себя event loop (цикл событий), асинхронные функции, и другие средства для эффективного управления параллельным выполнением задач. Давайте рассмотрим его основные возможности на примерах.

Основные понятия:

Задачи (Task) представляет собой асинхронную операцию, которая выполняется в рамках цикла событий (Event Loop).

Она представляет асинхронное выполнение определенной функции или корутины и позволяет синхронизировать их выполнение.

Цикл Event Loop.

  • Цель: Обеспечивает асинхронное выполнение задач, управление событиями и обработку ввода-вывода без блокировки основного потока.
  • Процесс при запуске Event Loop:
    1. Ожидание событий: Цикл ожидает асинхронных событий, таких как завершение задачи, приход сетевых данных и другие события.
    2. Обработка событий: Когда происходит событие, цикл запускает соответствующие задачи для обработки. Это может включать выполнение корутин, вызов колбэков и т.д.
    3. Возврат управления: После обработки событий цикл возвращается к ожиданию новых событий, не блокируя основной поток.

Очередь. 

Назначение: Очередь (Queue) в asyncio используется для обмена данными между асинхронными задачами. Это безопасный способ передачи информации и событий между корутинами.

Основные методы asyncio

  • asyncio.create_task(coro)
    • Описание: Создает задачу для выполнения асинхронной корутины coro.
    • Пример:
      import asyncio
      
      async def my_coroutine():
          await asyncio.sleep(1)
          print("Task completed")
      
      task = asyncio.create_task(my_coroutine())
      
  • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
    • Описание: Запускает несколько корутин параллельно и ожидает их завершения. Возвращает список результатов.
    • Пример:
      import asyncio
      
      async def coro1():
          await asyncio.sleep(2)
          return "Coro1 done"
      
      async def coro2():
          await asyncio.sleep(1)
          return "Coro2 done"
      
      results = asyncio.run(asyncio.gather(coro1(), coro2()))
      print(results)
      
  • asyncio.run(main, *, debug=False)
    • Описание: Запускает асинхронную функцию main в новом цикле событий и блокирует выполнение до ее завершения.
    • Пример:
      import asyncio
      
      async def main():
          print("Hello, asyncio!")
      
      asyncio.run(main())
      
  • task.cancel():
    • Описание: Метод cancel() используется для отмены выполнения задачи. Он отправляет сигнал задаче о необходимости прервать свое выполнение.
    • Пример использования:
      import asyncio
      
      async def my_coroutine():
          try:
              while True:
                  await asyncio.sleep(1)
                  print("Working...")
          except asyncio.CancelledError:
              print("Task was cancelled")
      
      async def main():
          task = asyncio.create_task(my_coroutine())
          await asyncio.sleep(3)
          task.cancel()
          await task
      
      asyncio.run(main())
      

Возможности asyncio

1. Асинхронные функции и await:

asyncio позволяет создавать асинхронные функции, которые выполняются асинхронно, благодаря использованию ключевых слов async и await. Рассмотрим пример:

import asyncio

async def async_example():
    print("Начало выполнения асинхронной функции")
    await asyncio.sleep(2)
    print("Окончание выполнения асинхронной функции")

# Запуск асинхронной функции
asyncio.run(async_example())

В этом примере await asyncio.sleep(2) приостанавливает выполнение асинхронной функции на 2 секунды без блокировки других задач.

2. Event Loop (Цикл событий):

Event loop — это центральный компонент asyncio. Он отслеживает выполнение асинхронных задач и определяет, какие задачи должны выполняться в данный момент. Вот пример использования event loop:

import asyncio

async def task1():
    print("Выполнение задачи 1")
    await asyncio.sleep(2)
    print("Задача 1 завершена")

async def task2():
    print("Выполнение задачи 2")
    await asyncio.sleep(1)
    print("Задача 2 завершена")

# Создание и запуск event loop
async def main():
    loop = asyncio.get_event_loop()
    await asyncio.gather(task1(), task2())

asyncio.run(main())

Этот пример создает две асинхронные задачи (task1 и task2), и event loop выполняет их параллельно.

3. Очереди и обмен данными между задачами:

asyncio предоставляет очереди для безопасного обмена данными между асинхронными задачами. Пример:

import asyncio

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)
        item = f"Элемент {i}"
        await queue.put(item)
        print(f"Произведен: {item}")

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Потреблен: {item}")
        await asyncio.sleep(2)

# Создание и запуск event loop с использованием очереди
async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())

Здесь producer производит элементы и помещает их в очередь, а consumer потребляет элементы из очереди. Очередь гарантирует безопасность обмена данными между задачами.

4. Задачи с таймаутами:

asyncio также предоставляет средства для управления таймаутами выполнения задач. Пример:

import asyncio

async def task_with_timeout():
    try:
        await asyncio.sleep(3)
    except asyncio.CancelledError:
        print("Задача была отменена")

# Запуск асинхронной функции с таймаутом
async def main():
    try:
        await asyncio.wait_for(task_with_timeout(), timeout=2)
    except asyncio.TimeoutError:
        print("Превышено время ожидания")

asyncio.run(main())

В этом примере asyncio.wait_for устанавливает максимальное время выполнения для задачи.

Обработка ошибок и отмена задач в асинхронном коде в Python

В асинхронном коде важно эффективно обрабатывать ошибки и иметь возможность отменять задачи в случае необходимости. Как это можно сделать с использованием конструкций try, except, finally, а также функционала, предоставляемого библиотекой asyncio.

1. Обработка ошибок в асинхронной функции:

import asyncio

async def async_function_with_error():
    try:
        # Эмуляция возникновения ошибки
        raise ValueError("Произошла ошибка")
    except ValueError as e:
        print(f"Обработка ошибки: {e}")
    finally:
        print("Финальный блок, выполняющийся всегда")

# Запуск асинхронной функции с обработкой ошибок
asyncio.run(async_function_with_error())

В этом примере, если в асинхронной функции возникает ошибка типа ValueError, она будет обработана в блоке except, и затем будет выполнен блок finally.

2. Отмена задачи:

import asyncio

async def task_to_be_cancelled():
    try:
        while True:
            print("Выполнение задачи")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Задача была отменена")

# Запуск асинхронной функции и отмена задачи
async def main():
    task = asyncio.create_task(task_to_be_cancelled())
    await asyncio.sleep(3)  # Подождем 3 секунды
    task.cancel()  # Отмена задачи

asyncio.run(main())

В этом примере создается задача task_to_be_cancelled, которая выполняется в бесконечном цикле. Через 3 секунды задача отменяется с использованием метода cancel(). В случае отмены задачи блок except asyncio.CancelledError внутри задачи будет выполнен.

3. Использование контекстного менеджера для отмены задачи:

import asyncio

async def task_with_context_manager():
    try:
        async with asyncio.timeout(3):
            while True:
                print("Выполнение задачи")
                await asyncio.sleep(1)
    except asyncio.TimeoutError:
        print("Превышено время выполнения задачи")

# Запуск асинхронной функции с использованием контекстного менеджера
asyncio.run(task_with_context_manager())

В этом примере использован контекстный менеджер asyncio.timeout(3), который устанавливает таймаут выполнения задачи в 3 секунды. Если задача не завершится за отведенное время, возникнет asyncio.TimeoutError.

Задания на закрепление работы с асинхронными функциями

Задание 1. Напишите программу, которая ожидает получения числа от пользователя b и параллельно запускает две задачи печати числа b через 3 секунды.

Задание 2. Реализуйте функцию, которая асинхронно скачивает две картинки из сети параллельно и сохраняет их на диск.

Задание 3. Создайте асинхронный веб-сервер, который обрабатывает запросы на два различных URL параллельно.

Задание 4. Напишите программу, которая создает очередь из 5 асинхронных задач и выполняет их параллельно. Задачи должны печатать свой номер и текущее время.

Задание 5. Реализуйте асинхронный скрипт для парсинга информации о погоде с двух различных веб-сайтов и вывода результатов парсинга.

Задание 6. Напишите программу, которая асинхронно отправляет два HTTP-запроса на разные серверы и выводит результаты.

Задание 7. Реализуйте асинхронный скрипт, который создает две корутины, выполняющие асинхронные операции, и выводит результаты.

Задание 8. Создайте асинхронный клиент для чата, который параллельно отправляет и принимает сообщения от сервера.

Задание 9. Разработайте асинхронный скрипт, который параллельно обрабатывает данные из двух разных источников и объединяет их в один результат.

Задание 10. Напишите асинхронный скрипт для выполнения двух асинхронных операций с использованием asyncio.gather() и выводите результаты.

Решения
# Решения всех заданий

import asyncio
import aiohttp
from datetime import datetime

# Задание 1
async def print_number(b):
    await asyncio.sleep(3)
    print(f"Задание 1: Введенное число - {b}")

# Задание 2
async def download_images():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com/image1.jpg') as response1, \
                session.get('https://example.com/image2.jpg') as response2:
            image1_data = await response1.read()
            image2_data = await response2.read()
            with open('image1.jpg', 'wb') as image1_file, open('image2.jpg', 'wb') as image2_file:
                image1_file.write(image1_data)
                image2_file.write(image2_data)
    print("Задание 2: Картинки успешно скачаны")

# Задание 3
from aiohttp import web

async def handle_url1(request):
    return web.Response(text="Задание 3: Обработка запроса на URL1")

async def handle_url2(request):
    return web.Response(text="Задание 3: Обработка запроса на URL2")

# Задание 4
async def task_print_number(number):
    await asyncio.sleep(1)
    print(f"Задание 4: Номер задачи - {number}, Время - {datetime.now().strftime('%H:%M:%S')}")

# Задание 5
async def parse_weather_info():
    # Реализация парсинга информации о погоде
    print("Задание 5: Информация о погоде успешно спарсена")

# Задание 6
async def send_http_requests():
    # Отправка HTTP-запросов
    print("Задание 6: HTTP-запросы успешно отправлены")

# Задание 7
async def async_operation1():
    await asyncio.sleep(2)
    print("Задание 7: Асинхронная операция 1 выполнена")

async def async_operation2():
    await asyncio.sleep(1)
    print("Задание 7: Асинхронная операция 2 выполнена")

# Задание 8
async def async_chat_client():
    # Логика асинхронного чат-клиента
    print("Задание 8: Чат-клиент запущен")

# Задание 9
async def process_data_from_sources():
    # Обработка данных из различных источников
    print("Задание 9: Данные успешно обработаны")

# Задание 10
async def perform_async_operations():
    # Выполнение асинхронных операций
    print("Задание 10: Асинхронные операции выполнены")

# Запуск всех заданий
async def main():
    b = int(input("Введите число b: "))

    # Задание 1
    await asyncio.create_task(print_number(b))

    # Задание 2
    await asyncio.create_task(download_images())

    # Задание 3
    app = web.Application()
    app.router.add_get('/url1', handle_url1)
    app.router.add_get('/url2', handle_url2)
    await web._run_app(app)

    # Задание 4
    tasks = [task_print_number(i) for i in range(5)]
    await asyncio.gather(*tasks)

    # Задание 5
    await asyncio.create_task(parse_weather_info())

    # Задание 6
    await asyncio.create_task(send_http_requests())

    # Задание 7
    await asyncio.gather(async_operation1(), async_operation2())

    # Задание 8
    await asyncio.create_task(async_chat_client())

    # Задание 9
    await asyncio.create_task(process_data_from_sources())

    # Задание 10
    await asyncio.create_task(perform_async_operations())

asyncio.run(main())
  1. В примере используется asyncio.run() для запуска асинхронной программы.
  2. Каждое задание решается внутри асинхронной функции или корутины с использованием asyncio.create_task() или asyncio.gather().
  3. Примеры включают в себя работу с асинхронными функциями, корутинами, HTTP-запросами, параллельным выполнением задач и обработкой ошибок.

Многопоточность в Python

Процессы и потоки

Процессы: В компьютерном мире «процесс» можно представить как работающую программу. Когда вы запускаете какое-то приложение на своем компьютере, это приложение становится процессом. Процессы имеют свою собственную область памяти и ресурсы, они работают независимо друг от друга, как отдельные задачи.

Потоки: Потоки — это более мелкие части процесса. Вы можете представить процесс как большую задачу, а потоки как отдельные шаги внутри этой задачи. Потоки внутри процесса могут выполняться параллельно (одновременно) или последовательно, в зависимости от того, как они настроены.

Пример: Давайте представим, что ваша задача — это приготовление обеда. Процесс — это весь процесс приготовления обеда. Потоки — это разные этапы приготовления, такие как нарезка овощей, варка макарон, жарка мяса. Если вы делите эти этапы между несколькими людьми и каждый выполняет свою часть параллельно, то это аналог потоков внутри процесса.

Ключевые идеи:

  • Процесс — это какая-то выполняющаяся программа, а потоки — это части этой программы.
  • Потоки внутри процесса могут работать одновременно или поочередно.
  • Процессы и потоки используются для эффективного управления задачами и ресурсами компьютера.
  • Потоки часто используются для параллельного выполнения задач и улучшения производительности.

Многопоточность

Многопоточность — это способ организации выполнения программы, при котором она может выполнять несколько задач одновременно, словно имеет несколько параллельных «потоков» выполнения.

В программировании многопоточные программы могут одновременно выполнять несколько частей задачи, что улучшает эффективность и скорость выполнения.

Преимущества многопоточности:

  1. Повышение производительности:
    • Многопоточные программы могут эффективно использовать ресурсы компьютера, такие как процессорное время, что приводит к увеличению общей производительности.
  2. Отзывчивость:
    • Многопоточные приложения могут реагировать на внешние события и продолжать выполнение других задач в то время, как один из потоков может быть занят выполнением долгих операций.
  3. Масштабируемость:
    • Многопоточные приложения легко масштабируются на многоядерных процессорах, что позволяет им лучше использовать вычислительные ресурсы.
  4. Упрощение структуры кода:
    • Некоторые задачи естественным образом разбиваются на параллельные подзадачи, что делает структуру кода более четкой и понятной.

Ограничения многопоточности:

  1. Гонки за данные (Race Conditions):
    • Если несколько потоков пытаются одновременно изменить общие данные, это может привести к гонкам за данные и неопределенному поведению программы.
  2. Взаимоблокировки (Deadlocks):
    • Взаимоблокировка возникает, когда несколько потоков ожидают ресурсы, которые удерживают другие потоки, что может привести к застою выполнения программы.
  3. Усложненная отладка:
    • Поиск и устранение ошибок в многопоточных программах может быть сложным из-за возможных взаимодействий между потоками.
  4. Неоднозначность порядка выполнения:
    • Порядок выполнения потоков может быть неопределенным, что делает предсказание поведения программы сложным.
  5. Переключение контекста: Когда операционная система управляет выполнением нескольких потоков, она должна переключаться между ними. Этот процесс называется «переключением контекста». Переключение контекста требует времени и вычислительных ресурсов. Когда у вас есть много мелких задач, которые постоянно переключаются, это может привести к излишним накладным расходам, снижая общую производительность.
    Например: Допустим, у вас есть тысячи потоков, каждый из которых выполняет очень короткое задание. Затраты на переключение контекста между этими потоками могут быть значительными по сравнению с реальным выполнением самих задач.
  6. Сложности с синхронизацией: Когда несколько потоков имеют доступ к общим данным, возникают проблемы с синхронизацией. Например, если один поток изменяет общую переменную, а другой пытается ее прочитать или изменить в то же время, может возникнуть ошибка, такая как гонка за данными.Пример: Рассмотрим следующий код на Python, где два потока пытаются увеличить общую переменную counter:
    import threading
    
    counter = 0
    
    def increment():
        global counter
        for _ in range(1000000):
            counter += 1
    
    # Создание двух потоков
    thread1 = threading.Thread(target=increment)
    thread2 = threading.Thread(target=increment)
    
    # Запуск потоков
    thread1.start()
    thread2.start()
    
    # Ожидание завершения потоков
    thread1.join()
    thread2.join()
    
    print("Результат:", counter)
    

В этом примере результат может быть неопределенным из-за гонки за данными. Для правильной синхронизации нужно использовать механизмы, такие как блокировки, чтобы гарантировать правильное выполнение операций над общими данными.

Работа с потоками в Python

В Python существует несколько способов создания и работы с потоками. Давайте рассмотрим два основных подхода: использование модуля threading и использование модуля concurrent.futures.

1. Использование модуля threading:

Модуль threading предоставляет базовые средства для создания и управления потоками.

Создание потока:

import threading

def my_function():
    for i in range(5):
        print(f"Выполнение в потоке: {i}")

# Создание объекта потока
my_thread = threading.Thread(target=my_function)

# Запуск потока
my_thread.start()

# Ожидание завершения потока (необязательно)
my_thread.join()

print("Основной поток завершился")

2. Использование модуля concurrent.futures:

Модуль concurrent.futures предоставляет абстракции для параллельного выполнения кода, включая потоки.

Создание потока с помощью ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def my_function(i):
    print(f"Выполнение в потоке: {i}")

# Создание объекта ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
    # Запуск потоков
    results = [executor.submit(my_function, i) for i in range(5)]

# Обработка результатов (если необходимо)
for future in results:
    result = future.result()
    print(f"Результат: {result}")

print("Основной поток завершился")

Оба подхода предоставляют удобные средства для создания и управления потоками в Python. Выбор конкретного метода зависит от конкретных требований вашего приложения. Обратите внимание, что в Python из-за Global Interpreter Lock (GIL) использование потоков может не всегда приводить к параллельному выполнению кода. Для многозадачных вычислений или операций ввода-вывода, рекомендуется использовать модуль concurrent.futures и его класс ThreadPoolExecutor или ProcessPoolExecutor.

GIL и ограничения многопоточности

GIL, или Global Interpreter Lock (Глобальная блокировка интерпретатора), представляет собой механизм внутри интерпретатора CPython (стандартной реализации Python), который ограничивает выполнение кода Python в одном потоке в каждый момент времени.

Это означает, что в рамках одного процесса Python, вне зависимости от количества созданных потоков, только один поток может активно выполняться.

Почему используется GIL: GIL был введен для обеспечения простоты реализации интерпретатора CPython и избежания проблем, связанных с потокобезопасностью в многозадачной среде. Он обеспечивает уровень атомарности для операций инкремента и декремента, что делает код интерпретатора более простым в обслуживании.

Ограничения GIL:

  1. Параллельные вычисления в Python:
    • Из-за GIL параллельные вычисления в рамках одного процесса Python могут замедляться. Даже если у вас есть многоядерный процессор, только один поток может активно выполняться, что может снижать общую производительность в некоторых сценариях.
  2. Потеря выигрыша от многозадачности:
    • В случаях, когда ваш код в основном выполняет вычислительные задачи, а не ожидает ввода-вывода (I/O), использование многих потоков может не приносить значительного выигрыша в производительности из-за GIL.
  3. Неудобства при использовании многозадачности:
    • Из-за GIL в Python многопоточное программирование может быть более сложным и требовательным к синхронизации, так как несколько потоков не могут безопасно изменять общие данные.

Рекомендации для обхода GIL:

  1. Использование многопроцессорности:
    • Вместо использования многопоточности рекомендуется использовать многопроцессорность с помощью модуля multiprocessing. Каждый процесс имеет свой собственный интерпретатор и, следовательно, собственный GIL.
  2. Использование асинхронного программирования:
    • Для ввода-вывода (I/O) и сетевых операций можно использовать асинхронное программирование с использованием модуля asyncio. Асинхронные программы обходят GIL, позволяя выполнять другие задачи во время блокирующих операций.
  3. Использование Jython или IronPython:
    • Jython и IronPython — это реализации Python, работающие на платформах Java и .NET соответственно. Они не используют GIL, так как исполняются на виртуальных машинах Java и .NET, которые предоставляют свои механизмы управления потоками.

Многопоточность или многопроцессность?

  1. Использование потоков:
    • Синхронизация: Потоки могут эффективно совместно использовать общую память и данные. Однако это требует синхронизации для предотвращения конфликтов, таких как гонки за данными, что может быть сложно и подвержено ошибкам.
    • Взаимодействие: Потоки легко обмениваются данными напрямую, так как они разделяют общую память. Это делает взаимодействие между потоками более прямым и простым.
    • Скорость создания: Создание потоков обычно более быстрое и требует меньше ресурсов, чем создание процессов.
    • Основное применение: Потоки подходят для задач, где требуется эффективное совместное использование памяти и данные должны быть быстро переданы между задачами.
  2. Использование процессов:
    • Изоляция: Процессы имеют собственные области памяти, что гарантирует изоляцию данных. Это устраняет проблемы гонок за данными, но требует механизмов для обмена информацией между процессами.
    • Сложность взаимодействия: Взаимодействие между процессами более сложное, чем между потоками, потому что требуется использование механизмов межпроцессорного взаимодействия (IPC).
    • Надежность: Из-за изоляции процессы более устойчивы к сбоям. Если один процесс завершается с ошибкой, это не влияет на другие процессы.
    • Основное применение: Процессы подходят для задач, где изоляция данных критически важна, и требуется высокая степень надежности.

Выбор в зависимости от задачи:

  • Вычисления:
    • Потоки: Для задач, требующих интенсивных вычислений и эффективного использования ресурсов процессора, потому что они делят общую память и могут работать параллельно.
    • Процессы: Если задача распределена на разные подзадачи, каждая из которых может выполняться в отдельном процессе.
  • Изоляция данных:
    • Потоки: Когда изоляция данных не критична, и цель — увеличить эффективность взаимодействия между задачами.
    • Процессы: Когда данные должны быть строго изолированы для предотвращения возможных конфликтов.
  • Надежность:
    • Потоки: В более простых случаях, когда один поток не влияет на работу других, и в случаях асинхронного программирования с использованием asyncio.
    • Процессы: Когда надежность критична, и сбой одного компонента не должен повлиять на другие.
  • Ввод-вывод (I/O):
    • Потоки: Для задач с блокирующими операциями ввода-вывода, где потоки могут продолжать работу во время ожидания завершения операции.
    • Процессы: Если используется многозадачность для обработки блокирующих I/O-операций, лучше использовать асинхронное программирование с потоками или процессами, поддерживающими асинхронность.

Задания на закрепление работы с потоками

Задание 11. Напишите программу, которая создает два потока для выполнения двух задач параллельно: один поток печатает числа от 1 до 5, а другой поток печатает их квадраты.

Задание 12. Реализуйте программу, использующую многопоточность, для загрузки изображений из интернета параллельно. Каждый поток загружает свое изображение и сохраняет его на диск.

Задание 13. Напишите программу, в которой создается поток для генерации случайных чисел и поток для вычисления их среднего значения. Используйте механизм синхронизации для безопасного доступа к общим данным.

Задание 14. Реализуйте программу для параллельного поиска ключевого слова в текстовых файлах в указанном каталоге. Каждый поток обрабатывает отдельный файл.

Задание 15. Создайте программу, в которой три потока выполняют различные вычислительные задачи, а главный поток ожидает их завершения и выводит результаты.

Решение
import threading
import random
import os

# Задание 11
def print_numbers():
    for i in range(1, 6):
        print(f"Задание 11: Число: {i}")

def print_squares():
    for i in range(1, 6):
        print(f"Задание 11: Квадрат числа {i}: {i*i}")

thread_numbers = threading.Thread(target=print_numbers)
thread_squares = threading.Thread(target=print_squares)

# Запуск потоков
thread_numbers.start()
thread_squares.start()

# Ожидание завершения потоков
thread_numbers.join()
thread_squares.join()

# Задание 12
import requests
from io import BytesIO
from PIL import Image

def download_image(url, filename):
    response = requests.get(url)
    image = Image.open(BytesIO(response.content))
    image.save(filename)

urls = ["https://example.com/image1.jpg", "https://example.com/image2.jpg"]
filenames = ["image1.jpg", "image2.jpg"]

threads = [threading.Thread(target=download_image, args=(url, filename)) for url, filename in zip(urls, filenames)]

# Запуск потоков
for thread in threads:
    thread.start()

# Ожидание завершения потоков
for thread in threads:
    thread.join()

# Задание 13
class RandomNumberGenerator:
    def __init__(self):
        self.generated_numbers = []
        self.lock = threading.Lock()

    def generate_numbers(self):
        for _ in range(5):
            with self.lock:
                number = random.randint(1, 10)
                self.generated_numbers.append(number)
                print(f"Задание 13: Сгенерировано число: {number}")

    def calculate_average(self):
        with self.lock:
            average = sum(self.generated_numbers) / len(self.generated_numbers)
            print(f"Задание 13: Среднее значение чисел: {average}")

rng = RandomNumberGenerator()

# Создание потоков
thread_generate = threading.Thread(target=rng.generate_numbers)
thread_calculate = threading.Thread(target=rng.calculate_average)

# Запуск потоков
thread_generate.start()
thread_calculate.start()

# Ожидание завершения потоков
thread_generate.join()
thread_calculate.join()

# Задание 14
def search_keyword(file_path, keyword):
    with open(file_path, 'r') as file:
        content = file.read()
        if keyword in content:
            print(f"Задание 14: Файл {file_path} содержит ключевое слово: {keyword}")

directory = "/path/to/files"
keyword = "python"

files = [os.path.join(directory, file) for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
threads_search = [threading.Thread(target=search_keyword, args=(file, keyword)) for file in files]

# Запуск потоков
for thread in threads_search:
    thread.start()

# Ожидание завершения потоков
for thread in threads_search:
    thread.join()

# Задание 15
def perform_computation(task_id):
    result = task_id * 2
    print(f"Задание 15: Вычисление задачи {task_id}: Результат - {result}")

tasks = [1, 2, 3]
threads_computation = [threading.Thread(target=perform_computation, args=(task,)) for task in tasks]

# Запуск потоков
for thread in threads_computation:
    thread.start()

# Ожидание завершения потоков
for thread in threads_computation:
    thread.join()

Многопроцессность

Многопроцессность — это способ организации выполнения программы, при котором она может одновременно выполняться в нескольких процессах. Процессы представляют собой отдельные экземпляры программы, которые имеют свои собственные ресурсы, адресное пространство и потоки выполнения.

Преимущества многопроцессности:

  1. Изолированность:
    • Каждый процесс имеет свою собственную область памяти, что обеспечивает изоляцию данных между процессами. Это делает многопроцессорные программы более устойчивыми к ошибкам и безопасными.
  2. Надежность:
    • Ошибка в одном процессе не повлияет на работу других процессов. Если один процесс завершается с ошибкой, это не оказывает воздействия на остальные процессы.
  3. Масштабируемость:
    • Многопроцессорные программы могут легко масштабироваться на многоядерных процессорах, так как каждый процесс может быть назначен на свой собственный ядро.
  4. Параллелизм:
    • В многопроцессорных программах каждый процесс может выполняться параллельно, что улучшает общую производительность, особенно на многоядерных системах.
  5. Управление ресурсами:
    • Процессы позволяют эффективно управлять выделением ресурсов, таких как CPU, память и файловые дескрипторы.
  6. Способ обхода GIL:
    • В отличие от многопоточности в Python, где действует GIL, использование многопроцессорности позволяет обойти GIL и эффективно использовать многоядерные системы.

Пример использования многопроцессности:

Представим, у вас есть веб-сервер, который обслуживает запросы от клиентов. Каждый запрос может быть обработан отдельным процессом, обеспечивая изоляцию данных и обеспечивая высокую надежность и производительность.

Реализация мультипроцессности в Python

В Python мультипроцессность может быть реализована с использованием модуля multiprocessing. Этот модуль предоставляет инструменты для создания и управления процессами, а также обмена данными между ними. Давайте рассмотрим основные шаги для реализации мультипроцессности в Python:

  1. Импортировать модуль multiprocessing
  2. Определить функцию, которую будет выполнять каждый процесс
  3. Создать объекты процессов
  4. Запустить процессы
  5. Дождаться завершения процессов

Реализация на Python:

import multiprocessing

def worker_function(arg):
    print(f"Процесс {arg} выполняется")

if __name__ == "__main__":
    # Создание объектов процессов
    process1 = multiprocessing.Process(target=worker_function, args=("первый",))
    process2 = multiprocessing.Process(target=worker_function, args=("второй",))

    # Запуск процессов
    process1.start()
    process2.start()

    # Дожидаемся завершения процессов
    process1.join()
    process2.join()

Обратите внимание, что условие if __name__ == "__main__": используется для защиты кода от выполнения при импорте модуля, что может вызвать проблемы с мультипроцессингом в некоторых средах.

Приёмы работы с модулем multiprocessing

Использование очереди (Queue):

Модуль multiprocessing предоставляет класс Queue, который обеспечивает безопасную передачу данных между процессами.

Пример:

import multiprocessing

def worker_function(queue):
    data = queue.get()  # Получение данных из очереди
    print(f"Процесс получил данные: {data}")

if __name__ == "__main__":
    my_queue = multiprocessing.Queue()

    process = multiprocessing.Process(target=worker_function, args=(my_queue,))
    
    data_to_send = "Привет, процесс!"
    my_queue.put(data_to_send)  # Помещение данных в очередь

    process.start()
    process.join()

В этом примере создается процесс, который принимает данные из очереди. Данные помещаются в очередь перед запуском процесса, и процесс затем получает эти данные из очереди.

Использование общих объектов (Value, Array):

Value и Array из модуля multiprocessing позволяют создавать общие объекты для обмена данными между процессами.

Пример с использованием Value:

import multiprocessing

def worker_function(shared_value):
    shared_value.value += 1
    print(f"Процесс увеличил значение: {shared_value.value}")

if __name__ == "__main__":
    my_shared_value = multiprocessing.Value("i", 0)

    process = multiprocessing.Process(target=worker_function, args=(my_shared_value,))
    
    process.start()
    process.join()

    print(f"Основной процесс: значение после процесса - {my_shared_value.value}")

В этом примере создается общее целочисленное значение (Value) и передается в процесс. Процесс увеличивает это значение, и результат виден в основном процессе после завершения процесса.

Пример с использованием Array:

import multiprocessing

def worker_function(shared_array):
    for i in range(len(shared_array)):
        shared_array[i] += 1
    print(f"Процесс увеличил массив: {shared_array[:]}")


if __name__ == "__main__":
    my_shared_array = multiprocessing.Array("i", [1, 2, 3, 4, 5])

    process = multiprocessing.Process(target=worker_function, args=(my_shared_array,))
    
    process.start()
    process.join()

    print(f"Основной процесс: массив после процесса - {my_shared_array[:]}")

В этом примере создается общий массив (Array) и передается в процесс. Процесс увеличивает каждый элемент массива, и результат виден в основном процессе после завершения процесса.

Использование блокировки (Lock):

Lock из модуля multiprocessing позволяет предотвратить одновременный доступ к общим ресурсам из нескольких процессов.

Пример:

import multiprocessing

def worker_function(shared_value, lock):
    with lock:
        shared_value.value += 1
        print(f"Процесс увеличил значение: {shared_value.value}")

if __name__ == "__main__":
    my_shared_value = multiprocessing.Value("i", 0)
    my_lock = multiprocessing.Lock()

    process1 = multiprocessing.Process(target=worker_function, args=(my_shared_value, my_lock))
    process2 = multiprocessing.Process(target=worker_function, args=(my_shared_value, my_lock))
    
    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print(f"Основной процесс: значение после процессов - {my_shared_value.value}")

В этом примере создается общее значение (Value) и блокировка (Lock). Два процесса используют блокировку для обеспечения безопасного изменения значения. Блокировка предотвращает конфликты доступа к общему ресурсу и обеспечивает корректное выполнение программы.

Работа с пулом процессов

Пул процессов — это механизм в модуле multiprocessing, который предоставляет возможность создавать пул процессов для параллельного выполнения задач. Пул процессов управляет пулом фиксированного размера, и каждая задача, которую вы отправляете на выполнение, обрабатывается одним из процессов в пуле.

Лучшие практики при работе с пулом процессов:

  1. Использование контекстного менеджера:
    • Всегда используйте конструкцию with для управления пулом процессов. Это гарантирует правильное завершение работы пула после выполнения всех задач.
  2. Организация данных:
    • Если у вас есть данные, которые нужно передать каждому процессу, используйте multiprocessing.Manager или multiprocessing.Queue для безопасного обмена данными между процессами.
  3. Обработка исключений:
    • Обрабатывайте исключения ваших задач, чтобы избежать зависания пула процессов из-за необработанных ошибок.
  4. Контроль размера пула:
    • Выбирайте размер пула в зависимости от количества доступных ядер CPU и характера задач. Например, multiprocessing.cpu_count() даст вам количество доступных ядер.
  5. Избегайте разделяемых изменяемых объектов:
    • При использовании пула процессов избегайте изменяемых объектов, которые могут быть изменены несколькими процессами одновременно. Это может привести к неопределенному поведению.

Пример с использованием пула процессов:

import multiprocessing

def worker_function(x):
    result = x * x
    print(f"Результат: {result}")
    return result

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # Пример передачи данных каждой задаче
        data_to_process = [1, 2, 3, 4, 5]
        
        # Применение функции к каждому элементу списка в параллельном режиме
        results = pool.map(worker_function, data_to_process)
        
        # Пример использования apply_async для асинхронного выполнения
        async_result = pool.apply_async(worker_function, (10,))

        # Ожидание завершения всех задач
        pool.close()
        pool.join()

        # Получение результатов
        print("Результаты:", results)
        print("Асинхронный результат:", async_result.get())

В этом примере создается пул процессов с помощью multiprocessing.Pool. Затем функция worker_function применяется к каждому элементу списка data_to_process в параллельном режиме с использованием pool.map(). Также демонстрируется асинхронное выполнение с использованием pool.apply_async(). В конце, вызываются методы pool.close() и pool.join() для корректного завершения работы пула.

Примеры реализации практических задач с помощью мультипроцессинга

Пример 1: Вычисление факториала с использованием мультипроцессинга

import multiprocessing
from functools import reduce

def partial_factorial(start, end):
    # Рассчитываем часть факториала для диапазона от start до end
    return reduce(lambda x, y: x * y, range(start, end + 1))

def parallel_factorial(n, num_processes):
    chunk_size = n // num_processes
    processes = []

    # Создаем процессы для расчета частей факториала
    for i in range(num_processes):
        start = i * chunk_size + 1
        end = (i + 1) * chunk_size if i != num_processes - 1 else n
        process = multiprocessing.Process(target=partial_factorial, args=(start, end))
        processes.append(process)
        process.start()

    # Ждем завершения всех процессов
    for process in processes:
        process.join()

    # Объединяем результаты частичных факториалов
    result = reduce(lambda x, y: x * y, [process.exitcode for process in processes])
    return result

if __name__ == "__main__":
    number = 10
    num_processes = 4

    result = parallel_factorial(number, num_processes)
    print(f"The factorial of {number} is {result}")

Пример 2: Поиск простых чисел в диапазоне

import multiprocessing
import math

def is_prime(num):
    # Проверяем, является ли число простым
    if num < 2:
        return False
    for i in range(2, int(math.sqrt(num)) + 1):
        if num % i == 0:
            return False
    return True

def find_primes(start, end):
    # Находим простые числа в заданном диапазоне
    primes = [num for num in range(start, end + 1) if is_prime(num)]
    return primes

def parallel_prime_search(start, end, num_processes):
    chunk_size = (end - start + 1) // num_processes
    processes = []

    # Создаем процессы для поиска простых чисел
    for i in range(num_processes):
        sub_start = start + i * chunk_size
        sub_end = start + (i + 1) * chunk_size - 1 if i != num_processes - 1 else end
        process = multiprocessing.Process(target=find_primes, args=(sub_start, sub_end))
        processes.append(process)
        process.start()

    # Ждем завершения всех процессов
    for process in processes:
        process.join()

    # Собираем результаты
    result = []
    for process in processes:
        result.extend(process.exitcode)

    return result

if __name__ == "__main__":
    start_range = 10
    end_range = 50
    num_processes = 2

    prime_numbers = parallel_prime_search(start_range, end_range, num_processes)
    print(f"Prime numbers in the range {start_range} to {end_range}: {prime_numbers}")

Пример 3: Асинхронная загрузка веб-страниц

import multiprocessing
import requests
from bs4 import BeautifulSoup

def download_and_parse(url):
    # Загружаем и парсим веб-страницу
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    title = soup.title.string
    print(f"Title of {url}: {title}")

def parallel_web_scraping(urls, num_processes):
    chunk_size = len(urls) // num_processes
    processes = []

    # Создаем процессы для загрузки и парсинга веб-страниц
    for i in range(num_processes):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i != num_processes - 1 else len(urls)
        process = multiprocessing.Process(target=download_and_parse, args=(urls[start:end],))
        processes.append(process)
        process.start()

    # Ждем завершения всех процессов
    for process in processes:
        process.join()

if __name__ == "__main__":
    websites = ["https://www.example1.com", "https://www.example2.com", "https://www.example3.com"]
    num_processes = 3

    parallel_web_scraping(websites, num_processes)

Задания на закрепление

Задание 16: Параллельное вычисление чисел Фибоначчи

Задание 17: Многопоточный поиск простых чисел в диапазоне

Задание 18: Асинхронная загрузка изображений с использованием мультипроцессинга

Задание 19: Параллельная сортировка списка

Задание 20: Распределенный подсчет слов в тексте

Решения
# Задание 16: Параллельное вычисление чисел Фибоначчи
import multiprocessing

def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

def parallel_fibonacci(numbers, num_processes):
    # Создаем пул процессов
    pool = multiprocessing.Pool(processes=num_processes)
    # Применяем функцию fibonacci к каждому числу в списке параллельно
    results = pool.map(fibonacci, numbers)
    # Закрываем пул и ожидаем завершения всех процессов
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    # Тестируем параллельное вычисление чисел Фибоначчи
    fibonacci_numbers = [10, 15, 20]
    num_processes = 3

    fibonacci_results = parallel_fibonacci(fibonacci_numbers, num_processes)
    print(f"Fibonacci results: {fibonacci_results}")


# Задание 17: Многопоточный поиск простых чисел в диапазоне
import multiprocessing
import math

def is_prime(num):
    if num < 2:
        return False
    for i in range(2, int(math.sqrt(num)) + 1):
        if num % i == 0:
            return False
    return True

def find_primes(start, end):
    # Находим простые числа в заданном диапазоне
    primes = [num for num in range(start, end + 1) if is_prime(num)]
    return primes

def parallel_prime_search(start, end, num_processes):
    # Разбиваем диапазон на части и обрабатываем их параллельно
    chunk_size = (end - start + 1) // num_processes
    processes = []

    for i in range(num_processes):
        sub_start = start + i * chunk_size
        sub_end = start + (i + 1) * chunk_size - 1 if i != num_processes - 1 else end
        process = multiprocessing.Process(target=find_primes, args=(sub_start, sub_end))
        processes.append(process)
        process.start()

    # Ожидаем завершения всех процессов
    for process in processes:
        process.join()

    # Собираем результаты из каждого процесса
    result = []
    for process in processes:
        result.extend(process.exitcode)

    return result

if __name__ == "__main__":
    # Тестируем многопоточный поиск простых чисел
    start_range = 10
    end_range = 50
    num_processes = 2

    prime_numbers = parallel_prime_search(start_range, end_range, num_processes)
    print(f"Prime numbers in the range {start_range} to {end_range}: {prime_numbers}")
# Задание 18: Асинхронная загрузка изображений с использованием мультипроцессинга
import multiprocessing
import requests
from PIL import Image
from io import BytesIO

def download_and_resize_image(url, size=(100, 100)):
    # Загружаем изображение по URL и изменяем его размер
    response = requests.get(url)
    image = Image.open(BytesIO(response.content))
    resized_image = image.resize(size)
    return resized_image

def parallel_image_processing(image_urls, num_processes):
    # Создаем пул процессов
    pool = multiprocessing.Pool(processes=num_processes)
    # Применяем функцию download_and_resize_image к каждому URL-адресу изображения параллельно
    resized_images = pool.map(download_and_resize_image, image_urls)
    # Закрываем пул и ожидаем завершения всех процессов
    pool.close()
    pool.join()
    return resized_images

if __name__ == "__main__":
    # Тестируем асинхронную загрузку и изменение размера изображений
    image_urls = ["url1.jpg", "url2.jpg", "url3.jpg"]
    num_processes = 3

    resized_images = parallel_image_processing(image_urls, num_processes)
    for i, image in enumerate(resized_images):
        image.save(f"resized_image_{i}.jpg")


# Задание 19: Параллельная сортировка списка
import multiprocessing

def parallel_sort(data):
    # Разбиваем список на части и сортируем их параллельно
    chunk_size = len(data) // num_processes
    processes = []

    for i in range(num_processes):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i != num_processes - 1 else len(data)
        process = multiprocessing.Process(target=sorted, args=(data[start:end],))
        processes.append(process)
        process.start()

    # Ожидаем завершения всех процессов
    for process in processes:
        process.join()

    # Собираем и сортируем результаты из каждого процесса
    result = sorted([process.exitcode for process in processes])
    return result

if __name__ == "__main__":
    # Тестируем параллельную сортировку списка
    data_to_sort = [4, 2, 7, 1, 9, 5, 3, 8, 6]
    num_processes = 3

    sorted_data = parallel_sort(data_to_sort)
    print(f"Sorted data: {sorted_data}")
# Задание 20: Распределенный подсчет слов в тексте
import multiprocessing

def count_words(text):
    # Функция для подсчета слов в тексте
    word_count = {}
    words = text.split()

    for word in words:
        word = word.lower().strip(".,?!")
        if word:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1

    return word_count

def parallel_word_count(texts, num_processes):
    # Создаем пул процессов
    pool = multiprocessing.Pool(processes=num_processes)
    # Применяем функцию count_words к каждому тексту параллельно
    word_counts = pool.map(count_words, texts)
    # Закрываем пул и ожидаем завершения всех процессов
    pool.close()
    pool.join()

    # Комбинируем результаты из каждого процесса
    combined_word_count = {}
    for wc in word_counts:
        for word, count in wc.items():
            if word in combined_word_count:
                combined_word_count[word] += count
            else:
                combined_word_count[word] = count

    return combined_word_count

if __name__ == "__main__":
    # Тестируем распределенный подсчет слов в текстах
    texts_to_count = ["Lorem ipsum dolor sit amet.", "Consectetur adipiscing elit.", "Lorem ipsum dolor."]
    num_processes = 2

    total_word_count = parallel_word_count(texts_to_count, num_processes)
    print(f"Total word count: {total_word_count}")

Выбор реализации: асинхронные функции или многопоточность или мультипроцессинг?

В каких ситуациях что подойдет?

  1. Асинхронное программирование:
    • Когда использовать: Асинхронное программирование подходит для задач, где блокировка ввода-вывода (I/O) является основной причиной замедления. Такие задачи включают в себя сетевые запросы, обработку файлов, ввод-вывод баз данных.
    • Примеры сценариев: Веб-сервера, асинхронные фреймворки веб-приложений, сетевые приложения, обработка большого числа одновременных соединений.
  2. Многопоточность:
    • Когда использовать: Многопоточность подходит для задач, где задержки связаны с вычислительными операциями или задачами, которые можно распараллелить. В этом случае, многопоточность может использоваться для эффективного распределения вычислительных задач по разным ядрам процессора.
      Параллельная обработка данных: При работе с обширными объемами данных, которые могут быть обработаны независимо друг от друга, потоки могут обеспечить параллельную обработку данных, ускоряя выполнение программы.Работа с библиотеками, не поддерживающими асинхронность: Некоторые библиотеки или сторонние модули могут быть написаны с использованием синхронного кода и не поддерживать асинхронные интерфейсы. В таких случаях многопоточность может быть предпочтительным вариантом.Простота использования существующего кода: Если у вас есть существующий код, написанный с использованием потоков, и его сложно или нецелесообразно адаптировать для асинхронного стиля, то использование потоков может быть более удобным.
    • Примеры сценариев: Вычислительно интенсивные задачи, обработка данных в реальном времени, многозадачные приложения. В Python присутствует Global Interpreter Lock (GIL), который ограничивает одновременное выполнение нескольких потоков в одном процессе. Это означает, что даже если у вас есть несколько потоков, они не могут параллельно выполнять CPU-интенсивные операции из-за блокировки GIL.
  3. Мультипроцессинг:
    • Когда использовать: Мультипроцессинг подходит для задач, где блокировка GIL в Python ограничивает эффективность многопоточности. Также это подходит для параллельного выполнения независимых задач, которые не зависят от общего состояния.
    • Примеры сценариев: Параллельная обработка данных которая требует значительных вычислительных ресурсов, параллельные задачи, которые не имеют общих ресурсов.

Примеры задач для различных способов параллельности:

Асинхронные функции:

  1. Сетевые запросы:
    • Описание: Например, параллельные запросы к нескольким веб-сервисам для получения данных.
    • Преимущества асинхронности: Асинхронные функции позволяют эффективно использовать время ожидания ответа от сетевых запросов, не блокируя выполнение остального кода.
  2. Сервер событий:
    • Описание: Разработка простого сервера, который обрабатывает асинхронные события, такие как подключение нового клиента или получение сообщения.
    • Преимущества асинхронности: Асинхронные функции позволяют обрабатывать множество событий одновременно без необходимости выделять отдельный поток для каждого подключения.

Потоки:

  1. Обработка данных в реальном времени:
    • Описание: Вычисление и обновление данных в реальном времени, например, анимации в графическом интерфейсе.
    • Преимущества многопоточности: Потоки могут параллельно выполнять вычисления, что полезно для обработки данных, которые могут быть разделены на независимые части.
  2. Многозадачные приложения:
    • Описание: Разработка приложения, включающего одновременное выполнение нескольких задач, таких как загрузка данных, отображение интерфейса и обновление статусов.
    • Преимущества многопоточности: Потоки могут быть использованы для обработки различных аспектов приложения параллельно.

Процессы:

  1. Параллельная обработка данных:
    • Описание: Выполнение интенсивных вычислений на больших объемах данных, например, обработка изображений или научные расчеты.
    • Преимущества мультипроцессинга: Процессы обходят ограничения GIL, что делает мультипроцессинг эффективным для CPU-интенсивных вычислений.
  2. Параллельная обработка нескольких файлов:
    • Описание: Обработка нескольких файлов независимо друг от друга, например, конвертация изображений в различные форматы.
    • Преимущества мультипроцессинга: Каждый процесс может работать с отдельным файлом параллельно, ускоряя выполнение задач.

Алгоритм выбора метода параллельного программирования:

  1. Тип задачи:
    • Вычислительно интенсивные вычисления:
      • Рекомендация: Мультипроцессинг.
      • Пояснение: Для задач, требующих максимального использования CPU, мультипроцессинг эффективнее, так как каждый процесс обходит GIL и может параллельно использовать разные ядра процессора.
    • Сетевые операции, ввод-вывод:
      • Рекомендация: Асинхронное программирование.
      • Пояснение: Асинхронные функции обеспечивают эффективное использование времени в ожидании сетевых операций и ввода-вывода, предотвращая блокировку потоков.
    • Обработка событий и взаимодействие с пользовательским интерфейсом:
      • Рекомендация: Асинхронное программирование или потоки.
      • Пояснение: Асинхронность может быть полезной для обработки событий и взаимодействия с пользовательским интерфейсом. Потоки также могут быть использованы, если асинхронные библиотеки не подходят.
  2. Способ обмена данными:
    • Общие ресурсы с блокировкой:
      • Рекомендация: Мультипроцессинг или многопоточность с использованием блокировок.
      • Пояснение: Обеспечьте безопасность доступа к общим ресурсам, используя блокировки при многопоточности или мультипроцессинге с разделяемыми объектами.
    • Обмен сообщениями между процессами/потоками:
      • Рекомендация: Использование очередей (Queue) или асинхронных каналов связи.
      • Пояснение: Механизмы обмена сообщениями обеспечивают безопасный способ передачи данных между параллельными частями программы.
  3. Уровень сложности кода и поддержка:
    • Простота использования:
      • Рекомендация: Асинхронное программирование или Thread Pool/Process Pool.
      • Пояснение: Если простота кода важна, асинхронные функции или пулы потоков/процессов могут предоставить более простой способ управления параллельностью.
    • Сложность синхронизации и избегание гонок данных:
      • Рекомендация: Использование высокоуровневых библиотек (например, asyncio) и средств синхронизации.
      • Пояснение: Выбор инструментов, предоставляющих абстракции и механизмы синхронизации, может снизить сложность кода и предотвратить гонки данных.

Лучшие практики при работе с параллельно выполняющимися задачами.

  1. Используйте блокировки (Locks): В Python, блокировки (locks) являются стандартным механизмом синхронизации для предотвращения конфликтов доступа к общим ресурсам. Перед выполнением операции с общим ресурсом, поток или процесс может захватить блокировку, а затем освободить ее после завершения операции.Пример использования блокировок:
    from threading import Lock
    
    my_lock = Lock()
    
    def safe_update(shared_data, value):
        with my_lock:
            shared_data += value
    
  2. Используйте механизмы Queue: В многопоточном или мультипроцессорном коде использование очередей (Queue) может предотвратить гонки данных. Очереди обеспечивают безопасную передачу данных между потоками или процессами.Пример использования очереди:
    from queue import Queue
    from threading import Thread
    
    my_queue = Queue()
    
    def worker(queue):
        while True:
            data = queue.get()
            # Обработка данных
            print(f"Worker processed: {data}")
    
    # Запуск потока
    thread = Thread(target=worker, args=(my_queue,))
    thread.start()
    
    # Помещение данных в очередь
    my_queue.put("Hello, threading!")
    
  3. Избегайте использования общих изменяемых объектов: Если возможно, старайтесь избегать изменения общих изменяемых объектов, таких как списки и словари, внутри потоков или процессов. Вместо этого используйте неизменяемые объекты или более безопасные структуры данных.
  4. Используйте Thread Pool и Process Pool: Вместо создания большого количества потоков или процессов напрямую, рассмотрите использование пула потоков (ThreadPoolExecutor) или пула процессов (ProcessPoolExecutor) из модуля concurrent.futures. Это может помочь избежать избыточного создания и управления потоками/процессами.
  5. Изучите конкретные инструменты и библиотеки: Используйте высокоуровневые библиотеки и фреймворки, предоставляющие абстракции для параллельного программирования, такие как asyncio, concurrent.futures, и др. Эти инструменты облегчают работу с параллельностью и предоставляют безопасные абстракции для работы с общими ресурсами.
  6. Проводите тестирование: Тщательное тестирование вашего кода на наличие гонок данных может помочь выявить потенциальные проблемы. Используйте инструменты для анализа кода и проводите тесты в различных условиях.

Примеры проектных работ

  1. Проект с использованием асинхронности: Чат-приложение с реальным временем.
    • Описание: Разработка чат-приложения, в котором пользователи могут обмениваться сообщениями в реальном времени. Приложение должно поддерживать асинхронные операции, чтобы эффективно обрабатывать подключения и передачу сообщений без блокировки основного потока.
  2. Проект с использованием многопоточности: Обработка изображений в параллельных потоках.
    • Описание: Создание системы для обработки изображений, например, изменение размера, фильтрация и компрессия, с использованием многопоточности. Различные потоки могут обрабатывать разные изображения параллельно, ускоряя процесс обработки.
  3. Проект с использованием мультипроцессинга: Распределенные вычисления для научных расчетов.
    • Описание: Разработка приложения, которое выполняет сложные научные расчеты, например, моделирование физических процессов или анализ больших объемов данных. Пример: Распределенные вычисления для моделирования популяционной динамики в экосистеме. Мультипроцессинг может использоваться для распределенного выполнения вычислений на нескольких ядрах процессора, ускоряя процесс.
Понравилась статья? Поделиться с друзьями:
Школа Виктора Комлева
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!:

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.