Just so you can see where this happens: https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
Cheers

On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <[email protected]> wrote:

> I also experience the same. As per the documentation, **withMaxReadTime**
> and **withMaxNumRecords** are mainly used for demo purposes, so I guess
> either it is beyond the scope of the current KafkaIO to behave as Bounded
> with offset management, or something is just missing in the current
> implementation (watermarking).
>
> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <[email protected]> wrote:
>
>> Hello everyone,
>>
>> I need to do some batch processing that uses messages in a Kafka topic.
>> So I tried the "withMaxReadTime" KafkaIO setting:
>>
>> ---
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "...")
>> properties.setProperty("group.id", "mygroup")
>> properties.setProperty("sasl.jaas.config", "...")
>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>> properties.setProperty("enable.auto.commit", "false")
>>
>> sc.customInput("Read From Kafka",
>>   KafkaIO
>>     .read[String, String]()
>>     .withTopic("mytopic")
>>     .withKeyDeserializer(classOf[StringDeserializer])
>>     .withValueDeserializer(classOf[StringDeserializer])
>>     .updateConsumerProperties(properties)
>>     .withMaxReadTime(Duration.standardSeconds(20))
>>     .withMaxNumRecords(1000000)
>>     .commitOffsetsInFinalize()
>>     .withoutMetadata()
>> )
>>   .count.debug() // prints something between 10000 and 20000
>> ---
>>
>> I can see that it was able to read the messages and process them. But in
>> the end, no offset was committed:
>>
>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>> mytopic  0          0               3094751         3094751  -            -     -
>>
>> But the behavior is strange: sometimes it commits the offset, sometimes
>> not. I'm not sure if it is a bug, or if I'm using the wrong configs.
>>
>> Has anyone used Bounded KafkaIO before? Is there anything I can do?
>>
>> Best Regards,
>>
>> --
>> *André Badawi Missaglia*
>> Data Engineer
>> (16) 3509-5515 *|* www.arquivei.com.br
>
> --
> JC

--
JC
