Производительность Python — лучший подход к параллелизму

Я реализую скрипт Python, который должен отправлять более 1500 пакетов параллельно менее чем за 5 секунд каждый.

Вкратце, что мне нужно:

def send_pkts(ip):
    #craft packet
    while True:
        #send packet
        time.sleep(randint(0,3))

for x in list[:1500]:
    send_pkts(x)
    time.sleep(randint(1,5))

Я пробовал простые однопоточные, многопоточные, многопроцессорные и многопроцессорные + многопоточные формы и столкнулся со следующими проблемами:

  1. Простой однопоточный: задержка «for», по-видимому, компрометирует зависимость «5 секунд».
  2. Многопоточность: я думаю, что не смог добиться того, чего хотел, из-за ограничений Python GIL.
  3. Многопроцессорность: это был лучший подход, который, казалось, работал. Однако из-за чрезмерного количества процессов виртуальная машина, на которой я запускаю скрипт, зависает (конечно, 1500 запущенных процессов). Таким образом становится непрактичным.
  4. Многопроцессорность + многопоточность: в этом подходе я создал меньше процессов, каждый из которых вызывает несколько потоков (допустим: 10 процессов, вызывающих по 150 потоков каждый). Было ясно, что виртуальная машина не зависает так быстро, как в подходе номер 3, однако максимальное количество «одновременных пакетов», которое я смог достичь, было ~800. Ограничения GIL? Ограничения виртуальной машины? В этой попытке я также пытался использовать пул процессов, но результаты были похожими.

Есть ли лучший подход, который я мог бы использовать для выполнения этой задачи?

[1] РЕДАКТИРОВАТЬ 1:

 def send_pkt(x):
     #craft pkt
     while True:
         #send pkt
         gevent.sleep(0)

 gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])

[2] РЕДАКТИРОВАТЬ 2 (исправление обезьяны gevent):

from gevent import monkey; monkey.patch_all()

jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]

Однако я получил следующую ошибку: "ValueError: filedescriptor out of range in select()". Поэтому я проверил свой системный ulimit (Soft и Hard оба максимальны: 65536). После этого я проверил, что это как-то связано с ограничениями select() в Linux (максимум 1024 fds). Пожалуйста, проверьте: http://man7.org/linux/man-pages/man2/select.2.html (раздел ОШИБКИ). Чтобы преодолеть это, я должен использовать poll() (http://man7.org/linux/man-pages/man2/poll.2.html). Но с poll() я возвращаюсь к тем же ограничениям: опрос — это «блокирующий подход».

С уважением,


person pascoal    schedule 24.10.2016    source источник
comment
Пробовали ли вы использовать модуль select для мультиплексирования сокетов? Тогда вам даже не понадобится нарезка.   -  person sethmlarson    schedule 24.10.2016
comment
Re: однако максимальная одновременная отправка пакетов, которую я мог достичь, составляла ~ 800, тогда что случилось?   -  person l'L'l    schedule 24.10.2016
comment
Без веских доказательств я не вижу реальных проблем у варианта 2 (многопоточность). Он может быть даже самым быстрым. Но все зависит от того, где на самом деле проводится большая часть времени.   -  person zvone    schedule 24.10.2016
comment
Вы пробовали гевент? У меня была аналогичная проблема, когда я моделировал множество параллельных телефонных звонков для провайдера телефонии. ZeroMQ + gevent дал мне безумную производительность по сравнению с любой другой установкой. Посмотрите, имеет ли это смысл для вас. Я думаю, что в вашем случае достаточно одного gevent.   -  person pragman    schedule 24.10.2016
comment
@l'L'l Так что это не сработало, потому что мне нужно отправить 1500 пакетов одновременно.   -  person pascoal    schedule 24.10.2016
comment
@Bitonator Я пробовал gevent со следующим: проверьте вопрос еще раз, пожалуйста. [1] Но это не сработало. На самом деле я мог просто выполнить ~ 100 пакетов одновременно.   -  person pascoal    schedule 24.10.2016
comment
@SethMichaelLarson Мне действительно нужен выбор? Моему сценарию не нужно было, например, держать пакет/соединение/сокет в ожидании ответа. Мне просто нужно отправить их в сеть менее чем за 5 секунд каждый.   -  person pascoal    schedule 25.10.2016
comment
@pascoal: просто чтобы проверить, исправляли ли вы сетевые библиотеки обезьян при использовании gevent?   -  person pragman    schedule 25.10.2016
comment
@Bitonator В первый раз: нет, не пробовал. Хотя я пробовал после и получил некоторые проблемы, пожалуйста, проверьте в [2]. Спасибо.   -  person pascoal    schedule 25.10.2016
comment
вы можете увеличить лимит fd - stackoverflow.com/questions/21515463/   -  person pragman    schedule 08.11.2016
comment
Это было первое, что я сделал, чтобы обойти эту ситуацию @Bitonator. Уже 65536. Это не работает. Спасибо.   -  person pascoal    schedule 05.01.2017
comment
Почему бы не попробовать с пулом. Вам нужно отправить 1500 пакетов за 5 секунд, это время учитывает крафт пакета? Какой размер ожидается от пакета? Способна ли ваша сеть справиться с такой нагрузкой?   -  person Adonis    schedule 03.08.2017
comment
Требуется дополнительная информация о ваших сокетах. Откуда должны исходить сообщения и куда они должны идти? Насколько велики сообщения? Я предполагаю, что они UDP?   -  person David Oldford    schedule 23.11.2020


