Having a Kafka Consumer read a single message at a time

We have Kafka set up so that messages can be processed in parallel by several servers, but every message must be processed exactly once (and by only one server). We have this up and running and it's working fine.

Now, the problem for us is that the Kafka consumer reads messages in batches for maximal efficiency. This becomes a problem if/when processing fails, the server shuts down, or similar, because then we lose data that was about to be processed.

Is there a way to get the consumer to read only one message at a time and let Kafka keep the unprocessed messages? Something like: the consumer pulls one message -> processes it -> commits the offset when done, then repeats. Is this feasible with Kafka? Any thoughts/ideas?

Thanks!


You can try setting max.poll.records to 1.
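The pull -> process -> commit loop from the question can be sketched on top of that setting. Below is a minimal, hypothetical illustration; the helper function is testable on its own, and the commented wiring uses kafka-python with placeholder broker/topic/group names:

```python
def consume_one_at_a_time(consumer, handle):
    """Pull one message, process it, then commit its offset.

    If handle() raises, or the process dies before commit(), the
    offset is never committed, so Kafka redelivers the message to
    a consumer in the group after restart/rebalance.
    """
    for message in consumer:
        handle(message.value)
        consumer.commit()  # commit only after processing succeeded


# Wiring it to a real broker (addresses and names are placeholders):
#
#   from kafka import KafkaConsumer   # pip install kafka-python
#   consumer = KafkaConsumer(
#       "jobs",
#       bootstrap_servers="localhost:9092",
#       group_id="one-at-a-time",
#       max_poll_records=1,        # at most one record per poll()
#       enable_auto_commit=False,  # no auto-commit of unprocessed batches
#   )
#   consume_one_at_a_time(consumer, handle=my_processing_function)
```

With auto-commit disabled, a crash mid-processing simply means the last message is delivered again, which matches the "let Kafka keep the unprocessed messages" requirement.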


You mention wanting exactly-once processing, but then you're worried about losing data. I'm assuming you're worried about the edge case where one of your servers fails and you lose data?

I don't think there's a way to consume exactly one message at a time. Looking through the consumer configurations, there only seems to be an option for setting the maximum number of bytes a consumer can fetch from Kafka, not a number of messages:

fetch.message.max.bytes

But if you're worried about losing data completely: if you never commit the offset, Kafka will not mark it as committed and the data won't be lost. Reading through the Kafka documentation about delivery semantics:

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.

So exactly-once processing is not something Kafka enables by default. It requires you to store the offset whenever you write the output of your processing to storage:

But this can be handled more simply and generally by simply letting the consumer store its offset in the same place as its output...As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads so that it is guaranteed that either data and offsets are both updated or neither is.

I hope that helps.


It depends on which client you are going to use. For C++ and Python, it is possible to consume ONE message each time.

For Python, I used https://github.com/mumrah/kafka-python. The following code can consume one message each time:

message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True)

__consumer is a SimpleConsumer object.

See my question and answer here: How to stop Python Kafka Consumer in program?

For C++, I am using https://github.com/edenhill/librdkafka. The following code can consume one message each time.

while (m_bRunning)
{
        // Start to read messages from the local queue.
        RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
        msg_consume(msg);
        delete msg;
        m_consumer->poll(0);
}

m_consumer is a pointer to the C++ Consumer object (C++ API).

Hope this helps.
