Hi Dinh,

The check only de-duplicates records in case the consumer processes the same offset multiple times; it ensures the consumed offset is always increasing.

If this has been fixed in Kafka, which the comment assumes, the condition will never be true.
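
For illustration, the logic boils down to something like the sketch below (names
such as OffsetGuard and expectedOffset are made up here, not the actual fields in
KafkaUnboundedReader):

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper: tracks the next offset we expect for a partition and
// rejects anything the consumer re-delivers below that mark.
class OffsetGuard {
  private long expectedOffset;

  OffsetGuard(long startOffset) {
    this.expectedOffset = startOffset;
  }

  /** Returns true if the record is new, false if it was already consumed. */
  boolean accept(ConsumerRecord<byte[], byte[]> record) {
    long offset = record.offset();
    if (offset < expectedOffset) {
      // Offset went backwards: the consumer handed back a record we already
      // processed, so skip it instead of emitting it twice.
      return false;
    }
    expectedOffset = offset + 1; // offsets only ever move forward
    return true;
  }
}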

Which Kafka version are you using?

-Max

On 29.07.20 09:16, wang Wu wrote:
Hi,
I am curious about this comment:

if (offset < expected) { // -- (a)
  // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
  // should we check if the offset is way off from consumedOffset (say > 1M)?
  LOG.warn(
      "{}: ignoring already consumed offset {} for {}",
      this,
      offset,
      pState.topicPartition);
  continue;
}


https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167

Does this mean that Beam KafkaIO may skip processing some Kafka messages if the consumer lag exceeds 1M messages?
Why might Kafka compression result in this bug?
Is there any way to prevent message loss and ensure at-least-once delivery?

Context: We enable at-least-once delivery semantics in our Beam pipeline with this code:

input
    .getPipeline()
    .apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
            .withTopics(getTopics())
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                    ConsumerConfig.GROUP_ID_CONFIG, groupId))
            .withReadCommitted()
            .commitOffsetsInFinalize())
However, we notice that if we send more than 1 million Kafka messages and the batch processing cannot keep up, Beam seems to process fewer messages than we sent.
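
One way we could sanity-check this is to count what the pipeline actually reads
with a Beam metric and compare it against the number of messages produced (a
sketch; the namespace, counter name and CountRecordsFn below are illustrative,
not from our code):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative DoFn: counts every record read from Kafka so the total can be
// compared against the number of messages produced to the topic.
class CountRecordsFn extends DoFn<KafkaRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>> {
  private final Counter consumed = Metrics.counter("kafka-io-check", "records-consumed");

  @ProcessElement
  public void processElement(ProcessContext c) {
    consumed.inc();
    c.output(c.element());
  }
}

// Applied right after the read shown above:
//   .apply("CountRecords", ParDo.of(new CountRecordsFn()))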

Regards
Dinh
