Параллельный веб-скрейпинг

Веб-скрейпинг — это быстрый процесс. По крайней мере, он обычно гораздо быстрее, чем найм дюжины стажёров для ручного копирования данных из интернета! Конечно, развитие технологий и непрерывное стремление к лучшему требуют, чтобы после определённого момента даже это уже не казалось «достаточно быстрым». Вот тогда люди обычно начинают обращать внимание на распределённые вычисления.

В отличие от большинства других областей информационных технологий, ускорить веб-скрейпинг простым увеличением вычислительной мощности часто не удаётся. Запуск одного процесса происходит быстро; запуск двух процессов не обязательно в два раза ускорит работу. Запуск трёх процессов может даже привести к блокировке со стороны удалённого сервера из-за слишком большого количества запросов!

Тем не менее, в некоторых ситуациях параллельный веб-скрейпинг, или одновременное выполнение нескольких потоков или процессов, может быть полезен:

  • Сбор данных с нескольких источников (с нескольких удалённых серверов), а не только с одного;
  • Выполнение длительных или сложных операций с собранными данными (например, анализ изображений или оптическое распознавание символов), которые могут быть выполнены параллельно с загрузкой данных;
  • Сбор данных с большого веб-сервиса, когда вы платите за каждый запрос, или когда создание нескольких соединений с сервисом разрешено в рамках вашего пользовательского соглашения.

Процессы и Потоки

Понятия потоков и процессов не являются специфичными исключительно для Python. Хотя конкретные детали реализации различаются в зависимости от операционной системы, в компьютерных науках общепринято, что процессы — это более крупные единицы, имеющие собственную память, в то время как потоки — это более мелкие единицы, которые делят память внутри процесса, который их содержит.

Обычно, когда вы запускаете простую программу на Python, она выполняется внутри своего собственного процесса, который содержит один поток. Но Python поддерживает как многопроцессорность (multiprocessing), так и многопоточность (multithreading). И многопроцессорность, и многопоточность преследуют одну и ту же конечную цель: выполнение двух задач программирования параллельно, вместо выполнения одной функции за другой традиционным линейным способом.

Однако вам нужно внимательно рассмотреть плюсы и минусы каждого подхода. Например, каждый процесс имеет свою собственную память, выделенную отдельно операционной системой. Это означает, что память не разделяется между процессами. В то время как несколько потоков могут без проблем записывать данные в одни и те же общие очереди, списки и другие объекты Python, процессы не могут этого делать и должны обмениваться информацией более явным способом.

Использование многопоточного программирования для выполнения задач в отдельных потоках с общей памятью часто считается проще, чем многопроцессное программирование. Но эта удобство имеет свою цену.

Глобальная блокировка интерпретатора Python (GIL) предотвращает одновременное выполнение одной и той же строки кода несколькими потоками. GIL гарантирует, что общая память, доступная всем процессам, не будет повреждена (например, байты в памяти могут быть наполовину записаны одним значением и наполовину другим). Эта блокировка позволяет писать многопоточные программы и знать, что вы получаете в каждой строке кода, но также может создавать узкие места в производительности.

Многопоточный Веб-скрейпинг

В следующем примере показано использование нескольких потоков для выполнения задачи:

import threading
import time

def print_time(threadName, delay, iterations):
    start = int(time.time())
    for i in range(iterations):
        time.sleep(delay)
        print(f'{int(time.time() - start)} - {threadName}')

threads = [
    threading.Thread(target=print_time, args=('Fizz', 3, 33)),
    threading.Thread(target=print_time, args=('Buzz', 5, 20)),
    threading.Thread(target=print_time, args=('Counter', 1, 100))
]

[t.start() for t in threads]
[t.join() for t in threads]

Этот скрипт является вариацией на тему классической задачи FizzBuzz, но выводит более подробную информацию:

1 Counter
2 Counter
3 Fizz
3 Counter
4 Counter
5 Buzz
5 Counter
6 Fizz
6 Counter
...

Сценарий запускает три потока: один печатает «Fizz» каждые три секунды, другой печатает «Buzz» каждые пять секунд, а третий печатает «Counter» каждую секунду.

Это пример демонстрирует, как можно использовать многопоточность в Python для одновременного выполнения нескольких задач. В данном случае, каждый поток запускает функцию print_time, которая выводит на экран время и имя потока с заданной задержкой и определенное количество раз.

