Hi Jozef,

I’m not aware of anyone working on this. In the meantime, I created a Jira for it: https://issues.apache.org/jira/browse/BEAM-6466

Feel free to contribute if you wish.
> On 17 Jan 2019, at 09:10, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> Hello,
>
> Was there any progress on this, or a JIRA I can follow? I could use bounded processing over KafkaIO too.
>
> Thanks,
> Jozef
>
> On Thu, Jan 10, 2019 at 4:57 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>
> Don’t you think that we could have a race condition there, since, according to the initial issue description, sometimes the offset was committed and sometimes not?
>
>> On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote:
>>
>> Oh, the generic bounded-source wrapper over an unbounded source does not seem to call finalize when it is done with a split. I think it should.
>>
>> Could you file a bug for the wrapper? Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().
>>
>> On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <andre.missag...@arquivei.com.br> wrote:
>>
>> Hi Juan,
>>
>> After researching a bit, I found this issue, which has been open since 2017:
>> https://issues.apache.org/jira/browse/BEAM-2185
>>
>> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should write my own code that fetches messages from Kafka, even if it means giving up on some of Beam's processing guarantees...
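Raghu's diagnosis (the bounded-source wrapper never finalizes the last checkpoint, and finalizing is what commits the Kafka offsets) can be sketched as a toy model. This is illustrative Java only, not Beam's actual `BoundedReadFromUnboundedSource` or `KafkaUnboundedReader` classes; all names here are made up for the sketch:

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedFinalizeSketch {

    // Illustrative stand-in for Beam's UnboundedSource.CheckpointMark.
    interface CheckpointMark {
        void finalizeCheckpoint();
    }

    // Toy unbounded Kafka reader: offsets are committed only when a
    // checkpoint mark taken from the reader is finalized.
    static class ToyKafkaReader {
        long position = 0;           // next offset to read
        long committedOffset = -1;   // last offset committed to Kafka; -1 = none

        long read() {
            return position++;       // unbounded: there is always more data
        }

        CheckpointMark checkpoint() {
            final long upTo = position - 1;
            return () -> committedOffset = upTo;
        }
    }

    // Bounded wrapper over the unbounded reader, with the missing step added:
    // when the record limit is reached, take a final checkpoint and finalize it.
    static class BoundedWrapper {
        private final ToyKafkaReader reader;
        private final long maxNumRecords;

        BoundedWrapper(ToyKafkaReader reader, long maxNumRecords) {
            this.reader = reader;
            this.maxNumRecords = maxNumRecords;
        }

        List<Long> readAll() {
            List<Long> records = new ArrayList<>();
            while (records.size() < maxNumRecords) {
                records.add(reader.read());
            }
            // The step the generic wrapper reportedly skips: finalize on
            // completion, which is what actually commits the consumed offsets.
            reader.checkpoint().finalizeCheckpoint();
            return records;
        }
    }

    public static void main(String[] args) {
        ToyKafkaReader reader = new ToyKafkaReader();
        List<Long> out = new BoundedWrapper(reader, 1000).readAll();
        System.out.println(out.size());              // 1000
        System.out.println(reader.committedOffset);  // 999
    }
}
```

Without the `finalizeCheckpoint()` call at the end of `readAll()`, `committedOffset` would stay at -1 even though all records were read, which matches the symptom reported below.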
>> On Wed, 9 Jan 2019 at 14:24, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>
>> Just for you to have a look at where this happens:
>>
>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>>
>> Cheers
>>
>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>
>> I also experience the same. As per the documentation, withMaxReadTime and withMaxNumRecords are mainly meant for demo purposes, so I guess it is beyond the scope of the current KafkaIO to behave as a bounded source with offset management, or something is simply missing in the current implementation (watermarking).
>>
>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <andre.missag...@arquivei.com.br> wrote:
>>
>> Hello everyone,
>>
>> I need to do some batch processing that uses messages in a Kafka topic.
>> So I tried the "withMaxReadTime" KafkaIO setting:
>>
>> ---
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "...")
>> properties.setProperty("group.id", "mygroup")
>> properties.setProperty("sasl.jaas.config", "...")
>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>> properties.setProperty("enable.auto.commit", "false")
>>
>> sc.customInput("Read From Kafka",
>>   KafkaIO
>>     .read[String, String]()
>>     .withTopic("mytopic")
>>     .withKeyDeserializer(classOf[StringDeserializer])
>>     .withValueDeserializer(classOf[StringDeserializer])
>>     .updateConsumerProperties(properties)
>>     .withMaxReadTime(Duration.standardSeconds(20))
>>     .withMaxNumRecords(1000000)
>>     .commitOffsetsInFinalize()
>>     .withoutMetadata()
>> ).count.debug() // prints something between 10000 and 20000
>> ---
>>
>> I can see that it was able to read the messages and process them. But in the end, no offset was committed:
>>
>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>> mytopic  0          0               3094751         3094751  -            -     -
>>
>> But it is strange behavior: sometimes it commits the offset, sometimes not. I'm not sure if it is a bug or if I'm using the wrong configs.
>>
>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>
>> Best Regards,
>>
>> --
>> André Badawi Missaglia
>> Data Engineer
>> (16) 3509-5515 | www.arquivei.com.br
>>
>> --
>> JC
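The intermittent behavior reported above ("sometimes it commits the offset, sometimes not") can be illustrated with a toy simulation. This is not Beam code; the assumption behind it is that offsets are committed only when the runner finalizes a checkpoint mark, so whether the tail of a bounded read is committed depends on where the cutoff (maxReadTime/maxNumRecords) lands relative to the checkpoint cycle:

```java
// Toy simulation of commit-on-finalize: read `total` records, finalizing a
// checkpoint (which commits offsets) every `interval` records.
public class CommitRaceSketch {

    // Returns the last committed offset, or -1 if nothing was ever committed.
    static long run(long total, long interval) {
        long committed = -1;
        for (long offset = 0; offset < total; offset++) {
            if ((offset + 1) % interval == 0) {
                committed = offset;  // finalizeCheckpoint() -> consumer commit
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(run(10000, 5000)); // 9999: cutoff hits a checkpoint, everything committed
        System.out.println(run(10000, 3000)); // 8999: last 1000 records read but never committed
        System.out.println(run(2000, 5000));  // -1: read ended before any checkpoint, nothing committed
    }
}
```

The third case reproduces the "CURRENT-OFFSET 0 after reading" symptom: a short bounded read can end before any checkpoint is finalized, so no commit happens at all, while a longer or luckier run commits some or all of what it read.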