Arseniy Tashoyan created FLINK-24851:
----------------------------------------

Summary: KafkaSourceBuilder: auto.offset.reset is ignored
Key: FLINK-24851
URL: https://issues.apache.org/jira/browse/FLINK-24851
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Arseniy Tashoyan


Creating a KafkaSource like this:
{code:scala}
import java.util.Properties
import org.apache.flink.connector.kafka.source.KafkaSource

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
props.put("auto.offset.reset", "latest")

// Topic subscription and deserializer omitted for brevity.
val kafkaSource = KafkaSource.builder[String]()
  .setProperties(props)
  .build()
{code}

The value actually used for _"auto.offset.reset"_ is *"earliest"* instead of the configured *"latest"*.

This occurs because _"auto.offset.reset"_ gets overridden by _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_. The default value for startingOffsetsInitializer is _"earliest"_.

This behavior is misleading and makes the Kafka connector inconvenient to configure. We cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead, we must extract this particular setting from the other settings and pass it to _KafkaSourceBuilder.setStartingOffsets()_:
{code:scala}
import java.util.Locale
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}

val kafkaSource = KafkaSource.builder[String]()
  .setProperties(props)
  .setStartingOffsets(
    OffsetsInitializer.committedOffsets(
      // Convert the configured string ("latest") to the enum constant (LATEST).
      OffsetResetStrategy.valueOf(
        props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
          .asInstanceOf[String]
          .toUpperCase(Locale.ROOT)
      )
    )
  )
  .build()
{code}

The expected behavior is to use the value of _"auto.offset.reset"_ provided via _KafkaSourceBuilder.setProperties()_, unless it is overridden via _KafkaSourceBuilder.setStartingOffsets()_.
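
The precedence requested above could be sketched roughly as follows. This is only an illustration of the expected resolution order, not Flink code: _OffsetResetResolution_, _effectiveAutoOffsetReset_, _explicitStrategy_ and _builderDefault_ are hypothetical names, and the explicit strategy is modelled as a plain optional string standing in for whatever _setStartingOffsets()_ would supply.

```scala
import java.util.{Locale, Properties}

object OffsetResetResolution {
  // Hypothetical helper, not part of Flink: picks the effective value of
  // "auto.offset.reset" in the order this report asks for:
  //   1. a strategy set explicitly (stand-in for setStartingOffsets()),
  //   2. the value supplied through the properties,
  //   3. the builder default ("earliest", as described in the report).
  def effectiveAutoOffsetReset(
      props: Properties,
      explicitStrategy: Option[String],
      builderDefault: String = "earliest"
  ): String =
    explicitStrategy
      .orElse(Option(props.getProperty("auto.offset.reset")))
      .getOrElse(builderDefault)
      .toLowerCase(Locale.ROOT)
}
```

With the properties from the first snippet and no explicit strategy, this sketch resolves to the configured value rather than the builder default, which is the behavior the report expects.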