Такой подход позволяет эффективно распределять задачи, особенно когда требуется обрабатывать данные с разных источников или выполнять различные операции, которые не зависят друг от друга. Это особенно актуально в веб-скрейпинге, где можно параллельно собирать данные с нескольких сайтов или API.

В следующем примере показано, как можно использовать многопоточность для скрейпинга веб-сайтов, например, для извлечения информации с разных страниц Википедии:

from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
import threading
import time

visited = []  # Список посещенных статей

# Защита от повторного скрейпинга одной и той же страницы
def get_links(thread_name, bs):
    print(f'{thread_name}: Получение ссылок')
    links = bs.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link.attrs['href'] not in visited]

# Рекурсивный скрейпинг статей с Википедии с задержкой 5 секунд
def scrape_article(thread_name, path):
    global visited
    if path in visited:
        return
    visited.append(path)
    
    time.sleep(5)
    print(f'{thread_name}: Скрейпинг {path}')
    
    html = urlopen(f'http://en.wikipedia.org{path}')
    bs = BeautifulSoup(html, 'html.parser')
    
    title = bs.find('h1').get_text()
    print(f'{thread_name}: Текущая статья - "{title}"')
    
    links = get_links(thread_name, bs)
    if links:
        new_article = random.choice(links).attrs['href']
        scrape_article(thread_name, new_article)

# Создание потоков для скрейпинга разных статей
threads = [
    threading.Thread(target=scrape_article, args=('Поток 1', '/wiki/Kevin_Bacon',)),
    threading.Thread(target=scrape_article, args=('Поток 2', '/wiki/Monty_Python',)),
]

# Запуск потоков
[t.start() for t in threads]
[t.join() for t in threads]

Этот скрипт запускает два потока: один для скрейпинга статей, начиная со страницы Кевина Бэйкона, и другой — со страницы о Монти Пайтон. Каждый поток выводит название текущей статьи и ждёт 5 секунд перед переходом к случайной связанной статье.

Обратите внимание на следующие моменты:

  1. Задержка в 5 секунд (time.sleep(5)) используется для предотвращения чрезмерной нагрузки на серверы Википедии. В реальных задачах эту задержку следует корректировать в соответствии с правилами использования целевого сервера или убирать, если количество запросов не является проблемой.
  2. Проверка на посещение статей (if path in visited:): Эта логика помогает избежать повторного скрейпинга одной и той же страницы в пределах одного потока. Однако из-за многопоточности может возникнуть ситуация, когда разные потоки одновременно обнаруживают и добавляют одну и ту же страницу в список visited. Это пример состояния гонки.
  3. Состояние гонки: В примере использован простой список visited, который может не полностью предотвратить повторение статей между потоками из-за асинхронного доступа. Для более строгого контроля можно использовать потокобезопасные структуры данных или механизмы блокировки (threading.Lock()), чтобы управлять доступом к общим ресурсам.

Этот пример показывает базовый подход к многопоточному веб-скрейпингу, который может быть адаптирован и расширен в зависимости от конкретных требований и условий задачи.

Состояния гонки и очереди

Хотя для общения между потоками можно использовать списки, они не предназначены для этого, и их неправильное использование может легко привести к замедлению выполнения программы или даже к ошибкам, вызванным состояниями гонки.

Списки отлично подходят для добавления элементов или их чтения, но они не так удобны для удаления элементов в произвольных местах, особенно с начала списка. Использование такой строки:

myList.pop(0)

фактически требует от Python переписать весь список, что замедляет выполнение программы.

Еще более опасен тот факт, что списки также могут привести к написанию кода, который не является потокобезопасным. Например:

myList[len(myList)-1]

может не дать вам последний элемент списка в многопоточной среде, или даже может вызвать исключение, если значение len(myList)-1 вычисляется непосредственно перед тем, как другая операция изменяет список.

Кто-то может возразить, что предыдущее выражение можно написать более «питонически» как:

myList[-1]

но даже если ваш код безупречен, рассмотрите другие формы непотокобезопасных строк кода, связанных со списками:

my_list[i] = my_list[i] + 1
my_list.append(my_list[-1])

Оба эти действия могут привести к состоянию гонки за ресурсы, которое может вызвать неожиданные результаты. Вам может быть интересно попробовать другой подход и использовать другие типы переменных помимо списков. Например:

# Чтение сообщения из глобального списка
my_message = global_message
# Запись сообщения обратно
global_message = "I've retrieved the message"
# делаем что-то с my_message

Код кажется отличным решением, пока вы не осознаете, что могли случайно перезаписать другое сообщение, приходящее из другого потока, в момент между первой и второй строками, с текстом «I’ve retrieved the message».

Так что теперь вам просто нужно построить сложную систему личных объектов сообщений для каждого потока с некоторой логикой, чтобы понять, кто что получает… или вы можете использовать модуль Queue, созданный именно для этой цели.

Использование Очереди

Модуль Queue в Python предоставляет потокобезопасные очереди, которые являются идеальными для передачи сообщений между потоками. Вот как вы можете использовать Queue для безопасного обмена данными между потоками:

import threading
import queue
import time

# Функция, которая будет выполняться в потоках
def worker(thread_name, q):
    while not q.empty():
        item = q.get()
        print(f'{thread_name} обрабатывает элемент {item}')
        time.sleep(1)  # Имитация затратного процесса
        q.task_done()  # Сигнализируем, что задача обработана

# Создаем очередь и добавляем в нее элементы
q = queue.Queue()
for item in range(10):
    q.put(item)

# Создаем и запускаем потоки
threads = []
for i in range(2):  # 2 потока для обработки
    t = threading.Thread(target=worker, args=(f'Поток {i+1}', q))
    t.start()
    threads.append(t)

# Ожидаем завершения работы всех потоков
for t in threads:
    t.join()

print("Все элементы обработаны.")

Потокобезопасность с Queue

Очередь Queue обеспечивает потокобезопасное добавление и удаление элементов, что делает её идеальной для использования в многопоточных программах, где требуется передавать данные между потоками. В отличие от списка, операции с Queue не требуют дополнительных блокировок или других механизмов синхронизации, так как все операции с очередью уже потокобезопасны.

Использование Queue помогает предотвратить многие типичные ошибки, связанные с состояниями гонки, и делает код более чистым, понятным и надежным для многопоточного выполнения.

Очереди представляют собой объекты, похожие на списки, которые работают по принципу FIFO (First In, First Out — первым пришёл, первым ушёл) или LIFO (Last In, First Out — последним пришёл, первым ушёл). Очередь может получать сообщения от любого потока через queue.put('My message') и передавать сообщение любому потоку, который вызывает queue.get().

Очереди не предназначены для хранения статических данных, а для их передачи безопасным для потоков способом. После извлечения данных из очереди они должны существовать только в потоке, который их извлек. По этой причине очереди часто используются для делегирования задач или отправки временных уведомлений.

Это может быть полезно при веб-скрейпинге. Например, предположим, что вы хотите сохранить данные, собранные вашим скрейпером, в базу данных, и вы хотите, чтобы каждый поток мог быстро сохранять свои данные. Одно общее соединение для всех потоков может вызвать проблемы (одно соединение не может обрабатывать запросы параллельно), но нет смысла предоставлять каждому потоку скрейпинга своё собственное соединение с базой данных. По мере роста вашего скрейпера (возможно, вы будете собирать данные с сотен различных веб-сайтов в сотнях различных потоков), это может привести к большому количеству в основном простаивающих соединений с базой данных, которые выполняют только случайную запись после загрузки страницы.

Вместо этого вы можете иметь меньшее количество потоков базы данных, каждый с собственным соединением, которые ожидают элементы из очереди и сохраняют их. Это обеспечивает гораздо более управляемый набор соединений с базой данных:

import threading
from queue import Queue
from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
import time
import pymysql

visited = set()

def get_links(thread_name, bs):
    print(f'Получение ссылок в {thread_name}')
    links = bs.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link['href'] for link in links if link['href'] not in visited]

def scrape_article(thread_name, path, queue):
    global visited
    time.sleep(5)
    visited.add(path)
    print(f'{thread_name}: Скрейпинг {path}')
    html = urlopen(f'http://en.wikipedia.org{path}')
    bs = BeautifulSoup(html, 'html.parser')
    links = get_links(thread_name, bs)
    if links:
        for link in links:
            queue.put(link)
        new_article = links[random.randint(0, len(links)-1)]
        scrape_article(thread_name, new_article, queue)

