Основной проблемой, которая мешает вам достичь желаемой производительности, является метод send_pkts()
. Он не просто отправляет пакет, он также создает пакет:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
В то время как отправка пакета почти наверняка является задачей, связанной с вводом-выводом, создание пакета почти наверняка является задачей, связанной с процессором. Этот метод необходимо разделить на две задачи:
- создать пакет
- отправить пакет
Я написал базовый сервер сокетов и клиентское приложение, которое создает и отправляет пакеты на сервер. Идея состоит в том, чтобы иметь отдельный процесс, который обрабатывает пакеты и помещает их в очередь. Существует пул потоков, которые делят очередь с процессом создания пакетов. Эти потоки извлекают пакеты из очереди и отправляют их на сервер. Они также вставляют ответы сервера в другую общую очередь, но это только для моего собственного тестирования и не имеет отношения к тому, что вы пытаетесь сделать. Потоки завершаются, когда они получают None
(таблетку яда) из очереди. .
сервер.py:
import argparse
import socketserver
import time
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, help="bind to host")
parser.add_argument("--port", type=int, help="bind to port")
parser.add_argument("--packet-size", type=int, help="size of packets")
args = parser.parse_args()
HOST, PORT = args.host, args.port
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
time.sleep(1.5)
data = self.request.recv(args.packet_size)
self.request.sendall(data.upper())
with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
клиент.py:
import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread
def get_logger():
logger = logging.getLogger("threading_example")
logger.setLevel(logging.INFO)
fh = logging.FileHandler("client.log")
fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
class PacketMaker(mp.Process):
def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
mp.Process.__init__(self)
self.result_queue = result_queue
self.max_packets = max_packets
self.packet_size = packet_size
self.num_poison_pills = num_poison_pills
self.num_packets_made = 0
self.logger = logger
def run(self):
while True:
if self.num_packets_made >= self.max_packets:
for _ in range(self.num_poison_pills):
self.result_queue.put(None, timeout=1)
self.logger.debug('PacketMaker exiting')
return
self.result_queue.put(os.urandom(self.packet_size), timeout=1)
self.num_packets_made += 1
class PacketSender(Thread):
def __init__(self, task_queue, result_queue, addr, packet_size, logger):
Thread.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.server_addr = addr
self.packet_size = packet_size
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(addr)
self.logger = logger
def run(self):
while True:
packet = self.task_queue.get(timeout=1)
if packet is None:
self.logger.debug("PacketSender exiting")
return
try:
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
except socket.error:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(self.server_addr)
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
self.result_queue.put(response)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num-packets', type=int, help='number of packets to send')
parser.add_argument('--packet-size', type=int, help='packet size in bytes')
parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
parser.add_argument('--host', type=str, help='name of host packets will be sent to')
parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
args = parser.parse_args()
logger = get_logger()
logger.info(f"starting script with args {args}")
packets_to_send = mp.Queue(args.num_packets + args.num_threads)
packets_received = q.Queue(args.num_packets)
producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
for _ in range(args.num_threads)]
start_time = time.time()
logger.info("starting workers")
for worker in senders + producers:
worker.start()
for worker in senders:
worker.join()
logger.info("workers finished")
end_time = time.time()
print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")
запустить.ш:
#!/usr/bin/env bash
for i in "$@"
do
case $i in
-s=*|--packet-size=*)
packet_size="${i#*=}"
shift
;;
-n=*|--num-packets=*)
num_packets="${i#*=}"
shift
;;
-t=*|--num-threads=*)
num_threads="${i#*=}"
shift
;;
-h=*|--host=*)
host="${i#*=}"
shift
;;
-p=*|--port=*)
port="${i#*=}"
shift
;;
*)
;;
esac
done
python3 server.py --host="${host}" \
--port="${port}" \
--packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" \
--num-packets="${num_packets}" \
--num-threads="${num_threads}" \
--host="${host}" \
--port="${port}"
kill "${server_pid}"
$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999
1500 пакетов получено за 4,70330023765564 секунды
$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999
1500 пакетов получено за 1,5025699138641357 секунд
Этот результат можно проверить, изменив уровень журнала в client.py на DEBUG
. Обратите внимание, что выполнение сценария занимает гораздо больше времени, чем 4,7 секунды. При использовании 300 потоков требуется довольно много разрывов, но журнал ясно показывает, что потоки завершают обработку за 4,7 секунды.
Принимайте все результаты производительности с недоверием. Я понятия не имею, на какой системе вы работаете. Я предоставлю свою соответствующую системную статистику: 2 Xeon X5550 @ 2,67 ГГц 24 МБ DDR3 @ 1333 МГц Debian 10 Python 3.7.3
Я решу проблемы с вашими попытками:
- Простой однопоточный: это почти гарантированно займет не менее 1,5 x num_packets секунд из-за задержки
randint(0, 3)
- Многопоточность: узким местом здесь, скорее всего, является GIL, но, скорее всего, это из-за части
craft packet
, а не send packet
.
- Многопроцессорность: для каждого процесса требуется как минимум файловый дескриптор, поэтому вы, вероятно, превышаете ограничение пользователя или системы, но это может сработать, если вы изменить соответствующие настройки
- Многопроцессорность + многопоточность: это не удается по той же причине, что и № 2, создание пакета, вероятно, связано с процессором.
Эмпирическое правило: привязка к вводу-выводу — использование потоков, привязка к процессору — использование процессов.
person
Michael Ruth
schedule
21.02.2021
select
для мультиплексирования сокетов? Тогда вам даже не понадобится нарезка. - person sethmlarson   schedule 24.10.2016