Just so you can have a look at where this happens:

https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584

Cheers

On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <[email protected]>
wrote:

> I am seeing the same thing. According to the documentation,
> **withMaxReadTime** and **withMaxNumRecords** are mainly intended for demo
> purposes, so I guess either behaving as a bounded source with offset
> management is beyond the scope of the current KafkaIO, or something is
> missing in the current implementation (watermarking).
>
>
>
> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
> [email protected]> wrote:
>
>> Hello everyone,
>>
>> I need to do some batch processing that uses messages in a Kafka topic.
>> So I tried the "withMaxReadTime" KafkaIO setting:
>>
>> ---
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "...")
>> properties.setProperty("group.id", "mygroup")
>> properties.setProperty("sasl.jaas.config", "...")
>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>> properties.setProperty("enable.auto.commit", "false")
>>
>> sc.customInput("Read From Kafka",
>>   KafkaIO
>>     .read[String, String]()
>>     .withTopic("mytopic")
>>     .withKeyDeserializer(classOf[StringDeserializer])
>>     .withValueDeserializer(classOf[StringDeserializer])
>>     .updateConsumerProperties(properties)
>>     .withMaxReadTime(Duration.standardSeconds(20))
>>     .withMaxNumRecords(1000000)
>>     .commitOffsetsInFinalize()
>>     .withoutMetadata()
>> )
>> .count.debug() // prints something between 10000 and 20000
>> ---
>> I can see that it was able to read the messages and process them. But in
>> the end, no offset was committed:
>>
>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>> mytopic  0          0               3094751         3094751  -            -     -
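
To make the numbers in that output easier to compare across runs, here is a minimal sketch of the arithmetic behind the LAG column. `GroupOffsets` and `OffsetCheck` are hypothetical names invented for this illustration; they are not part of KafkaIO, Scio, or kafka-clients, and the values are simply copied from the output above.

```scala
// Hypothetical helper for illustration only; not part of KafkaIO, Scio, or kafka-clients.
// One row of the consumer-group listing shown above.
final case class GroupOffsets(topic: String, partition: Int, currentOffset: Long, logEndOffset: Long) {
  // LAG column: records in the log that are not yet covered by a committed offset.
  def lag: Long = logEndOffset - currentOffset
}

object OffsetCheck {
  // True when the committed offset moved forward between two snapshots
  // of the same partition, e.g. taken before and after a pipeline run.
  def committedSomething(before: GroupOffsets, after: GroupOffsets): Boolean =
    after.currentOffset > before.currentOffset

  def main(args: Array[String]): Unit = {
    // Values copied from the listing above.
    val row = GroupOffsets("mytopic", 0, currentOffset = 0L, logEndOffset = 3094751L)
    println(s"${row.topic}-${row.partition} lag=${row.lag}")
  }
}
```

Taking one snapshot before the run and one after, then comparing them with `committedSomething`, would show whether a given run committed anything, which may help narrow down when the commit is skipped.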
>>
>> The behavior is inconsistent, though: sometimes it commits the offset,
>> sometimes not. I'm not sure whether it is a bug or I'm using the wrong
>> configs.
>>
>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>
>> Best Regards,
>>
>> --
>> *André Badawi Missaglia*
>> Data Engineer
>> (16) 3509-5515 *|* www.arquivei.com.br
>>
>
>
> --
>
> JC
>
>

-- 

JC