def storage(queue):
    conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock', user='root', passwd='password', db='mysql', charset='utf8')
    cur = conn.cursor()
    cur.execute('USE wikipedia')
    while True:
        if not queue.empty():
            path = queue.get()
            cur.execute('SELECT * FROM pages WHERE url = %s', (path,))
            if cur.rowcount == 0:
                print(f'Сохранение статьи {path}')
                cur.execute('INSERT INTO pages (url) VALUES (%s)', (path,))
                conn.commit()
            else:
                print(f'Статья уже существует: {path}')
        else:
            time.sleep(1)  # Даем время на добавление новых элементов в очередь

queue = Queue()

threads = [
    threading.Thread(target=scrape_article, args=('Поток 1', '/wiki/Kevin_Bacon', queue)),
    threading.Thread(target=scrape_article, args=('Поток 2', '/wiki/Monty_Python', queue)),
    threading.Thread(target=storage, args=(queue,))
]

for t in threads:
    t.start()
    
for t in threads:
    t.join()

Анализ скрипта

Этот скрипт создает три потока: два для скрейпинга страниц с Википедии случайным образом и третий для сохранения собранных данных в базу данных MySQL.

Особенности реализации:

  1. Многопоточность и очереди: Скрейперы (scrape_article) добавляют URL-адреса в очередь, а поток storage извлекает их из очереди и сохраняет в базу данных.
  2. Обработка URL: Вместо работы с названием и URL страницы скрипт работает только с URL. Это упрощает код и уменьшает количество обрабатываемых данных.
  3. Потокобезопасные множества: Вместо списка использовалось множество visited для хранения посещенных URL. Хотя это не строго потокобезопасно, избыточность устроена так, что любые дубликаты не повлияют на конечный результат.
  4. Управление соединениями с базой данных: Вместо множества соединений используется несколько потоков базы данных, каждый с собственным соединением, что обеспечивает более управляемый набор соединений.

Этот подход позволяет эффективно распределять задачи между потоками и управлять ресурсами, что особенно важно при масштабировании веб-скрейпинга.

Расширенные возможности модуля threading

Модуль threading в Python является высокоуровневым интерфейсом, построенным на более низкоуровневом модуле _thread. Хотя _thread вполне пригоден для использования сам по себе, он требует немного больше усилий и не предоставляет мелочи, которые делают жизнь более приятной — например, удобные функции и интересные возможности.

Например, вы можете использовать статические функции, такие как enumerate, чтобы получить список всех активных потоков, инициализированных через модуль threading, без необходимости отслеживать их самостоятельно. Функция activeCount аналогично предоставляет общее количество потоков. Многие функции из _thread получают более удобные или запоминающиеся имена, например, currentThread вместо get_ident для получения имени текущего потока.

Одна из приятных особенностей модуля threading — это простота создания локальных данных потока, которые недоступны другим потокам. Это может быть полезной функцией, если у вас есть несколько потоков, каждый из которых скрейпит разные веб-сайты и каждый ведет свой собственный локальный список посещенных страниц.

Эти локальные данные могут быть созданы в любой точке функции потока, вызвав threading.local():

import threading

def crawler(url):
    data = threading.local()
    data.visited = []
    # Crawl site

threading.Thread(target=crawler, args=('http://brookings.edu',)).start()

Этот подход решает проблему состояний гонки, возникающих между общими объектами в потоках. Когда объект не нужно делить, его не следует делить, и он должен храниться в локальной памяти потока. Для безопасного обмена объектами между потоками по-прежнему может использоваться Queue из предыдущего раздела.

Модуль threading действует как няня для потоков и может быть высоко настроен для определения того, что эта няня включает. Функция isAlive по умолчанию проверяет, активен ли поток. Она будет истинной, пока поток не завершит скрейпинг (или не упадет).

Часто скрейперы разрабатываются для работы в течение очень долгого времени. Метод isAlive может гарантировать, что если поток выйдет из строя, он перезапустится:

t = threading.Thread(target=crawler, args=('http://brookings.edu',))
t.start()
while True:
    time.sleep(1)
    if not t.isAlive():
        t = threading.Thread(target=crawler, args=('http://brookings.edu',))
        t.start()

Другие методы мониторинга могут быть добавлены путем расширения объекта threading.Thread:

import threading
import time

class Crawler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.done = False

    def isDone(self):
        return self.done

    def run(self):
        time.sleep(5)
        self.done = True
        raise Exception('Something bad happened!')

