Невозможно испустить после вызова rabbitmq channel.start_using() в обработчике flask-socketio

Я пытаюсь прослушать очередь rabbitmq из обработчика событий flask-socketio, чтобы отправлять уведомления в реальном времени в веб-приложение. Моя установка на данный момент:

Сервер

import pika
import sys
from flask import Flask, request
from flask_socketio import SocketIO, emit, disconnect

app = Flask(__name__)
app.config['SECRET_KEY'] = 'not-so-secret'
socketio = SocketIO(app)

def is_authenticated():
  return True

def rabbit_callback(ch, method, properties, body):
    socketio.emit('connect', {'data': 'yes'})
    print "body: ", body

@socketio.on('connect')
def connected():
    emit('notification', {'data': 'Connected'})
    creds = pika.PlainCredentials(
        username="username",
        password="password")

    params = pika.ConnectionParameters(
        host="localhost",
        credentials=creds,
        virtual_host="/")

    connection = pika.BlockingConnection(params)

    # This is one channel inside the connection
    channel = connection.channel()

    # Declare the exchange we're going to use
    exchange_name = 'user'
    channel.exchange_declare(exchange=exchange_name,
                             type='topic')
    channel.queue_declare(queue='notifications')
    channel.queue_bind(exchange='user',
                       queue='notifications',
                       routing_key='#')

    channel.basic_consume(rabbit_callback,
                          queue='notifications',
                          no_ack=True)
    channel.start_consuming()


if __name__ == '__main__':
    socketio.run(app, port=8082)

Браузер

<script type="text/javascript" charset="utf-8">
    var socket = io.connect('http://' + document.domain + ':8082');
    socket.on('connect', function(resp) {
        console.log(resp);
    });
    socket.on('disconnect', function(resp) {
        console.log(resp);
    });
    socket.on('error', function(resp) {
        console.log(resp);
    });
    socket.on('notification', function(resp) {
        console.log(resp);
    });
</script>

Если я закомментирую строку «channel.start_using()» в нижней части кода сервера и загружу страницу браузера, я успешно подключусь к flask-socketio и увижу {данные: «Подключено»} в своей консоли.

Когда я раскомментирую строку, я не вижу {данные: "Подключено"} в своей консоли. Тем не менее, когда я отправляю сообщение в очередь уведомлений, срабатывает функция rabbit_callback. Я вижу свое сообщение, напечатанное на консоли сервера, но вызов emit не работает. Ошибок на сервере и в браузере нет. Любые советы очень ценятся.

Спасибо!


person Troy    schedule 23.11.2015    source источник
comment
Вы используете eventlet или gevent на этом сервере? Если вы это сделаете, вам, вероятно, потребуется исправить обезьяну в стандартной библиотеке, чтобы сделать блокирующие функции pika неблокирующими.   -  person Miguel    schedule 24.11.2015
comment
Вы смогли решить свою проблему?   -  person Amin Alaee    schedule 07.12.2016
comment
В итоге я не использовал flask-socketio. Вместо этого я использовал реализацию режима.   -  person Troy    schedule 07.12.2016
comment
Узел... не режим   -  person Troy    schedule 07.12.2016


Ответы (1)


У меня была такая же проблема с использованием eventlet, и я просто решил добавить:

import eventlet
eventlet.monkey_patch()

, в начале моего исходного кода.

В любом случае мой код немного отличается и использует метод start_background_task:

import pika    
from threading import Lock
from flask import Flask, render_template, session, request, copy_current_request_context


from flask_socketio import SocketIO, emit, join_room, leave_room, \
    close_room, rooms, disconnect

app = Flask(__name__, static_url_path='/static')
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

@socketio.on('connect', namespace='/test')
def test_connect():
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=get_messages)

    emit('my_response', {'data': 'Connected', 'count': 0})
    print('connected')

def get_messages():
    channel = connect_rabbitmq()
    channel.start_consuming()

def connect_rabbitmq():
    cred = pika.credentials.PlainCredentials('username', 'password')
    conn_param = pika.ConnectionParameters(host='yourhostname',
                                           credentials=cred)
    connection = pika.BlockingConnection(conn_param)
    channel = connection.channel()

    channel.exchange_declare(exchange='ncs', exchange_type='fanout')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='myexchangename', queue=queue_name)

    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    return channel

Надеюсь это поможет...

person Pablo Mellado    schedule 14.09.2017