向使用Pika + sockjs的所有客户广播消息

我是基于WebSockets的实时应用程序的新手,并且一度陷入困境。 我的应用程序有以下组件:

  • 简单的Python脚本,当某些通用事件发生时被触发。 它接收数据并使用pika将其发送到队列(RabbitMQ)。
  • 龙卷风应用程序(sockjs-tornado)接收来自队列(pika异步客户端)的消息,处理其内容,将新的应用程序状态保存到数据库并向客户端(SockJS客户端)广播数据。 与客户的通信只是一个方向 - 他们只是连接到服务器并接收数据。
  • 问题是,我不知道如何将从队列接收到的数据传递给所有客户端。 我已经完成了pub / sub交换,所以当用户连接到服务器时,每个用户都建立了与RabbitMQ的新连接,但这不是我想要的。 以下是我迄今得到的。

    common / pika_client.py

    import logging
    
    import pika
    from pika.adapters.tornado_connection import TornadoConnection
    
    
    class PikaClient(object):
    
        def __init__(self, exchange, host='localhost', port=5672, vhost=''):
            # Default values
            self.connected = False
            self.connecting = False
            self.connection = None
            self.channel = None
            self.host = host
            self.port = port
            self.vhost = vhost
            self.exchange = exchange
    
            # preparing logger
            self.log = logging.getLogger(__name__)
            self.set_log_level()
    
        def set_log_level(self, log_level=logging.WARNING):
            self.log.setLevel(log_level)
    
        def connect(self):
            self.log.info("CONNECTING")
            if self.connecting:
                self.log.info('%s: Already connecting to RabbitMQ' %
                    self.__class__.__name__)
                return
    
            self.log.info('%s: Connecting to RabbitMQ on localhost:5672' %
                self.__class__.__name__)
            self.connecting = True
    
            param = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                virtual_host=self.vhost
            )
    
            self.connection = TornadoConnection(param,
                on_open_callback=self.on_connected)
            self.connection.add_on_close_callback(self.on_closed)
    
        def on_connected(self, connection):
            self.log.info('%s: Connected to RabbitMQ on localhost:5672' %
                self.__class__.__name__)
            self.connected = True
            self.connection = connection
            self.connection.channel(self.on_channel_open)
    
        def on_channel_open(self, channel):
            self.log.info('%s: Channel Open, Declaring Exchange %s' %
                (self.__class__.__name__, self.exchange))
            self.channel = channel
            self.channel.exchange_declare(
                exchange=self.exchange,
                type="fanout",
                callback=self.on_exchange_declared
            )
    
        def on_exchange_declared(self, frame):
            self.log.info('%s: Exchange Declared, Declaring Queue' %
                (self.__class__.__name__))
            self.channel.queue_declare(exclusive=True,
                callback=self.on_queue_declared)
    
        def on_queue_declared(self, frame):
            self.log.info('%s: Queue Declared, Binding Queue %s' %
                (self.__class__.__name__, frame.method.queue))
            self.queue_name = frame.method.queue
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=frame.method.queue,
                callback=self.on_queue_bound
            )
    
        def on_queue_bound(self, frame):
            self.log.info('%s: Queue Bound. To receive messages implement 
                           on_queue_bound method' % self.__class__.__name__)
    
        def on_closed(self, connection):
            self.log.info('%s: Connection Closed' % self.__class__.__name__)
            self.connected = False
            self.connection = None
            self.connecting = False
            self.channel = None
            self.connection = self.connect()
    
        def add_message_handler(self, handler):
            self.message_handler = handler
    

    tracker.py

    from sockjs.tornado import SockJSConnection
    
    import settings
    from common.pika_client import PikaClient
    
    
    class QueueReceiver(PikaClient):
        """Receives messages from RabbitMQ """
    
        def on_queue_bound(self, frame):
            self.log.info('Consuming on queue %s' % self.queue_name)
            self.channel.basic_consume(consumer_callback=self.message_handler,
                queue=self.queue_name
            )
    
    
    class TrackerConnection(SockJSConnection):
    
        def on_open(self, info):
            self.queue = QueueReceiver('clt')
            self.queue.add_message_handler(self.on_queue_message)
            self.queue.set_log_level(settings.LOG_LEVEL)
            self.queue.connect()
    
        def on_queue_message(self, channel, method, header, body):
            self.send(body)
            self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)
    

    它可以工作,但正如我所提到的,我希望只有一个连接来排队,接收消息,做一些事情并使用broadcast()方法向客户端广播结果。 预先感谢您的帮助。

    链接地址: http://www.djcxy.com/p/44769.html

    上一篇: Broadcasting message to all clients using Pika + sockjs

    下一篇: Tornado Web Framework Mysql connection handling