t = Crawler()
t.start()
while True:
    time.sleep(1)
    if t.isDone():
        print('Done')
        break
    if not t.isAlive():
        t = Crawler()
        t.start()

Этот новый класс Crawler содержит метод isDone, который можно использовать для проверки, завершил ли скрейпер работу. Это может быть полезно, если есть дополнительные методы логирования, которые нужно завершить, так что поток не может закрыться, но основная часть работы по скрейпингу выполнена. В общем, isDone может быть заменен некоторым видом статуса или меры прогресса — сколько страниц залогировано или текущая страница, например.

Любые исключения, вызванные Crawler.run, приведут к перезапуску класса, пока isDone не станет True и программа не завершит работу.

Расширение threading.Thread в классах вашего скрейпера может улучшить их надежность и гибкость, а также вашу способность мониторить любое свойство многих скрейперов одновременно.

Многопроцессорность в Python

Модуль multiprocessing в Python создает объекты новых процессов, которые могут быть запущены и присоединены к главному процессу. В следующем коде используется пример FizzBuzz из раздела о многопоточности, чтобы продемонстрировать работу с многопроцессорностью:

from multiprocessing import Process
import time

def print_time(threadName, delay, iterations):
    start = int(time.time())
    for i in range(iterations):
        time.sleep(delay)
        seconds_elapsed = str(int(time.time()) - start)
        print(threadName if threadName else seconds_elapsed)

processes = [
    Process(target=print_time, args=('Counter', 1, 100)),
    Process(target=print_time, args=('Fizz', 3, 33)),
    Process(target=print_time, args=('Buzz', 5, 20))
]

[p.start() for p in processes]
[p.join() for p in processes]

Каждый процесс рассматривается операционной системой как отдельная независимая программа. Если вы просмотрите свои процессы через монитор активности или диспетчер задач ОС, вы должны увидеть это отражение, как показано на рисунке в вашем источнике.

Активные процессы

Понимание PID

Если у вас нет другой программы, которая быстро выделяет PID, пока скрипт FizzBuzz работает, вы должны увидеть несколько PID — в данном случае 71180 и 78412.

Эти PID также можно найти в коде с помощью модуля os:

import os

# Выводит PID дочернего процесса
print(os.getpid())

# Выводит PID родительского процесса
print(os.getppid())

Каждый процесс в вашей программе должен напечатать разный PID для строки os.getpid(), но напечатать один и тот же родительский PID на os.getppid().

Завершение Процессов

Технически, пара строк кода не нужна для этой конкретной программы. Если конечное выражение join не включено:

[p.join() for p in processes]

родительский процесс все равно завершится и автоматически завершит дочерние процессы. Однако это объединение необходимо, если вы хотите выполнить какой-либо код после завершения этих дочерних процессов.

[p.start() for p in processes]
print('Программа завершена')

Если выражение join не включено, вывод будет следующим:

Программа завершена
1
2

Если выражение join включено, программа дождется завершения каждого процесса, прежде чем продолжить:

[p.start() for p in processes]
[p.join() for p in processes]
print('Программа завершена')
...
Fizz
99
Buzz
100
Программа завершена

Если вы хотите преждевременно остановить выполнение программы, вы можете использовать Ctrl-C, чтобы завершить родительский процесс. Завершение родительского процесса также приведет к завершению любых порожденных дочерних процессов, так что использование Ctrl-C безопасно и не оставит процессы, работающие в фоновом режиме.

Преимущества multiprocessing перед threading

  • Отдельное пространство памяти: Каждый процесс имеет свое пространство памяти, поэтому переменные процесса копируются между всеми процессами. Это увеличивает память, используемую программой, но делает обмен данными между процессами более безопасным и простым.
  • Обход GILmultiprocessing обходит Global Interpreter Lock (GIL) в Python, позволяя полностью использовать многопроцессорные системы.
  • Подходит для задач, требующих интенсивных вычислений: Если ваша программа в основном выполняет вычисления в памяти, multiprocessing будет использовать несколько ядер и обеспечит значительное ускорение.

Недостатки multiprocessing

  • Больше ресурсов: Создание нового процесса требует больше ресурсов, чем создание нового потока. По этой причине multiprocessing может быть неэффективным для большого количества коротких задач.
  • Сложность взаимодействия: Взаимодействие между процессами сложнее, чем между потоками, из-за разделения памяти. Обмен данными обычно осуществляется с использованием Queue или Pipe.

