ActiveMQ一些消费者如果到达生产者后面,他们不会完成任务
我只是从ActiveMQ开始,我似乎有一个奇怪的问题。 (来源如下)
有两种情况
消费者连接到代理,等待队列中的任务。 制片人稍后到达,放下任务清单,并由不同的消费者正确接受并执行。 这工作正常,我也模拟它。
生产者首先连接,删除任务列表。 目前没有消费者连接。 现在当我们说3个消费者--C1,C2和C3连接到经纪人时(按照这个顺序),我发现只有C1接受并完成了生产者放弃的任务。 C2和C3保持空闲状态。 为什么会发生?
关于第二种情况,我还注意到了另外一件事情 - 如果生产者继续在队列中放置任务,则C2和C3将完成任务,但如果生产者在之前(如前所述)放弃任务,则C2和C3不会做任何事情。
生产者代码
package com.activemq.apps;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.commons.helpers.Maths;
public class Publisher implements MessageListener {
private static String _URL;
private static String _TOPIC_PUBLISH;
private static String _TOPIC_CONSUME;
public Publisher (String URL, String TOPIC) {
_URL = URL;
_TOPIC_PUBLISH = TOPIC + "_REQUESTS";
_TOPIC_CONSUME = TOPIC + "_RESPONSES";
}
public void initialize() {
try
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
MessageProducer producer = session.createProducer(destinationProducer);
MessageConsumer consumer = session.createConsumer(destinationConsumers);
consumer.setMessageListener(this);
int count = 0;
System.out.println("Sending requests");
while (true)
{
int randomSleepTime = Maths.rand(1000, 5000);
String messageToSend = count + "_" + randomSleepTime;
TextMessage message = session.createTextMessage(messageToSend);
producer.send(message);
System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");
if (count++%10 == 0)
Thread.sleep(10000);
}
// System.out.println("Waiting for responses");
// connection.close();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage)
{
TextMessage msg = (TextMessage) message;
try {
String response = msg.getText();
String[] responseSplit = response.split("_");
String clientId = responseSplit[1];
String count = responseSplit[0];
System.out.println("Got response from " + clientId + " Job #" + count);
}
catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消费者代码
package com.activemq.apps;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer implements MessageListener {
private static String _URL;
private static String _TOPIC_PUBLISH;
private static String _TOPIC_CONSUME;
private static String _CLIENTID;
private MessageProducer producer;
private Session session;
public Consumer (String URL, String TOPIC) {
_URL = URL;
_TOPIC_PUBLISH = TOPIC + "_RESPONSES";
_TOPIC_CONSUME = TOPIC + "_REQUESTS";
}
public void initialize() {
try
{
_CLIENTID = UUID.randomUUID().toString();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
producer = session.createProducer(destinationProducer);
MessageConsumer consumer = session.createConsumer(destinationConsumers);
consumer.setMessageListener(this);
System.out.println("Client: " + _CLIENTID + "nWaiting to pick up tasks");
// connection.close();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage)
{
TextMessage msg = (TextMessage) message;
try
{
String[] messageSplits = msg.getText().split("_");
String count = messageSplits[0];
String timeString = messageSplits[1];
int sleepFor = Integer.parseInt(timeString);
System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");
Thread.sleep(sleepFor);
TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);
producer.send(sendToProducer);
}
catch (JMSException e) {
e.printStackTrace();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
你提到过
现在让我们说3个消费者 - C1,C2和C3连接到经纪人(按照这个顺序)
由于C1首先连接,因此在连接后立即开始获取队列中的所有消息。 这是预料之中的。 所以我没有看到任何问题。 C2,C3不是空闲的,但C1在C2和C3可以之前已经获得了消息。
我不确定生产者发送了多少条消息。 我假设消息的数量较少。 看看你期望什么尝试从制片人发送许多消息,如成千上万或数百万,然后启动消费者。 大量的消息是主观的,取决于内存,网络和其他资源。 你可能会看到你的期望。
我不认为这里有什么奇怪的。 队列表示应该只有一个用户的P2P模式。 在我们的案例中,我们有3个消费者,但并不禁止,但不能保证消费者将收到消息的任何特定顺序。
链接地址: http://www.djcxy.com/p/33051.html上一篇: ActiveMQ some consumers not picking up tasks if they arrive after producer
