асинхронный проект. Что мне не хватает?

Я работаю над клиентом для этого чат-сервера, но столкнулся с проблемой. Сервер использует асинхронный модуль Python 3.4RC1.

Поведение:

Мой клиент подключается. Мой второй клиент подключается. Любой из них может отправлять сообщения на сервер, НО сервер не транслирует их, как это должно быть в обычном общедоступном чате.

Пользователь 1: Здравствуйте. Нажимает Enter.

Пользователь2 его не видит.

User2: Есть кто? Нажимает Enter.

User2 видит User1: Здравствуйте. и User2: Есть кто?

Просто... странно. Не уверен, что мне не хватает.

Вот файлы. Попробуйте.

Сервер:

from socket import socket, SO_REUSEADDR, SOL_SOCKET
from asyncio import Task, coroutine, get_event_loop

class Peer(object):
    def __init__(self, server, sock, name):
        self.loop = server.loop
        self.name = name
        self._sock = sock
        self._server = server
        Task(self._peer_handler())

    def send(self, data):
        return self.loop.sock_send(self._sock, data.encode('utf-8'))

    @coroutine
    def _peer_handler(self):
        try:
            yield from self._peer_loop()
        except IOError:
            pass
        finally:
            self._server.remove(self)

    @coroutine
    def _peer_loop(self):
        while True:
            buf = yield from self.loop.sock_recv(self._sock, 1024)
            if buf == b'':
                break
            self._server.broadcast('%s: %s' % (self.name, buf.decode('utf-8')))

class Server(object):
    def __init__(self, loop, port):
        self.loop = loop
        self._serv_sock = socket()
        self._serv_sock.setblocking(0)
        self._serv_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        self._serv_sock.bind(('',port))
        self._serv_sock.listen(5)
        self._peers = []
        Task(self._server())

    def remove(self, peer):
        self._peers.remove(peer)
        self.broadcast('Peer %s quit!' % (peer.name,))

    def broadcast(self, message):
        for peer in self._peers:
            peer.send(message)

    @coroutine
    def _server(self):
        while True:
            peer_sock, peer_name = yield from self.loop.sock_accept(self._serv_sock)
            peer_sock.setblocking(0)
            peer = Peer(self, peer_sock, peer_name)
            self._peers.append(peer)
            self.broadcast('Peer %s connected!' % (peer.name,))

def main():
    loop = get_event_loop()
    Server(loop, 1234)
    loop.run_forever()

if __name__ == '__main__':
    main()

Клиент:

# import socket
from socket import *
# form socket import socket, bind, listen, recv, send

HOST = 'localhost' #localhost / 192.168.1.1
# LAN - 192.168.1.1
PORT = 1234
s = socket(AF_INET, SOCK_STREAM)# 98% of all socket programming will use AF_INET and SOCK_STREAM
s.connect((HOST, PORT))

while True:
    message = input("Your Message: ")
    encoded_msg = message.encode('utf-8')
    s.send(encoded_msg)
    print('Awaiting Reply..')
    reply = s.recv(1024)
    decoded_reply = reply.decode('utf-8')
    decoded_reply = repr(decoded_reply)
    print('Received ', decoded_reply)

s.close()

Вот код сервера без потоков, который я написал. отлично работает, но ТОЛЬКО между 2 людьми. Как можно обновить этот код, чтобы он транслировал каждое полученное сообщение всем подключенным клиентам?

# import socket
from socket import *
# form socket import socket, bind, listen, recv, send

HOST = 'localhost' #localhost / 192.168.1.1
# LAN - 192.168.1.1
PORT = 1234
s = socket(AF_INET, SOCK_STREAM) # 98% of all socket programming will use AF_INET and SOCK_STREAM
s.bind((HOST, PORT))
s.listen(5) # how many connections it can receive at one time
conn, addr = s.accept() # accept the connection
print('Connected by', addr) # print the address of the person connected

while True:
    data = conn.recv(1024)
    decoded_data = data.decode('utf-8')
    data = repr(decoded_data)
    print('Received ', decoded_data)
    reply = input("Reply: ")
    encoded_reply = reply.encode('utf-8')
    conn.sendall(encoded_reply)
    print('Server Started')
conn.close()