Многопроцессорный Веб-скрейпинг

Пример многопоточного скрейпинга Википедии можно изменить, используя отдельные процессы вместо отдельных потоков. В этом случае каждый процесс будет работать независимо, и вы сможете использовать преимущества многопроцессорности для ускорения работы скрейпера.

Вот как можно модифицировать скрейпинг Википедии, используя процессы:

from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
from multiprocessing import Process, Manager
import os
import time

def get_links(bs):
    links = bs.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link for link in links if link.attrs['href'] not in visited]

def scrape_article(path, visited):
    visited.append(path)
    html = urlopen(f'http://en.wikipedia.org{path}')
    time.sleep(5)  # Искусственная задержка
    bs = BeautifulSoup(html, 'html.parser')
    print(f'Scraping {bs.find("h1").get_text()} in process {os.getpid()}')
    links = get_links(bs)
    if links:
        new_link = links[random.randint(0, len(links) - 1)].attrs['href']
        scrape_article(new_link, visited)

if __name__ == "__main__":
    with Manager() as manager:
        visited = manager.list()  # Создаем список в менеджере, чтобы он был общим для процессов

        processes = [
            Process(target=scrape_article, args=('/wiki/Kevin_Bacon', visited)),
            Process(target=scrape_article, args=('/wiki/Monty_Python', visited))
        ]

        # Запуск процессов
        for p in processes:
            p.start()

        # Ожидание завершения всех процессов
        for p in processes:
            p.join()

Особенности реализации

  1. Использование Manager из multiprocessing:
    • В этом примере используется Manager для создания списка visited, который является общим для всех процессов. Это позволяет всем процессам видеть и обновлять общий список посещенных страниц, что помогает избежать повторного посещения страниц разными процессами.
  2. Задержка в работе скрейпера:
    • Добавлена искусственная задержка time.sleep(5), чтобы уменьшить нагрузку на серверы Википедии и сделать пример более наглядным.
  3. Использование os.getpid():
    • Используется для вывода идентификатора процесса, который выполняет скрейпинг, что помогает отслеживать, какой процесс работает с какой страницей.

Преимущества и недостатки

Преимущества

  • Отсутствие GIL: Процессы не подвержены блокировке GIL, что позволяет им одновременно выполнять одни и те же строки кода и изменять одни и те же объекты (точнее, отдельные экземпляры одного и того же объекта).
  • Использование многих CPU ядер: Процессы могут работать на нескольких ядрах CPU, что может дать преимущества в скорости, если каждый процесс или поток интенсивно использует процессор.

Недостатки

  • Независимые списки посещенных страниц: В предыдущей программе все найденные URL-адреса хранились в глобальном списке visited. Когда использовались многопоточность, этот список был общим для всех потоков, и один поток не мог посетить страницу, которую уже посетил другой поток. Однако теперь каждый процесс получает свою собственную независимую версию списка visited и может свободно посещать страницы, которые уже были посещены другими процессами. Использование Manager().list() помогает решить эту проблему, синхронизируя список посещенных страниц между процессами.
  • Больше ресурсов: Создание нового процесса требует больше ресурсов, чем создание потока, что может быть неэффективным для большого количества коротких задач.

Итог

Многопроцессный подход может быть очень эффективен для масштабируемых задач веб-скрейпинга, особенно когда задачи интенсивно используют CPU и могут быть распределены по нескольким процессорам. Однако требуется тщательное управление общими ресурсами и обработка данных между процессами, чтобы избежать повторной обработки и максимально использовать преимущества распараллеливания.

Взаимодействие между процессами

Взаимодействие между процессами в Python можно организовать с помощью объектов, таких как очереди (Queue) и каналы (Pipe). Эти объекты позволяют процессам обмениваться информацией, несмотря на то, что каждый процесс работает в своей собственной независимой области памяти.

Использование очередей для взаимодействия между процессами

Очередь (Queue) похожа на очередь, используемую с потоками, но она предназначена для процессов. Информация может быть помещена в очередь одним процессом и извлечена другим процессом. После того как информация извлечена, она исчезает из очереди.

