Hello everyone,
I need to batch-process the messages in a Kafka topic, so I tried KafkaIO's
"withMaxReadTime" setting:
---
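import com.spotify.scio._
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringDeserializer
import org.joda.time.Duration
import scala.collection.JavaConverters._

// updateConsumerProperties expects a java.util.Map[String, Object], which
// java.util.Properties is not, so build a Scala Map and convert it.
val consumerConfig: java.util.Map[String, AnyRef] = Map[String, AnyRef](
  "bootstrap.servers" -> "...",
  "group.id" -> "mygroup", // commitOffsetsInFinalize() needs a group.id
  "sasl.jaas.config" -> "...",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.mechanism" -> "SCRAM-SHA-256",
  "enable.auto.commit" -> "false" // KafkaIO commits, not the consumer itself
).asJava

// `sc` is the pipeline's ScioContext
sc.customInput("Read From Kafka",
    KafkaIO
      .read[String, String]()
      .withTopic("mytopic")
      .withKeyDeserializer(classOf[StringDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .updateConsumerProperties(consumerConfig)
      // Either limit makes the read bounded; it stops at whichever is hit first
      .withMaxReadTime(Duration.standardSeconds(20))
      .withMaxNumRecords(1000000)
      .commitOffsetsInFinalize()
      .withoutMetadata()
  )
  .count
  .debug() // prints something between 10000 and 20000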
---
I can see that the job read and processed the messages, but in the end no
offset was committed:
---
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
mytopic  0          0               3094751         3094751  -            -     -
---
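To spell out why that output means nothing was committed: the consumer-group tool reports LAG as LOG-END-OFFSET minus CURRENT-OFFSET, so with the group's position still at 0 the lag equals the whole log. A tiny sketch of that arithmetic (the object and method names are just for illustration):

```scala
object ConsumerLag {
  // LAG = LOG-END-OFFSET - CURRENT-OFFSET, per topic partition
  def lag(currentOffset: Long, logEndOffset: Long): Long =
    logEndOffset - currentOffset
}
```

With the numbers above, ConsumerLag.lag(0L, 3094751L) is 3094751. If the records read by the job had been committed, CURRENT-OFFSET would have moved forward and the lag would be smaller by that amount.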
The behavior is inconsistent, though: sometimes the offset is committed and
sometimes it is not. I'm not sure whether this is a bug or whether I'm using
the wrong configuration. Has anyone used a bounded KafkaIO read before? Is
there anything I can do?
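One workaround I'm considering, if the commit can't be made reliable, is to compute the offsets myself. This is an unverified sketch under a few assumptions: it assumes I drop .withoutMetadata() so each KafkaRecord's getPartition and getOffset are available, and that the actual commit would happen outside the pipeline (e.g. with a plain KafkaConsumer's commitSync). The helper name is hypothetical and only covers the arithmetic: Kafka's committed offset is the position of the next record to read, so for each partition it is the highest processed offset plus one.

```scala
object CommitOffsets {
  // Hypothetical helper: given (partition, offset) pairs of processed records,
  // return the offset to commit for each partition (highest offset + 1).
  def nextOffsets(processed: Seq[(Int, Long)]): Map[Int, Long] =
    processed
      .groupBy { case (partition, _) => partition }
      .map { case (partition, records) =>
        partition -> (records.map(_._2).max + 1L)
      }
}
```

For example, records at offsets 0, 1 and 2 of partition 0 and offset 41 of partition 1 give Map(0 -> 3, 1 -> 42).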
Best Regards,
--
*André Badawi Missaglia*
Data Engineer
(16) 3509-5515 *|* www.arquivei.com.br