Hi Jozef,

I’m not aware of anyone working on this. In the meantime, I created a Jira for
this: https://issues.apache.org/jira/browse/BEAM-6466
Feel free to contribute if you wish.

> On 17 Jan 2019, at 09:10, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> 
> Hello,
> Was there any progress on this, or is there a JIRA I can follow? I could use
> bounded processing over KafkaIO too.
> 
> Thanks,
> Jozef
> 
> On Thu, Jan 10, 2019 at 4:57 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Don’t you think we could have a race condition there, since, according to
> the initial issue description, sometimes the offset was committed and
> sometimes not?
> 
> 
>> On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote:
>> 
>> Oh, the generic bounded source wrapper over an unbounded source does not 
>> seem to call finalize when it is done with a split. I think it should.
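>> 
>> To illustrate (a rough sketch, not the actual Beam wrapper code; `source`,
>> `options`, `withinLimits()` and `process()` are placeholders for the
>> wrapper's own bookkeeping):
>> 
>> ---
>> // Sketch only: `withinLimits()` stands in for the hypothetical
>> // maxNumRecords/maxReadTime bookkeeping the wrapper already does.
>> val reader = source.createReader(options, null)
>> var more = reader.start()
>> while (more && withinLimits()) {
>>   process(reader.getCurrent)  // process() is a placeholder
>>   more = reader.advance()
>> }
>> // The missing step: finalizing the last checkpoint is what lets
>> // KafkaIO commit the consumed offsets back to Kafka.
>> reader.getCheckpointMark.finalizeCheckpoint()
>> reader.close()
>> ---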
>> 
>> Could you file a bug for the wrapper? 
>> Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().
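>> 
>> A minimal sketch of what such a check might look like (hypothetical, and
>> written in Scala only to match the snippet quoted below; the real check
>> would be Java inside KafkaIO.Read.expand(), and the field names here are
>> assumptions):
>> 
>> ---
>> // Hypothetical guard: fail fast when commitOffsetsInFinalize() is
>> // combined with the bounded wrapper, whose finalize is never called.
>> val boundedWrapperUsed = maxNumRecords < Long.MaxValue || maxReadTime != null
>> require(
>>   !(commitOffsetsInFinalize && boundedWrapperUsed),
>>   "commitOffsetsInFinalize() has no effect with withMaxReadTime()/" +
>>     "withMaxNumRecords(): the bounded wrapper never finalizes checkpoints"
>> )
>> ---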
>> 
>> 
>> 
>> On Wed, Jan 9, 2019 at 10:37 AM André Missaglia
>> <andre.missag...@arquivei.com.br> wrote:
>> Hi Juan,
>> 
>> After researching a bit, I found this issue, which has been open since 2017:
>> https://issues.apache.org/jira/browse/BEAM-2185
>> 
>> I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
>> write my own code that fetches messages from Kafka, even if it means giving
>> up on some processing guarantees from Beam...
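>> 
>> (For reference, a minimal sketch of that fallback, assuming a plain
>> KafkaConsumer with manual commits; the topic, the stopping rule, and
>> process() are placeholders, not real project code:)
>> 
>> ---
>> import java.time.Duration
>> import java.util.Properties
>> import scala.collection.JavaConverters._
>> import org.apache.kafka.clients.consumer.KafkaConsumer
>> 
>> // Bounded read with explicit offset commits after each poll.
>> val props = new Properties()
>> props.setProperty("bootstrap.servers", "...")
>> props.setProperty("group.id", "mygroup")
>> props.setProperty("enable.auto.commit", "false")
>> props.setProperty("key.deserializer",
>>   "org.apache.kafka.common.serialization.StringDeserializer")
>> props.setProperty("value.deserializer",
>>   "org.apache.kafka.common.serialization.StringDeserializer")
>> 
>> val consumer = new KafkaConsumer[String, String](props)
>> consumer.subscribe(List("mytopic").asJava)
>> var total = 0L
>> var idle = false
>> while (total < 1000000L && !idle) {
>>   val records = consumer.poll(Duration.ofSeconds(5))
>>   idle = records.isEmpty              // stop once the topic is drained
>>   records.asScala.foreach(r => process(r.value))  // process() is yours
>>   total += records.count()
>>   if (!idle) consumer.commitSync()    // offsets committed explicitly here
>> }
>> consumer.close()
>> ---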
>> 
>> 
>> On Wed, Jan 9, 2019 at 2:24 PM, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>> Just so you can have a look at where this happens:
>> 
>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
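>> 
>> (Roughly what that line does, paraphrased rather than copied from the Beam
>> source: on checkpoint finalization the reader synchronously commits the
>> next-to-read offset for each checkpointed partition:)
>> 
>> ---
>> import scala.collection.JavaConverters._
>> import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
>> import org.apache.kafka.common.TopicPartition
>> 
>> // Paraphrase, not the actual Beam code: commit the next offset for
>> // every checkpointed partition back to the consumer group.
>> def commitCheckpoint(consumer: Consumer[_, _],
>>                      nextOffsets: Map[TopicPartition, Long]): Unit =
>>   consumer.commitSync(
>>     nextOffsets.map { case (tp, next) => tp -> new OffsetAndMetadata(next) }.asJava
>>   )
>> ---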
>> 
>> Cheers
>> 
>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>> I also experience the same. As per the documentation, *withMaxReadTime* and
>> *withMaxNumRecords* are mainly used for demo purposes, so I guess it is
>> beyond the scope of the current KafkaIO to behave as bounded with offset
>> management, or something is just missing in the current implementation
>> (watermarking).
>> 
>> 
>> 
>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia
>> <andre.missag...@arquivei.com.br> wrote:
>> Hello everyone,
>> 
>> I need to do some batch processing that uses messages in a Kafka topic. So I 
>> tried the "withMaxReadTime" KafkaIO setting:
>> 
>> ---
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "...")
>> properties.setProperty("group.id <http://group.id/>", "mygroup")
>> properties.setProperty("sasl.jaas.config", "...")
>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>> properties.setProperty("enable.auto.commit", "false")
>> 
>> sc.customInput("Read From Kafka",
>>   KafkaIO
>>     .read[String, String]()
>>     .withTopic("mytopic")
>>     .withKeyDeserializer(classOf[StringDeserializer])
>>     .withValueDeserializer(classOf[StringDeserializer])
>>     .updateConsumerProperties(properties)
>>     .withMaxReadTime(Duration.standardSeconds(20))
>>     .withMaxNumRecords(1000000)
>>     .commitOffsetsInFinalize()
>>     .withoutMetadata()
>> )
>> .count.debug() // prints something between 10000 and 20000
>> ---
>> I can see that it was able to read the messages and process them. But in the
>> end, no offset was committed:
>> 
>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>> mytopic  0          0               3094751         3094751  -            -     -
>> 
>> But it is strange behavior: sometimes it commits the offset, sometimes it
>> doesn't. I'm not sure if it is a bug or if I'm using the wrong configs.
>> 
>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>> 
>> Best Regards,
>> 
>> -- 
>> André Badawi Missaglia
>> Data Engineer
>> (16) 3509-5515 | www.arquivei.com.br
>> 
>> -- 
>> 
>> JC 
>> 
>> 
>> 
>> -- 
>> 
>> JC 
>> 
>> 
>> 
>> -- 
>> André Badawi Missaglia
>> Data Engineer
>> (16) 3509-5515 | www.arquivei.com.br