Ответы (5)


При использовании параллелизма в Python хорошим подходом является использование ThreadPoolExecutor или ProcessPoolExecutor из https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures по моему опыту они хорошо работают.

пример threadedPoolExecutor, который можно адаптировать для вашего использования.

import concurrent.futures
import urllib.request
import time

IPs= ['168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204']

def send_pkt(x):
  status = 'Failed'
  while True:
    #send pkt
    time.sleep(10)
    status = 'Successful'
    break
  return status

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
    for future in concurrent.futures.as_completed(future_to_ip):
        ip = future_to_ip[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (ip, exc))
        else:
            print('%r send %s' % (url, data))
person Carl Kristensen    schedule 02.02.2021

Ваш результат в варианте 3: «из-за чрезмерного количества процессов виртуальная машина, на которой я запускаю скрипт, зависает (конечно, 1500 запущенных процессов)» может потребовать дальнейшего изучения. Я полагаю, что из собранной до сих пор информации может быть недостаточно определено, лучше ли это охарактеризовать как недостаток многопроцессорного подхода или как ограничение виртуальной машины.

Одним из довольно простых и прямолинейных подходов было бы проведение эксперимента по масштабированию: вместо того, чтобы все отправки происходили из отдельных процессов или из одного и того же, попробуйте промежуточные значения. Определите, сколько времени потребуется, чтобы разделить рабочую нагрузку пополам между двумя процессами или на 4, 8 и так далее.

При этом также может быть хорошей идеей запустить инструмент, такой как xperf в Windows или oprofile в Linux, чтобы записать, приводят ли эти различные варианты параллелизма к различным видам узких мест, например, перегрузка кеша ЦП, запуск виртуальной машины из память, или кто знает что еще. Самый простой способ сказать, это попробовать.

Основываясь на предыдущем опыте решения таких проблем и общих практических правилах, я ожидаю, что наилучшая производительность будет достигнута, когда количество многопроцессорных процессов меньше или равно количеству доступных ядер ЦП (либо на самой виртуальной машине, либо на гипервизор). Однако это предполагает, что проблема связана с процессором; возможно, производительность будет выше при большем количестве процессов, чем #cpu, если что-то блокируется во время отправки пакета, что позволит лучше использовать время ЦП, если оно чередуется с другими блокирующими операциями. Опять же, мы не знаем, пока не будут проведены некоторые эксперименты по профилированию и/или масштабированию.

person Aaron Altman    schedule 05.08.2019

Вы правы в том, что python является однопоточным, однако ваша желаемая задача (отправка сетевых пакетов) считается операцией, связанной с вводом-выводом, поэтому она является хорошим кандидатом для многопоточности. Ваш основной поток не занят, пока пакеты передаются, пока вы пишете свой код с учетом асинхронности.

Взгляните на документацию Python по асинхронной сети TCP — https://docs.python.org/3/library/asyncio-protocol.html#tcp-echo-client.

person Den1al    schedule 27.05.2020

Если узкое место основано на http (отправка пакетов), то GIL на самом деле не должен быть слишком большой проблемой.

Если в python также происходят вычисления, то GIL может помешать, и, как вы говорите, предпочтительнее параллелизм на основе процессов.

Вам не нужен один процесс на задачу! Кажется, это упущение в вашем мышлении. С классом Python Pool вы можете легко создать набор рабочих процессов, которые будут получать задачи из очереди.


import multiprocessing


def send_pkts(ip):
   ...


number_of_workers = 8

with multiprocessing.Pool(number_of_workers) as pool:
    pool.map(send_pkts, list[:1500])

Теперь вы запускаете number_of_workers + 1 процессов (рабочие процессы + исходный процесс), а N рабочих процессов одновременно запускают функцию send_pkts.

person D Hudson    schedule 10.02.2021

Основной проблемой, которая мешает вам достичь желаемой производительности, является метод send_pkts(). Он не просто отправляет пакет, он также создает пакет:

def send_pkts(ip):
#craft packet
while True:
    #send packet
    time.sleep(randint(0,3))

В то время как отправка пакета почти наверняка является задачей, связанной с вводом-выводом, создание пакета почти наверняка является задачей, связанной с процессором. Этот метод необходимо разделить на две задачи:

  1. создать пакет
  2. отправить пакет