person suchislife    schedule 16.02.2014    source источник
comment
Получаю ошибку: AttributeError: '_WindowsSelectorEventLoop' object has no attribute 'sock_send', а также buf в _peer_loop не определено.   -  person poke    schedule 17.02.2014
comment
Опс! У меня было 3 строки кода, закомментированные в коде сервера. Скопируйте, вставьте, попробуйте еще раз.   -  person suchislife    schedule 17.02.2014
comment
AttributeError все еще сохраняется для меня.   -  person poke    schedule 17.02.2014
comment
У вас должен быть установлен последний Python. 3.4RC1   -  person suchislife    schedule 17.02.2014
comment
Чтобы запустить его, замените self.loop.sock_send на self.loop.sock_sendall. .   -  person poke    schedule 17.02.2014
comment
Черт побери... Круто. Попробуйте.   -  person suchislife    schedule 17.02.2014
comment
Попробуйте добавить print("sending message %s" % message) в начало метода broadcast(), и вы увидите, что сервер работает правильно. Проблема в том, что каждый клиент блокирует ожидание ввода данных пользователем, поэтому он не проверяет наличие новых сообщений чата до тех пор, пока после пользователь не введет что-либо. Использование потоков на клиенте является излишним и ненужным, если ваша цель — научиться использовать asyncio. См. stackoverflow.com/a/25352042/122763 пример чтения из стандартного ввода в цикле событий.   -  person Mark E. Haase    schedule 06.08.2015


Ответы (1)


Хорошо, давайте подумаем о том, что делает ваш клиент. Вы запрашиваете сообщение для отправки, блокируя ввод пользователя. Затем вы отправляете это сообщение и получаете все, что есть на сервере. После этого вы снова блокируетесь, ожидая другого сообщения.

Поэтому, когда клиент А отправляет текст, клиент Б, скорее всего, блокирует пользовательский ввод. Таким образом, B на самом деле не будет проверять, отправил ли сервер что-нибудь. Он будет отображать только то, что есть после отправки вами чего-либо.

Очевидно, что в чате вы не хотите блокировать пользовательский ввод. Вы хотите продолжать получать новые сообщения с сервера, даже если пользователь не отправляет сообщения. Поэтому вам нужно разделить их и запустить оба асинхронно.

Я еще не очень много сделал с asyncio, поэтому я действительно не знаю, можно ли это сделать с ним хорошо, но вам, по сути, просто нужно поместить чтение и отправку в две отдельные параллельные задачи, например. с помощью потоков или concurrent.futures.


Быстрый пример того, что вы можете сделать, используя threading:

from socket import *
from threading import Thread

HOST = 'localhost'
PORT = 1234
s = socket(AF_INET, SOCK_STREAM)
s.connect((HOST, PORT))

def keepReading ():
    try:
        while True:
            reply = s.recv(1024).decode()
            print('Received ', reply)
    except ConnectionAbortedError:
        pass

t = Thread(target=keepReading)
t.start()

try:
    while True:
        message = input('')
        s.send(message.encode())
except EOFError:
    pass
finally:
    s.close()
person poke    schedule 16.02.2014
comment
ааа... треды. У меня нет опыта работы с нитями. Но... Я могу опубликовать другой код сервера. Возможно, вы могли бы привести пример потока, основанный на нем? Обновление оригинального поста. Подожди. - person suchislife; 17.02.2014
comment
@Vini Ну, ты мог бы просто использовать concurrent.futures. Вот простой пример для ThreadPoolExecutor. В вашем случае у вас будет одна функция, которая будет иметь бесконечный цикл чтения из сокета, и другая функция с бесконечным циклом, ожидающим ввода пользователя и отправляющим его затем. - person poke; 17.02.2014
comment
Я... не могу так ясно представлять себе вещи. Но я продолжу читать. Не стесняйтесь бить e к нему. - person suchislife; 17.02.2014
comment
Хм... кажется выдает ошибку. ConnectionRefusedError: [WinError 10061] Не удалось установить соединение, поскольку целевая машина активно отказалась от него. - person suchislife; 17.02.2014
comment
У меня работает… вы уверены, что ваш сервер работает? Вы исправили часть sock_sendall в коде сервера? - person poke; 17.02.2014
comment
Вау, невероятно! Я думаю, это помогает, если я не думаю, что ваш код на самом деле был сервером... - person suchislife; 17.02.2014
comment
Просто последний быстрый вопрос. Я где-то читал, что порождение нескольких потоков может в конечном итоге привести к замедлению работы компьютера. Я читал что-то о переводе потока в спящий режим, пока он ждет, чтобы что-то сделать. Склонен ли этот код в конечном итоге к замедлению? - person suchislife; 17.02.2014
comment
И socket.recv, и input блокируют выполнение, поэтому в это время фактически не происходит никакой обработки. В любой момент операционная система может решить перепланировать потоки, чтобы приоритет отдавался тем, которые действительно нуждаются в вычислительной мощности. Так что нет, если вы не создадите большое количество потоков, это вообще не будет проблемой. - person poke; 17.02.2014
comment
Использование потоков противоречит цели написания этого с помощью asyncio. - person Mark E. Haase; 06.08.2015