RPC使用EventMachine&RabbitMQ

我已经开始尝试使用AMQP gem doc中提供的RabbitMQ RPC示例代码,尝试编写执行同步远程调用的非常简单的代码:

require "amqp"

module RPC
  class Base
    include EM::Deferrable

    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new {
        AMQP.connect do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              self.callback(&rabbit_callback)
              self.succeed(connection, channel, requests_queue)
            end # requests_queue
          end # AMQP.channel
        end # AMQP.connect

        Signal.trap("INT")  { connection.close { EM.stop } }
        Signal.trap("TERM") { connection.close { EM.stop } }
      }

      if !EM.reactor_running?
        EM.run do
          rabbit_loop.call
        end
      else
        rabbit_loop.call
      end
    end
  end

  class Server < Base

    def run
      server_loop = Proc.new do |connection, channel, requests_queue|
        consumer = AMQP::Consumer.new(channel, requests_queue).consume
        consumer.on_delivery do |metadata, payload|
          puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
          channel.default_exchange.publish(Time.now.to_s,
                                           :routing_key    => metadata.reply_to,
                                           :correlation_id => metadata.message_id,
                                           :mandatory      => true)
          metadata.ack
        end
      end
      rabbit(server_loop)
    end

  end

  class Client < Base

    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        message_id = Kernel.rand(10101010).to_s

        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          if headers.correlation_id == message_id
            result = payload
            connection.close { EM.stop }
          end
        end

        EM.add_timer(0.1) do 
          puts "[request] Sending a request...#{request} with id #{message_id}"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end

这个想法很简单:我想有一个消息队列总是准备好的(这是通过rabbit方法处理的)。 每当客户想要发送一个请求时,它首先创建一个响应的临时队列以及一个消息ID; 它然后将该请求发布到主消息队列,并等待临时队列中具有相同消息ID的响应,以便知道何时针对该特定请求的答案准备就绪。 我猜想message_id对于临时队列是多余的(因为队列也应该是唯一的)。

我现在使用这个客户端/服务器代码运行虚拟脚本

# server session
>> server = RPC::Server.new
=> #<RPC::Server:0x007faaa23bb5b0>
>> server.run
Updating client properties
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...

# client session
>> client = RPC::Client.new
=> #<RPC::Client:0x007ffb6be6aed8>
>> client.sync_push "test 1"
Updating client properties
[request] Sending a request...test 1 with id 3315740
=> "2012-11-02 21:58:45 +0100"
>> client.sync_push "test 2"
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="x002x00nx00x00x00fx00x00x00x00", @channel=1>

有两点我真的不明白:

  • 与EventMachine相关:在Client代码中,如果我想让我的消息真正发布,为什么我必须调用EM.add_timer ? 为什么使用EM.next_tick不起作用? 我的理解是,在这里发布发布时,“一切”应该是“准备好”的。
  • 与AMQP相关:为什么我的客户端因第二个请求的关闭连接而崩溃? 每次推送新请求时,都应创建一个全新的EM / AMQP循环。
  • 可悲的是,很少有代码可用于在线处理EM / AMQP,所以任何帮助将深表感谢! 关于这个效率的任何评论也将非常感谢。


    挖掘文档,我终于发现,我确实需要once_declared回调来确保客户端开始使用它时队列已准备就绪。

    关于连接问题,似乎使用EM::Deferrable引起了一些问题,所以这个(非常不令人满意的)解决方案仅仅是不包括EM::Deferrable

    require "amqp"
    
    module RPC
    
      module Base
    
        def rabbit(rabbit_callback)
          rabbit_loop = Proc.new {
            AMQP.start do |connection|
              AMQP::Channel.new(connection) do |channel|
                channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
                  requests_queue.once_declared do
                    rabbit_callback.call(connection, channel, requests_queue)
                  end
                end
              end
            end
    
            Signal.trap("INT")  { AMQP.stop { EM.stop } }
            Signal.trap("TERM") { AMQP.stop { EM.stop } }
          }
    
          if !EM.reactor_running?
            @do_not_stop_reactor = false
            EM.run do
              rabbit_loop.call
            end
          else
            @do_not_stop_reactor = true
            rabbit_loop.call
          end
        end
      end
    
      class Server
        include Base
    
        def run
          server_loop = Proc.new do |connection, channel, requests_queue|
            consumer = AMQP::Consumer.new(channel, requests_queue).consume
            consumer.on_delivery do |metadata, payload|
              puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
              channel.default_exchange.publish(Time.now.to_s,
                                               :routing_key    => metadata.reply_to,
                                               :correlation_id => metadata.message_id,
                                               :mandatory      => true)
              metadata.ack
            end
          end
          rabbit(server_loop)
        end
    
      end
    
      class Client
        include Base
    
        def sync_push(request)
          result = nil
          sync_request = Proc.new do |connection, channel, requests_queue|
            message_id = Kernel.rand(10101010).to_s
    
            response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
            response_queue.subscribe do |headers, payload|
              if headers.correlation_id == message_id
                result = payload
                AMQP.stop { EM.stop unless @do_not_stop_reactor }
              end
            end
    
            response_queue.once_declared do
              puts "[request] Sending a request...#{request} with id #{message_id}"
              channel.default_exchange.publish(request,
                                               :routing_key => requests_queue.name,
                                               :reply_to    => response_queue.name,
                                               :message_id  => message_id)
            end
          end
    
          rabbit(sync_request)
          result
        end
      end
    end
    
    链接地址: http://www.djcxy.com/p/61219.html

    上一篇: RPC using EventMachine & RabbitMQ

    下一篇: MassTransit with RabbitMQ: MT timing out on second Request/Response