Kafka: Polling after producer transaction doesn't get the produced messages









I have a consume-transform-produce app with exactly-once semantics in Kafka. The (transactional) produce phase writes new messages to the same topic, which is then consumed again (with isolation.level=read_committed). Only one thread does this, and it is ensured that the consumer poll happens after the producer's transaction commit. Right now I have only one poll call per consume-transform-produce round.



Test case



When I run my test case, there are sometimes messages that some other producer sent (legitimately) before my producer's transaction commits. Then I observe the following:



My single poll call returns only this foreign message, but not the message I produced a moment ago, although the transaction of the previous round committed successfully.



Questions



  1. Am I missing something that makes my transaction result from the last round invisible to the consumer of the next round?

  2. Do I have to issue multiple polls until one poll returns 0 records, which would tell me that all messages on the server in that partition have been read?

  3. Can Kafka perhaps not guarantee that all messages currently in the partition are read? Is there maybe no such thing as "I am finished reading this partition for now"?

Configuration




  • Transactional consumer



    final Map<String, Object> consumerConfig = new LinkedHashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID);
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());




  • Transactional producer



    final Map<String, Object> producerConfig = new LinkedHashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



  • My poll timeout is 2 seconds

  • My understanding is that transactional producers are automatically idempotent and use acks=all

  • My test case uses only one broker and a replication factor of 1, but of course I intend to use more in production

  • I use Kafka 2.0

  • My topic has only one partition

  • My thread has its own consumer group and is assigned to this single partition
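Regarding the producer config above: the exactly-once related settings can also be spelled out instead of relying on defaults. A minimal sketch, assuming the literal config keys `transactional.id`, `enable.idempotence` and `acks` (the strings behind `ProducerConfig.TRANSACTIONAL_ID_CONFIG`, `ENABLE_IDEMPOTENCE_CONFIG` and `ACKS_CONFIG`); the class name, server address and transactional id are placeholders. As far as I know, Kafka 2.0 already enables idempotence when a `transactional.id` is set and then refuses any acks value other than `all`, so this mostly documents intent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExplicitProducerConfig {

    // Builds a producer config map with the exactly-once related settings spelled out.
    // Keys are the literal names behind ProducerConfig.*_CONFIG; server and id are placeholders.
    public static Map<String, Object> build(String bootstrapServers, String transactionalId) {
        final Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("transactional.id", transactionalId);
        // Implied by transactional.id in Kafka 2.0; stated explicitly here for readability.
        config.put("enable.idempotence", "true");
        // The idempotent producer requires acks=all; other values are rejected at construction time.
        config.put("acks", "all");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = build("localhost:9092", "my-transactional-id");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```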









      transactions apache-kafka kafka-consumer-api kafka-producer-api






      edited Nov 9 at 8:47

























      asked Nov 8 at 23:09









      Lukas Lentner

      1319




          1 Answer

          accepted










          For your understanding of how poll works: the parameter you pass to poll() is a timeout that controls how long poll() will block if no data is available in the consumer's buffer. If it is set to 0, poll() returns immediately; otherwise it waits up to the specified time for data to arrive from the broker. So if you poll with a 0 ms timeout and there is no data in the buffer, you won't receive any data. Conversely, poll() returns as soon as some records are available, so a single call can return fewer records than are currently on the broker.
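The timeout semantics above can be illustrated with a toy model. This is a hypothetical `ToyConsumer`, not how `KafkaConsumer` is implemented: its `poll` waits up to the timeout for the first buffered record, then returns whatever is buffered at that moment (capped by a `maxRecords` value that stands in for `max.poll.records`). A record that arrives after the call has returned only shows up in a later poll.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of poll() timeout semantics; NOT the real KafkaConsumer internals.
public class ToyConsumer<T> {
    // Records that have "arrived from the broker" but have not been returned by poll() yet.
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private final int maxRecords; // stands in for max.poll.records

    public ToyConsumer(int maxRecords) {
        this.maxRecords = maxRecords;
    }

    // Simulates a fetch response adding a record to the local buffer.
    public void deliver(T record) {
        buffer.add(record);
    }

    // Waits up to 'timeout' for the first record, then returns what is buffered right now.
    // It does not wait for records that have not been delivered yet.
    public List<T> poll(Duration timeout) {
        List<T> result = new ArrayList<>();
        T first;
        try {
            first = buffer.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return result;
        }
        if (first == null) {
            return result; // timeout elapsed (or was 0) with nothing buffered
        }
        result.add(first);
        buffer.drainTo(result, maxRecords - 1);
        return result;
    }

    public static void main(String[] args) {
        ToyConsumer<String> consumer = new ToyConsumer<>(100);
        System.out.println(consumer.poll(Duration.ZERO));         // nothing buffered yet
        consumer.deliver("foreign message");
        System.out.println(consumer.poll(Duration.ofSeconds(2))); // returns without waiting 2 s
        consumer.deliver("my committed message");                 // arrives after that poll returned
        System.out.println(consumer.poll(Duration.ofSeconds(2)));
    }
}
```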



          Coming to the point of you not receiving data that was produced recently: a message only becomes available to consumers once it has been replicated to all in-sync replicas, i.e. once the partition's high watermark has moved past it. The producer's acks setting determines when the producer receives its acknowledgement, not when consumers can read the message.



          For example, with a replication factor of 3, a message is not made available for consumption until all in-sync replicas have acknowledged to the leader that they have received it; with acks=all, the producer's send is also only acknowledged at that point.
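For a read_committed consumer like the one in the question there is a second limit on top of replication. As far as I understand Kafka's transaction design, such a consumer can only read up to the last stable offset (LSO): the high watermark or the first offset of the earliest still-open transaction, whichever is lower. A small sketch of that rule; the helper `lastStableOffset` is hypothetical and not a client API.

```java
import java.util.Collection;
import java.util.List;

public class StableOffset {

    // Hypothetical helper: offsets below the returned value are visible to a read_committed consumer.
    // highWatermark: first offset not yet replicated to all in-sync replicas.
    // openTxnFirstOffsets: first offset of every transaction still open on the partition.
    public static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    public static void main(String[] args) {
        // Offsets 0..9 are replicated, but a transaction starting at offset 7 is still open.
        System.out.println(lastStableOffset(10, List.of(7L)));
        // No open transactions: everything below the high watermark is readable.
        System.out.println(lastStableOffset(10, List.of()));
    }
}
```

One consequence of this rule: records written after an open transaction began are held back from read_committed consumers until that transaction commits or aborts, even if they are themselves already committed.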



          Coming to the question of how you can know whether you have read the entire partition: if poll() no longer returns any records (assuming everything else is working fine), that indicates you have consumed all the messages currently available in that partition.
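Polling until an empty result can be sketched as below. To keep it runnable without a broker, the helper takes a plain `Supplier<List<T>>`; with a real `KafkaConsumer` the supplier would wrap `consumer.poll(Duration)` (available since Kafka 2.0) and copy the returned `ConsumerRecords` into a list. The name `drainUntilEmpty` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class DrainUntilEmpty {

    // Hypothetical helper: keeps calling poll until one call returns no records.
    // An empty poll only means nothing arrived within that poll's timeout; it is a
    // heuristic for "caught up", not a hard guarantee that the partition is fully read.
    public static <T> List<T> drainUntilEmpty(Supplier<List<T>> poll) {
        List<T> all = new ArrayList<>();
        while (true) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                return all;
            }
            all.addAll(batch);
        }
    }

    public static void main(String[] args) {
        // Simulated poll results: the first poll returns only the foreign message.
        Iterator<List<String>> polls = List.of(
                List.of("foreign message"),
                List.of("my committed message"),
                List.<String>of()).iterator();
        Supplier<List<String>> poll = () -> polls.hasNext() ? polls.next() : List.<String>of();
        System.out.println(drainUntilEmpty(poll));
    }
}
```

If other producers write to the partition continuously, this loop might never see an empty poll; capping the number of iterations per round is one way to bound it.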






          answered Nov 9 at 0:09









          user3679686

          10216




          • Thanks. I edited my question to make it clearer that I am unsure whether I am already doing everything I can on the producer side. Your answer covers my questions 2 and 3. Can you check whether my producer config ensures that this cannot happen: "unless all the replicators acknowledge the leader that they have received the message , this message wouldn't be made available for the consumer to consume."
            – Lukas Lentner
            Nov 9 at 8:51










          • I added a loop that polls until it returns nothing, and it works like a charm. Thank you very much!
            – Lukas Lentner
            Nov 9 at 13:38
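A stricter alternative to "poll until empty" is to compare the consumer's position with the partition's end offset. `KafkaConsumer.position(TopicPartition)` and `KafkaConsumer.endOffsets(Collection<TopicPartition>)` are real client methods, and with isolation.level=read_committed the end offset they report is the last stable offset. The sketch below keeps only the comparison, with partitions represented by plain strings so it runs without a broker; `CaughtUpCheck` and `isCaughtUp` are hypothetical names.

```java
import java.util.Map;

public class CaughtUpCheck {

    // Hypothetical helper: true when, for every partition, the consumer's next fetch
    // position has reached the end offset (the LSO under read_committed).
    // In real code the maps would be filled from consumer.position(tp) and consumer.endOffsets(...).
    public static boolean isCaughtUp(Map<String, Long> positions, Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long position = positions.get(end.getKey());
            if (position == null || position < end.getValue()) {
                return false; // unknown position, or records still left to read
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> ends = Map.of("topic-0", 42L);
        System.out.println(isCaughtUp(Map.of("topic-0", 41L), ends)); // one record left
        System.out.println(isCaughtUp(Map.of("topic-0", 42L), ends));
    }
}
```

Unlike the empty-poll heuristic, this check does not depend on the poll timeout: one could fetch the end offsets once at the start of a round and keep polling until the position reaches them.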