В модифицированном примере скрейпинга Википедии можно использовать очереди для организации распределенного скрейпинга, где процессы сотрудничают, выполняя полный обход сайта. Каждый процесс может взять любую «задачу» из очереди, а не только те ссылки, которые он сам нашел.

Пример: Многопроцессный веб-скрейпинг с очередями

from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import random
from multiprocessing import Process, Queue, Manager
import os
import time

def get_links(bs):
    links = bs.find('div', {'id':'bodyContent'}).find_all('a', href=re.compile('^(/wiki/)((?!:).)*$'))
    return [link.attrs['href'] for link in links]

def scrape_article(taskQueue, urlsQueue):
    while True:
        while taskQueue.empty():
            # Ожидание появления задач в очереди
            time.sleep(0.1)

        path = taskQueue.get()
        html = urlopen(f'http://en.wikipedia.org{path}')
        time.sleep(5)  # Искусственная задержка
        bs = BeautifulSoup(html, 'html.parser')
        title = bs.find('h1').get_text()
        print(f'Scraping {title} in process {os.getpid()}')

        links = get_links(bs)
        # Отправка найденных ссылок обратно в очередь URL-адресов для обработки
        urlsQueue.put(links)

def task_delegator(taskQueue, urlsQueue):
    visited = ['/wiki/Kevin_Bacon', '/wiki/Monty_Python']
    taskQueue.put('/wiki/Kevin_Bacon')
    taskQueue.put('/wiki/Monty_Python')

    while True:
        if not urlsQueue.empty():
            links = [link for link in urlsQueue.get() if link not in visited]
            for link in links:
                visited.append(link)
                taskQueue.put(link)

