Hi Sharon, 

Could you check the log after starting the job from the savepoint? If INFO 
logging is enabled, you will see an entry “Consumer subtask {} will start 
reading {} partitions with offsets in restored state: {}” [1], which shows the 
starting offset of each partition. This might help reveal the problem.
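
If the log is large, a small scan like the one below may help pull out those 
entries. It is only a sketch: the class and method names are made up for 
illustration, it assumes each entry sits on a single line, and the only thing 
taken from Flink is the message text quoted above. It makes no assumption 
about how the offset map itself is printed and simply echoes it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
public class RestoredOffsetLogScan {

    // Mirrors the INFO message quoted above; each {} placeholder becomes a group.
    private static final Pattern RESTORED = Pattern.compile(
            "Consumer subtask (\\d+) will start reading (\\d+) partitions "
                    + "with offsets in restored state: (.*)$");

    /** Returns one summary string per log line that contains the message. */
    public static List<String> findRestoredOffsetEntries(List<String> logLines) {
        List<String> summaries = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = RESTORED.matcher(line);
            if (m.find()) {
                summaries.add("subtask=" + m.group(1)
                        + ", partitions=" + m.group(2)
                        + ", offsets=" + m.group(3));
            }
        }
        return summaries;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to a TaskManager log file (supply your own).
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        findRestoredOffsetEntries(lines).forEach(System.out::println);
    }
}
```

With 10 partitions, the partition counts across all subtasks should add up to 
10. Any partition that is missing from the printed offsets, or that shows an 
unexpected starting offset, would be the one to look at.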

BTW, FlinkKafkaConsumer has been deprecated since 1.14. Please consider 
switching to the new KafkaSource if you are developing new applications. 

[1] 
https://github.com/apache/flink/blob/a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L614-L618

Best regards, 

Qingsheng

> On Mar 18, 2022, at 13:28, Sharon Xie <sharon.xie...@gmail.com> wrote:
> 
> Hi, 
> 
> I'm seeing an odd behavior for Kafka source where some records are dropped 
> during recovery. 
> 
> My test set up is: Kafka source topic -> pass through flink job -> Kafka sink 
> topic
> There are 10 partitions in the source & sink topics.
> 
> Test Steps
> * Start the flink job, send 5 records (first batch) to the source topic, and 
> read the sink. I see all 5 records without issue.
> * Stop the job with a savepoint
> * Send another 10 records (second batch) to the source topic
> * Start the job with the savepoint
> 
> Expected: reading from the beginning of the sink topic, I should see all 15 
> records from the first and second batches.
> Actual: Some random records in the second batch are missing.
> 
> My guess is that the savepoint only contains offsets for partitions that 
> received records from the first batch. Other partitions didn't have any state 
> and by default read from the `latest` offset during recovery. So records from 
> the second batch that fell into the previously empty partitions are never 
> processed. 
> 
> However, based on the source code, I'd expect the partitions without records 
> from the first batch to be initialized with `earliest-offset`. But this is not 
> the behavior I saw. What am I missing?
> 
> I'm using Flink 1.14.3. May I know if there is anything I missed? If not, 
> what's the reason for such behavior? Otherwise, is this a bug?
> 
> 
> 
> Thanks,
> Sharon
