Kafka: Polling after producer transaction doesn't get the produced messages
I have a consume-transform-produce app with exactly-once semantics in Kafka. The (transactional) produce phase produces new messages on the same topic that is then consumed (transactionally, with isolation.level=read_committed). There is only one thread doing this, and it is ensured that the consumer poll happens after the producer's transaction commit. Right now I have only one poll statement per consume-transform-produce round.
Test case
When I run my test case, there may sometimes be messages that some other producer sent (legitimately) before my producer's transaction commits. Then I experience the following:
My single poll statement returns only this foreign message, but not the message I produced just a moment ago, although the transaction of the last round committed successfully.
Questions
- Am I missing something, so that my transaction result from the last round is not visible to the consumer of the next round?
- Do I have to issue multiple polls until one poll returns 0 records, which tells me that all messages on the server in that partition have been read?
- Can Kafka perhaps not guarantee that all messages currently on the partition are read? Maybe there is no such thing as "I am finished with reading this partition for now"?
Configuration
Transactional consumer
final Map<String, Object> consumerConfig = new LinkedHashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Transactional producer
final Map<String, Object> producerConfig = new LinkedHashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- My poll timeout is 2 seconds
- My understanding is that transactional producers are automatically idempotent and use acks=all
- My test case runs with only one broker and a replication factor of one, but of course I intend to use more in production
- I use Kafka 2.0
- My topic only has one partition
- My thread has its own consumer group and is assigned to this single partition
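To rule out the producer defaults as a factor, the understanding about idempotence and acks can be spelled out in the configuration. The following is only a sketch and not part of the original setup: it uses the plain string keys `enable.idempotence` and `acks` rather than the `ProducerConfig` constants so that it compiles without the Kafka client on the classpath. The class name `ExplicitProducerSettings` and the placeholder values in `main` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ExplicitProducerSettings {

    /** Builds a producer config map with idempotence and acks written out explicitly. */
    static Map<String, Object> build(String bootstrapServers, String transactionalId) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("transactional.id", transactionalId);
        // Stated explicitly instead of relying on what a transactional producer implies.
        config.put("enable.idempotence", "true");
        config.put("acks", "all");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "my-tx-id" are placeholder values for illustration.
        build("localhost:9092", "my-tx-id")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map could be passed to the producer constructor in place of `producerConfig`. If the understanding above is right, this should not change behavior, only make the assumption visible.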
transactions apache-kafka kafka-consumer-api kafka-producer-api
edited Nov 9 at 8:47
asked Nov 8 at 23:09
Lukas Lentner
1 Answer
accepted
To understand how poll works: the parameter we pass to poll() is a timeout interval that controls how long poll() will block if data is not available in the consumer buffer. If it is set to 0, poll() returns immediately; otherwise, it waits up to the specified number of milliseconds for data to arrive from the broker. So if you call poll with 0 milliseconds and there is no data in the buffer, you won't receive any data.
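The timeout behavior described above can be illustrated with an analogy. This sketch does not use the Kafka client at all: a `java.util.concurrent.BlockingQueue` stands in for the consumer buffer, and a background thread stands in for the broker. The names `PollTimeoutAnalogy` and `pollOnce` are made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollTimeoutAnalogy {

    /** Returns the next buffered item, waiting up to timeoutMs; null if none arrives in time. */
    static String pollOnce(BlockingQueue<String> buffer, long timeoutMs) {
        try {
            return buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(10);

        // Empty buffer and a zero timeout: the call returns immediately with nothing.
        System.out.println(pollOnce(buffer, 0)); // prints null

        // The stand-in "broker" delivers a record 200 ms later.
        Thread broker = new Thread(() -> {
            try {
                Thread.sleep(200);
                buffer.put("record-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        broker.start();

        // A 2-second timeout is long enough to wait for that delivery.
        System.out.println(pollOnce(buffer, 2000)); // prints record-1
        broker.join();
    }
}
```

The point of the analogy is only that a zero timeout never waits for data in flight, while a longer timeout returns as soon as something arrives.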
As for not receiving the data that was produced recently, that depends on your producer configuration. Whether a produced message is already available for the consumer depends on its replication and on the acks parameter.
For example: if you have set the replication factor to 3 and acks=all, then unless all the in-sync replicas acknowledge to the leader that they have received the message, this message won't be made available for the consumer to consume.
As for the question of how you can know whether you have read the entire partition: if your poll no longer gives you any records (assuming everything else is working fine), that indicates that you have consumed all the messages currently available for that topic.
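The "poll until nothing comes back" idea can be sketched as a small helper. This is an illustration under assumptions, not the Kafka API: `pollOnce` is a hypothetical function standing in for a call such as `consumer.poll(timeout)` with the returned records copied into a list, and the queue in `main` is a stand-in for the broker.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

final class PollUntilEmpty {

    /** Calls pollOnce repeatedly and collects every batch, stopping at the first empty one. */
    static <T> List<T> drain(Function<Duration, List<T>> pollOnce, Duration timeout) {
        List<T> all = new ArrayList<>();
        while (true) {
            List<T> batch = pollOnce.apply(timeout);
            if (batch.isEmpty()) {
                return all;
            }
            all.addAll(batch);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the broker: the first poll yields the foreign message,
        // the second yields the message produced in the previous round.
        Deque<List<String>> batches = new ArrayDeque<>();
        batches.add(Collections.singletonList("foreign-message"));
        batches.add(Collections.singletonList("my-message"));

        Function<Duration, List<String>> fakePoll =
                timeout -> batches.isEmpty() ? Collections.<String>emptyList() : batches.poll();

        System.out.println(drain(fakePoll, Duration.ofSeconds(2)));
        // prints [foreign-message, my-message]
    }
}
```

Note that an empty batch only shows that nothing arrived within the timeout, so the timeout should be chosen generously enough for the setup at hand.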
Thanks. I edited my question to make it clearer that I am unsure whether I am already doing everything I can on the producer's side. Your answer covers my questions 2 and 3. Can you check whether my producer config ensures that this cannot happen: "unless all the replicators acknowledge the leader that they have received the message , this message wouldn't be made available for the consumer to consume."
– Lukas Lentner
Nov 9 at 8:51
I added a polling loop that runs until a poll returns empty, and it works like a charm. Thank you very much!
– Lukas Lentner
Nov 9 at 13:38
answered Nov 9 at 0:09
user3679686