if __name__ == "__main__":
    manager = Manager()
    
    taskQueue = manager.Queue()
    urlsQueue = manager.Queue()

    processes = [
        Process(target=task_delegator, args=(taskQueue, urlsQueue)),
        Process(target=scrape_article, args=(taskQueue, urlsQueue)),
        Process(target=scrape_article, args=(taskQueue, urlsQueue))
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

Особенности и преимущества данного подхода

  1. Делегирование задач (task_delegator):
    • Этот процесс инициализирует начальные задачи и затем слушает очередь URL-адресов (urlsQueue), добавляя новые задачи (ссылки), которые еще не были посещены. Это позволяет динамически распределять задачи между скрейперами.
  2. Скрейпинг статей (scrape_article):
    • Процессы, выполняющие скрейпинг, берут задачи из taskQueue, обрабатывают их и помещают результаты в urlsQueue. Это обеспечивает непрерывное взаимодействие и обмен данными между процессами.
  3. Избегание повторного посещения:
    • Проверка if link not in visited в task_delegator гарантирует, что каждая страница посещается только один раз, предотвращая бесконечные циклы и ненужную перезагрузку страниц.
  4. Распределенное выполнение:
    • Процессы могут работать параллельно на множественных ядрах CPU, что потенциально ускоряет скрейпинг за счет параллельной обработки задач.
  5. Робастность и масштабируемость:
    • Система легко масштабируется путем добавления большего количества процессов скрейпинга, и она устойчива к временным ошибкам, так как процессы независимы и не блокируются при ошибках в одном из них.

Недостатки

  • Сложность управления памятью и состоянием:
    • Взаимодействие между процессами через очереди и менеджеры требует внимательного управления состоянием и может быть источником ошибок при неправильной синхронизации.
  • Накладные расходы на управление процессами:
    • Запуск и управление процессами требует больше системных ресурсов по сравнению с потоками, что может быть неоптимальным для легковесных или коротких задач.

Заключение

Многопроцессный подход в веб-скрейпинге позволяет эффективно использовать многопроцессорность и многозадачность для ускорения обработки данных. Он особенно полезен для интенсивных и долгоживущих задач скрейпинга, но требует тщательного проектирования для корректного взаимодействия и управления состоянием разделяемых данных.

Многопроцессный Веб-скрейпинг — Альтернативный Подход

Во всех обсуждавшихся подходах к многопоточному и многопроцессному скрейпингу предполагается некоторый уровень «родительского контроля» над дочерними потоками и процессами. Вы можете запустить их все одновременно, завершить их все одновременно и обмениваться сообщениями или разделять память между ними.

Но что, если ваш скрейпер разработан таким образом, что не требуется никакого руководства или коммуникации? В этом случае может быть мало причин для использования _thread или сложной многопроцессной архитектуры.

Пример: Параллельный скрейпинг двух веб-сайтов

Допустим, вы хотите параллельно скрейпить два похожих веб-сайта. У вас есть скрейпер, который может скрейпить любой из этих сайтов, что определяется небольшим изменением конфигурации или, возможно, аргументом командной строки. В этом случае вы можете просто выполнить следующее:

python my_crawler.py website1
python my_crawler.py website2

И вот, вы только что запустили многопроцессный веб-скрейпер, при этом сэкономили ресурсы CPU, не держа родительский процесс!

Недостатки этого подхода

  1. Координация между процессами: Если вы хотите запустить два веб-скрейпера на одном и том же сайте таким образом, вам нужно как-то гарантировать, что они не начнут скрейпить одни и те же страницы случайно. Решение может заключаться в создании правила URL (например, «скрейпер 1 сканирует страницы блога, скрейпер 2 сканирует страницы продуктов») или разделении сайта каким-то образом.
  2. Синхронизация через базу данных: В качестве альтернативы, вы можете управлять этой координацией через некоторую промежуточную базу данных, например, Redis. Перед переходом к новой ссылке скрейпер может сделать запрос к базе данных с вопросом: «Была ли эта страница скрейплена?» Скрейпер использует базу данных как систему межпроцессного взаимодействия. Однако без тщательного рассмотрения этот метод может привести к состояниям гонки или задержкам, если соединение с базой данных медленное (что вероятно, только если подключение к удаленной базе данных).
  3. Масштабируемость: Этот метод может быть не так масштабируем, как использование модуля Process, который позволяет динамически увеличивать или уменьшать количество процессов, скрейпящих сайт или сохраняющих данные. Запуск скрейперов вручную требует либо физического запуска скрипта человеком, либо отдельного управляющего скрипта (будь то bash-скрипт, задача cron или что-то еще).

Пример использования

Предположим, у вас есть скрипт my_crawler.py, который принимает URL в качестве аргумента командной строки и скрейпит сайт. Вы можете запустить несколько экземпляров этого скрипта параллельно для разных частей сайта или разных сайтов:

python my_crawler.py "http://example.com/section1"
python my_crawler.py "http://example.com/section2"

Каждый процесс будет работать независимо, и если скрейперы не должны взаимодействовать, это может быть очень эффективным способом ускорить общий процесс скрейпинга.

Использование Redis для координации

Если вы решите использовать Redis для координации скрейперов, вот примерный подход:

import redis
import hashlib

def check_and_add_url(url):
    hash_url = hashlib.md5(url.encode('utf-8')).hexdigest()
    if not r.sismember("visited", hash_url):
        r.sadd("visited", hash_url)
        return True
    return False

r = redis.Redis()

url = "http://example.com/somepage"
if check_and_add_url(url):
    # Продолжаем скрейпинг
    pass

Индивидуальное и групповое обучение «Аналитик данных»
Если вы хотите научиться стать экспертом в аналитике, могу помочь. Запишитесь на мой курс «Аналитик данных» и начните свой путь в мир ИТ уже сегодня!

Контакты
Для получения дополнительной информации и записи на курсы свяжитесь со мной:

Телеграм: https://t.me/Vvkomlev
Email: victor.komlev@mail.ru

Объясняю сложное простыми словами. Даже если вы никогда не работали с ИТ и далеки от программирования, теперь у вас точно все получится! Проверено десятками примеров моих учеников.

Гибкий график обучения. Я предлагаю занятия в мини-группах и индивидуально, что позволяет каждому заниматься в удобном темпе. Вы можете совмещать обучение с работой или учебой.

Практическая направленность. 80%: практики, 20% теории. У меня множество авторских заданий, которые фокусируются на практике. Вы не просто изучаете теорию, а сразу применяете знания в реальных проектах и задачах.

Разнообразие учебных материалов: Теория представлена в виде текстовых уроков с примерами и видео, что делает обучение максимально эффективным и удобным.

Понимаю, что обучение информационным технологиям может быть сложным, особенно для новичков. Моя цель – сделать этот процесс максимально простым и увлекательным. У меня персонализированный подход к каждому ученику. Максимальный фокус внимания на ваши потребности и уровень подготовки.

Понравилась статья? Поделиться с друзьями:
Школа Виктора Комлева
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!:

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.