Я написал базовый сервер сокетов и клиентское приложение, которое создает и отправляет пакеты на сервер. Идея состоит в том, чтобы иметь отдельный процесс, который обрабатывает пакеты и помещает их в очередь. Существует пул потоков, которые делят очередь с процессом создания пакетов. Эти потоки извлекают пакеты из очереди и отправляют их на сервер. Они также вставляют ответы сервера в другую общую очередь, но это только для моего собственного тестирования и не имеет отношения к тому, что вы пытаетесь сделать. Потоки завершаются, когда они получают None (таблетку яда) из очереди. .

сервер.py:

import argparse
import socketserver
import time


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="bind to host")
    parser.add_argument("--port", type=int, help="bind to port")
    parser.add_argument("--packet-size", type=int, help="size of packets")
    args = parser.parse_args()
    HOST, PORT = args.host, args.port

    class MyTCPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            time.sleep(1.5)
            data = self.request.recv(args.packet_size)
            self.request.sendall(data.upper())

    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()

клиент.py:

import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread


def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler("client.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger


class PacketMaker(mp.Process):
    def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
        mp.Process.__init__(self)
        self.result_queue = result_queue
        self.max_packets = max_packets
        self.packet_size = packet_size
        self.num_poison_pills = num_poison_pills
        self.num_packets_made = 0
        self.logger = logger

    def run(self):
        while True:
            if self.num_packets_made >= self.max_packets:
                for _ in range(self.num_poison_pills):
                    self.result_queue.put(None, timeout=1)
                self.logger.debug('PacketMaker exiting')
                return
            self.result_queue.put(os.urandom(self.packet_size), timeout=1)
            self.num_packets_made += 1


class PacketSender(Thread):
    def __init__(self, task_queue, result_queue, addr, packet_size, logger):
        Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.server_addr = addr
        self.packet_size = packet_size
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(addr)
        self.logger = logger

    def run(self):
        while True:
            packet = self.task_queue.get(timeout=1)
            if packet is None:
                self.logger.debug("PacketSender exiting")
                return
            try:
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            except socket.error:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect(self.server_addr)
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            self.result_queue.put(response)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-packets', type=int, help='number of packets to send')
    parser.add_argument('--packet-size', type=int, help='packet size in bytes')
    parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
    parser.add_argument('--host', type=str, help='name of host packets will be sent to')
    parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
    args = parser.parse_args()

    logger = get_logger()
    logger.info(f"starting script with args {args}")
    
    packets_to_send = mp.Queue(args.num_packets + args.num_threads)
    packets_received = q.Queue(args.num_packets)
    producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
    senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
               for _ in range(args.num_threads)]
    start_time = time.time()
    logger.info("starting workers")
    for worker in senders + producers:
        worker.start()
    for worker in senders:
        worker.join()
    logger.info("workers finished")
    end_time = time.time()
    print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")

запустить.ш:

#!/usr/bin/env bash

for i in "$@"
do
case $i in
    -s=*|--packet-size=*)
    packet_size="${i#*=}"
    shift 
    ;;
    -n=*|--num-packets=*)
    num_packets="${i#*=}"
    shift 
    ;;
    -t=*|--num-threads=*)
    num_threads="${i#*=}"
    shift 
    ;;
    -h=*|--host=*)
    host="${i#*=}"
    shift 
    ;;
    -p=*|--port=*)
    port="${i#*=}"
    shift 
    ;;
    *)
    ;;
esac
done

python3 server.py --host="${host}" \
                  --port="${port}" \
                  --packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" \
                  --num-packets="${num_packets}" \
                  --num-threads="${num_threads}" \
                  --host="${host}" \
                  --port="${port}"
kill "${server_pid}"

$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999

1500 пакетов получено за 4,70330023765564 секунды

$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999

1500 пакетов получено за 1,5025699138641357 секунд

Этот результат можно проверить, изменив уровень журнала в client.py на DEBUG. Обратите внимание, что выполнение сценария занимает гораздо больше времени, чем 4,7 секунды. При использовании 300 потоков требуется довольно много разрывов, но журнал ясно показывает, что потоки завершают обработку за 4,7 секунды.

Принимайте все результаты производительности с недоверием. Я понятия не имею, на какой системе вы работаете. Я предоставлю свою соответствующую системную статистику: 2 Xeon X5550 @ 2,67 ГГц 24 МБ DDR3 @ 1333 МГц Debian 10 Python 3.7.3


Я решу проблемы с вашими попытками:

  1. Простой однопоточный: это почти гарантированно займет не менее 1,5 x num_packets секунд из-за задержки randint(0, 3)
  2. Многопоточность: узким местом здесь, скорее всего, является GIL, но, скорее всего, это из-за части craft packet, а не send packet.
  3. Многопроцессорность: для каждого процесса требуется как минимум файловый дескриптор, поэтому вы, вероятно, превышаете ограничение пользователя или системы, но это может сработать, если вы изменить соответствующие настройки
  4. Многопроцессорность + многопоточность: это не удается по той же причине, что и № 2, создание пакета, вероятно, связано с процессором.

Эмпирическое правило: привязка к вводу-выводу — использование потоков, привязка к процессору — использование процессов.

person Michael Ruth    schedule 21.02.2021