Клиент Autobahn websocket в приложении Quart (async Flask)

Добрый вечер всем. Я не новичок в этом месте, но наконец решил зарегистрироваться и попросить о помощи. Я разрабатываю веб-приложение с использованием фреймворка Quart (асинхронный Flask). И теперь, когда приложение стало больше и сложнее, я решил разделить разные процедуры для разных экземпляров сервера, в основном потому, что я хочу, чтобы веб-сервер был чистым, более абстрактным и свободным от вычислительной нагрузки.
Итак, я планирую использовать один веб-сервер сервер с несколькими (при необходимости) идентичными процедурными серверами. Все серверы построены на фреймворке Quart, пока просто для простоты разработки. Я решил использовать роутер Crossbar.io и автобан, чтобы соединить все серверы вместе.

И вот тут возникла проблема. Я следил за этими сообщениями:

Как я могу реализовать интерактивный клиент веб-сокета с автобаном asyncio?

Как я могу интегрировать кроссбарный клиент (python3, asyncio) с tkinter

Как отправить сообщение Autobahn / Twisted WAMP извне протокола?

Похоже, я перепробовал все возможные подходы для реализации клиента autobahn websocket в своем приложении Quart. Я не знаю, как сделать это возможным, чтобы обе вещи работали, работает ли приложение Quart, а клиент autobahn WS - нет, или наоборот.

Упрощенное приложение для моей кварты выглядит так:

from quart import Quart, request, current_app
from config import Config
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

import concurrent.futures

class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    print ("before autobahn start")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        runner = ApplicationRunner('ws://127.0.0.1:8080 /ws', 'realm1')
        future = executor.submit(runner.run(Component))
    print ("after autobahn started")

    return app


from app import models

В этом случае приложение застряло в цикле бегунов и все приложение не работает (не может обслуживать запросы), это становится возможным, только если я прерву цикл бегунов (автобан) нажатием Ctrl-C.

CMD после запуска:

(quart-app) user@car:~/quart-app$ hypercorn --debug --error-log - --access-log - -b 0.0.0.0:8001 tengine:app
Running on 0.0.0.0:8001 over http (CTRL + C to quit)
before autobahn start
Ok, registered procedure with registration ID 4605315769796303

после нажатия ctrl-C:

...
^Cafter autobahn started
2019-03-29T01:06:52 <Server sockets=[<socket.socket fd=11, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]> is serving

Как сделать так, чтобы приложение Quart работало вместе с клиентом автобана неблокирующим образом? Таким образом, автобан открывается и поддерживает соединение через веб-сокет с маршрутизатором Crossbar и молча прослушивает в фоновом режиме.


person Andrew K    schedule 28.03.2019    source источник


Ответы (1)


Что ж, после многих бессонных ночей я наконец нашел хороший способ решить эту головоломку.

Благодаря этому сообщению C-Python asyncio: запуск discord.py в ветка

Итак, я переписал свой код таким образом и смог запустить свое приложение Quart с клиентом автобана внутри, и оба активно работают в неблокирующем режиме. Весь __init__.py выглядит так:

from quart import Quart, request, current_app
from config import Config


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    return app


# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading


class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))


    def onDisconnect(self):
        print('Autobahn disconnected')

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


async def start():
    runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
    await runner.run(Component) # use client.start instead of client.run

def run_it_forever(loop):
    loop.run_forever()

asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

В этом сценарии мы создаем задачу с runner.run autobahn и присоединяем ее к текущему циклу, а затем запускаем этот цикл навсегда в новом потоке.

Меня вполне устраивало текущее решение ... но потом выяснилось, что это решение имеет некоторые недостатки, которые были для меня критичными, например: переподключиться, если соединение разорвано (т. Е. Перекрестный маршрутизатор становится недоступным). При таком подходе, если соединение не удалось инициализировать или сбросить через некоторое время, оно не будет пытаться восстановить соединение. Кроме того, для меня было неочевидно, как использовать ApplicationSession API, т.е. регистрировать / вызывать RPC из кода в моем приложении Quart.

К счастью, я заметил еще один новый компонентный API, который autobahn использовал в своей документации: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py

Он имеет функцию автоматического переподключения и легко регистрирует функции для RPC с помощью декораторов @component.register('com.something.do'), вам просто нужно import component раньше.

Итак, вот окончательный вид решения __init__.py:

from quart import Quart, request, current_app
from config import Config

def create_app(config_class=Config):
    ...
    return app

from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading


component = Component(
    transports=[
        {
            "type": "websocket",
            "url": u"ws://localhost:8080/ws",
            "endpoint": {
                "type": "tcp",
                "host": "localhost",
                "port": 8080,
            },
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm=u"realm1",
)

@component.on_join
def join(session, details):
    print("joined {}".format(details))

async def start():
    await component.start() #used component.start() instead of run([component]) as it's async function

def run_it_forever(loop):
    loop.run_forever()

loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

Надеюсь, это кому-нибудь поможет. Ваше здоровье!

person Andrew K    schedule 30.03.2019