Hello,

I have a job running on Flink 1.15.0 that consumes from Kafka using the new KafkaSource API. It sets a group ID explicitly and specifies OffsetsInitializer.earliest() as the starting offset. Today I restarted the job without restoring from either a savepoint or a checkpoint, and the consumer started reading from the earliest message available on the broker (from 24 hours ago), i.e. it completely ignored the offsets that had been committed to Kafka. If I use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) instead, the problem seems to go away.
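A minimal sketch of the two configurations being compared, written against the Flink 1.15 KafkaSource builder. The broker address, topic name, group ID, class name, and the use of SimpleStringSchema are placeholders assumed for illustration, not values from the actual job.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaSourceOffsetsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker address
                .setTopics("my-topic")                 // placeholder topic
                .setGroupId("my-group")                // placeholder group ID
                // Variant that shows the problem: when there is no checkpoint or
                // savepoint to restore from, start from the earliest offset on the broker.
                // .setStartingOffsets(OffsetsInitializer.earliest())
                // Variant that avoids it: start from the offsets committed for the group,
                // falling back to the earliest offset only if none are committed.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema()) // assumed payload type
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("kafka-source-offsets-sketch");
    }
}
```

Swapping which of the two setStartingOffsets lines is commented out is the only difference between the failing and the working setup.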
With the previous FlinkKafkaConsumer, using earliest didn't cause any such issue. Was this behavior changed on purpose?

Regards,
Alexis.