MassTransit with RabbitMQ: MT timing out on second Request/Response

I'm working on a Request/Response scenario with MassTransit and RabbitMQ. A simple request/reply works any number of times. However, if I publish a message inside the request handler, only the first request works: on the second request the handler is never called, the request times out, and the message stays in the server queue.

It seems like I am missing something; configuration maybe?

The project is at https://bitbucket.org/joeyoung/enterprise-rabbitmq

The client configuration:

ObjectFactory.Configure(cfg =>
{
    cfg.AddRegistry<WebRegistry>();
    cfg.For<IServiceBus>().Singleton().Use(o => ServiceBusFactory.New(sbc =>
    {
        // configure the bus
        sbc.UseRabbitMqRouting();
        sbc.ReceiveFrom("rabbitmq://localhost/entrprise_client");

        // finds all the consumers in the container and register them with the bus
        sbc.Subscribe(x => x.LoadFrom(ObjectFactory.Container));
    }));
});

The Server Configuration:

var container = new Container(cfg =>
{
    cfg.Scan(scan =>
    {
        scan.Assembly("Server.MessageHandlers");
        scan.AddAllTypesOf<IConsumer>();
    });
});

var bus = ServiceBusFactory.New(sbc =>
{
    // configure the bus
    sbc.UseRabbitMqRouting();
    sbc.ReceiveFrom("rabbitmq://localhost/enterprise_server");

    // finds all the consumers in the container and register them with the bus
    sbc.Subscribe(x => x.LoadFrom(container));
});

// finally inject the bus into the container
container.Inject(bus);

Sending the Request:

bus.PublishRequest(new CreateProductCommand(correlationId, model.Name, model.Description, model.Price), x =>
{
    x.HandleTimeout(10.Seconds(), () => { timedOut = true; });
    x.Handle<CreateProductCommand.Response>(response => { productId = response.Id; });
});

Consuming the Request:

public void Consume(IConsumeContext<CreateProductCommand> context)
{
    Console.Out.WriteLine("Consuming Create Product");

    // simulate creating a product
    var productId = "products/1234";

    // bus is the IServiceBus injected through the handler's constructor
    bus.Publish(new ProductCreatedEvent(productId));

    context.Respond(new CreateProductCommand.Response(context.Message.CorrelationId) { Id = productId });
}

The Message:

public class CreateProductCommand : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; private set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }

    public CreateProductCommand(Guid correlationId, string name, string description, decimal price) 
    {
        CorrelationId = correlationId;
        Name = name;
        Description = description;
        Price = price;
    }

    public class Response : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; private set; }

        public string Id { get; set; }

        public Response(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }
}

Thanks to Chris for suggesting that I use the Bus exposed on the IConsumeContext. That seems to have fixed it.

So instead of injecting an IServiceBus into the handler's constructor, you grab the bus from the context:

public class CreateProductCommandHandler : Consumes<CreateProductCommand>.Context
{
    public void Consume(IConsumeContext<CreateProductCommand> context)
    {
        Console.Out.WriteLine("Consuming Create Product");

        // simulate creating a product
        var productId = "products/1234";

        context.Bus.Publish(new ProductCreatedEvent(productId));

        context.Respond(new CreateProductCommand.Response(context.Message.CorrelationId) { Id = productId });
    }
}

I know you want to use MT with RabbitMQ, but I gave up on it and switched my affections to EasyNetQ. Give it a try (-:
