[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-23 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778783#comment-17778783
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-28303:
-

Merged.

apache/flink-connector-kafka:main - 54e3b70deb349538edba1ec2b051ed9d9f79b563
apache/flink-connector-kafka:v3.0 - 538e9c10463dbdf0942c8858678e98bf3522d566

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - Produce records 0-9 into the topic
> - Take a savepoint and stop the job
> - Produce records 10-19 into the topic
> - Restore the job from the savepoint
> The job will usually be missing 2-4 of the records 10-19.
> My assumption is that if a partition never had data (which is likely with 10
> partitions and 10 records), the savepoint will only contain offsets for
> partitions with data.
> While the job was offline (and records 10-19 were written into the topic),
> all partitions received data. When the job comes back online, it will use
> the "latest" offset for the partitions missing from the savepoint, skipping
> the records written to them in the meantime.
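The mechanism the reporter suspects can be reproduced in a toy model. The sketch below is plain Java with no Flink or Kafka dependencies; `EmptyPartitionRestoreSim` and its method are hypothetical names, and the per-partition record counts are an assumed distribution of records 0-9 and 10-19, not data from the report. It treats the savepoint as a map that only holds positions for partitions that delivered at least one record, and compares a "latest" fallback for the missing partitions with an "earliest" one.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the reporter's hypothesis; this is not Flink or Kafka code.
 * Offsets are per-partition end offsets (number of records written so far).
 */
class EmptyPartitionRestoreSim {

    /**
     * Returns how many records written after the savepoint are read on restore.
     *
     * @param endAtSavepoint      per-partition end offsets when the savepoint was taken
     * @param endAtRestore        per-partition end offsets when the job is restored
     * @param untrackedFromLatest if true, partitions missing from the savepoint start at
     *                            their current end ("latest"); otherwise at offset 0 ("earliest")
     */
    static int recordsReadAfterRestore(int[] endAtSavepoint, int[] endAtRestore,
                                       boolean untrackedFromLatest) {
        // Assumed savepoint content: a position only for partitions that delivered a record.
        Map<Integer, Integer> savepoint = new HashMap<>();
        for (int p = 0; p < endAtSavepoint.length; p++) {
            if (endAtSavepoint[p] > 0) {
                savepoint.put(p, endAtSavepoint[p]);
            }
        }
        int read = 0;
        for (int p = 0; p < endAtRestore.length; p++) {
            int start;
            if (savepoint.containsKey(p)) {
                start = savepoint.get(p);   // resume from the stored position
            } else if (untrackedFromLatest) {
                start = endAtRestore[p];    // "latest": skips everything written while stopped
            } else {
                start = 0;                  // "earliest": reads the partition from the start
            }
            read += endAtRestore[p] - start;
        }
        return read;
    }

    public static void main(String[] args) {
        // Assumed spread of records 0-9 over 10 partitions; partitions 2, 5 and 7 stay empty.
        int[] atSavepoint = {2, 1, 0, 1, 2, 0, 1, 0, 2, 1};
        // Assumed spread of records 10-19: one more record per partition while stopped.
        int[] atRestore = {3, 2, 1, 2, 3, 1, 2, 1, 3, 2};
        System.out.println("latest:   " + recordsReadAfterRestore(atSavepoint, atRestore, true) + " of 10");
        System.out.println("earliest: " + recordsReadAfterRestore(atSavepoint, atRestore, false) + " of 10");
    }
}
```

With this assumed distribution, three partitions were empty at savepoint time, so the "latest" fallback reads only 7 of the 10 new records, in line with the 2-4 missing records reported; the "earliest" fallback reads all 10.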



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-04 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772065#comment-17772065
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-28303:
-

Making this a blocker as it's confirmed to be a real data loss issue with
the newer {{KafkaSource}}.



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-26 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769056#comment-17769056
 ] 

tanjialiang commented on FLINK-28303:
-

Maybe we should implement the LatestOffsetsInitializer so that it looks up the end
offset and passes it to the reader, instead of passing
KafkaPartitionSplit#LATEST_OFFSET(-1).
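To illustrate the difference described here, a minimal self-contained sketch follows. It does not use the real `LatestOffsetsInitializer` or `KafkaPartitionSplit` classes; `LatestOffsetResolution`, `startingOffsetAtDiscovery`, `seekPosition` and the local `LATEST_OFFSET` constant are hypothetical stand-ins (only the value -1 comes from the comment). What it shows is when "latest" gets resolved: a sentinel kept in state is resolved against the end offset at restore time, whereas an eagerly looked-up offset pins the position at discovery time.

```java
/**
 * Hypothetical sketch; not the flink-connector-kafka implementation.
 * Contrasts storing a "latest" sentinel with storing the concrete end offset.
 */
class LatestOffsetResolution {
    /** Stand-in for the sentinel value -1 mentioned in the comment. */
    static final long LATEST_OFFSET = -1L;

    /** Starting offset recorded for a split when its partition is discovered. */
    static long startingOffsetAtDiscovery(long endOffsetAtDiscovery, boolean eager) {
        // Eager: look up the end offset now and store it. Lazy: store only the sentinel.
        return eager ? endOffsetAtDiscovery : LATEST_OFFSET;
    }

    /** Position the reader seeks to when it (re)starts the split. */
    static long seekPosition(long storedStartingOffset, long endOffsetNow) {
        // A sentinel is resolved only here, against whatever the end offset is at that moment.
        return storedStartingOffset == LATEST_OFFSET ? endOffsetNow : storedStartingOffset;
    }

    public static void main(String[] args) {
        long endAtDiscovery = 0L; // partition was empty when discovered
        long endAtRestore = 3L;   // three records arrived while the job was stopped

        long lazy = seekPosition(startingOffsetAtDiscovery(endAtDiscovery, false), endAtRestore);
        long eager = seekPosition(startingOffsetAtDiscovery(endAtDiscovery, true), endAtRestore);

        System.out.println("sentinel: skips " + (lazy - endAtDiscovery) + " records");
        System.out.println("eager:    skips " + (eager - endAtDiscovery) + " records");
    }
}
```

With the sentinel, the three records that arrived while the job was stopped are skipped; with the eagerly resolved offset, the reader starts at 0 and reads them.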



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769014#comment-17769014
 ] 

tanjialiang commented on FLINK-28303:
-

[~martijnvisser] I have checked the latest Kafka connector code; this
problem still exists.



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-25 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768680#comment-17768680
 ] 

Martijn Visser commented on FLINK-28303:


[~tanjialiang] It would be good to double-check this in the latest Kafka
connector code; perhaps it has already been addressed by the work done for
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768637#comment-17768637
 ] 

tanjialiang commented on FLINK-28303:
-

Maybe this is the reason?
[FLINK-33153|https://issues.apache.org/jira/browse/FLINK-33153]



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-07-04 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562176#comment-17562176
 ] 

Benchao Li commented on FLINK-28303:


[~martijnvisser] Yes, it's quite an old version. I just want to remind whoever
picks up this issue to take this into consideration if possible.



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-07-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562056#comment-17562056
 ] 

Martijn Visser commented on FLINK-28303:


[~libenchao] There have been quite a few changes between 1.11 and the current
versions (including a migration from FlinkKafkaConsumer to KafkaSource and a
Kafka version bump).



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-07-01 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561652#comment-17561652
 ] 

Benchao Li commented on FLINK-28303:


Besides this case, I would like to mention that the problem also arises when the
Kafka cluster is unhealthy, e.g. when some partitions are under-replicated.
We suffer from this a lot in our internal use cases; there are two cases:
1. If checkpoints/savepoints (cp/sp) are not enabled and the Flink job starts with
'group-offsets', it will omit the under-replicated partition. When the partition
recovers, it is treated as a newly added partition and consumed from the earliest
offset. This leads to duplicate consumption.
2. If cp/sp are enabled, we use the offsets stored in state; however, the
partition is not consumable at that moment. When the partition recovers, it is
again added as a new partition and consumed twice. This also leads to duplicate
consumption.

PS: We are using 1.11 right now; I haven't checked whether this still exists in
the master code.
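Case 1 can be modeled in a few lines. This is a hypothetical sketch, not Flink 1.11 code: `RediscoveredPartitionSim` and its methods are made-up names, and the committed offsets are illustrative values. It assumes, as described above, that a partition unavailable at startup is skipped and later treated as newly added, so it starts from the earliest offset even though a committed group offset exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of case 1; not Flink code.
 * A partition that is unavailable at startup is skipped, later rediscovered as "new",
 * and read from the earliest offset despite having a committed group offset.
 */
class RediscoveredPartitionSim {
    /** Next offset each assigned partition will be read from. */
    final Map<Integer, Long> startOffsets = new HashMap<>();

    /** Startup with 'group-offsets': only partitions reachable now get their committed offset. */
    void start(Map<Integer, Long> committedOffsets, Set<Integer> reachable) {
        for (Map.Entry<Integer, Long> e : committedOffsets.entrySet()) {
            if (reachable.contains(e.getKey())) {
                startOffsets.put(e.getKey(), e.getValue());
            }
        }
    }

    /** Periodic discovery: an unknown partition is treated as newly added and starts at earliest. */
    void discover(int partition, long earliestOffset) {
        startOffsets.putIfAbsent(partition, earliestOffset);
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 100L);
        RediscoveredPartitionSim sim = new RediscoveredPartitionSim();
        sim.start(committed, Set.of(0)); // partition 1 is under-replicated and skipped
        sim.discover(1, 0L);             // partition 1 recovers and is picked up as "new"
        long replayed = committed.get(1) - sim.startOffsets.get(1);
        System.out.println("records consumed twice from partition 1: " + replayed);
    }
}
```

Here the 100 records below partition 1's committed offset are read a second time.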



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-30 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561037#comment-17561037
 ] 

Robert Metzger commented on FLINK-28303:


[~tzulitai] Thanks a lot for your quick response. We haven't tried setting the
"auto.offset.reset" property yet (and according to [~tanjialiang]'s comment, it
does not seem to be possible in 1.14). We actually experienced this issue in an
end-to-end test (which is why the data volume is low enough to actually trigger
the issue). I will look into the E2E test again soon.
I had the same idea for fixing the issue as the one you are proposing. Let's see
what [~renqs]'s take on the issue is.



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-30 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560965#comment-17560965
 ] 

tanjialiang commented on FLINK-28303:
-

I found this bug in flink-1.14.2 too. It appears when some partitions are
empty.

[~tzulitai] And I'm sorry to tell you that "auto.offset.reset" does not take
effect in 1.14, because of this bug: https://issues.apache.org/jira/browse/FLINK-24697



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-30 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560918#comment-17560918
 ] 

Martijn Visser commented on FLINK-28303:


[~renqs] WDYT?



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-29 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560595#comment-17560595
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-28303:
-

[~rmetzger] have you tried setting the "auto.offset.reset" property to
"earliest"? Its default value is "latest".
That config determines the position to start from in a partition when 1) no
records have been consumed from the partition yet, or 2) the offset we attempt
to read from is out of range.
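To make the two conditions concrete, here is a simplified sketch of that decision in plain Java. It is not the Kafka client's code: `OffsetResetSketch`, `startPosition` and the `Reset` enum are hypothetical names, and the "none" strategy (which raises an error instead of resetting) is left out.

```java
/**
 * Simplified, hypothetical sketch of how a start position is chosen; not the client's code.
 * The "none" strategy (throw instead of resetting) is left out.
 */
class OffsetResetSketch {
    enum Reset { EARLIEST, LATEST }

    /**
     * @param storedOffset offset to resume from, or null if nothing has been consumed yet
     * @param beginning    first offset still available in the partition
     * @param end          offset the next produced record will get
     */
    static long startPosition(Long storedOffset, long beginning, long end, Reset reset) {
        boolean inRange = storedOffset != null && storedOffset >= beginning && storedOffset <= end;
        if (inRange) {
            return storedOffset;                          // normal resume
        }
        // Case 1) nothing consumed yet, or case 2) offset out of range: apply the reset strategy.
        return reset == Reset.EARLIEST ? beginning : end;
    }

    public static void main(String[] args) {
        System.out.println(startPosition(null, 0, 5, Reset.LATEST));   // nothing consumed: 5
        System.out.println(startPosition(null, 0, 5, Reset.EARLIEST)); // nothing consumed: 0
        System.out.println(startPosition(9L, 0, 5, Reset.EARLIEST));   // out of range: 0
    }
}
```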

If the missing records appear after changing that config to "earliest", that
would prove your theory.
In that case, we should fix the Kafka connector so that as soon as a partition
is discovered, its offset is already written into state, probably with a
special "earliest" value/marker of some sort.
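The proposed fix could look roughly like the following sketch. It is a hypothetical illustration, not the connector's implementation: the class, its methods, and the marker value -2 are assumptions, since the comment only asks for a special "earliest" value/marker of some sort. The key property is that a discovered partition appears in every snapshot even before any record is read from it, and that the marker resolves to the beginning of the partition on restore.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the proposed fix; not the actual connector code.
 * A partition gets an entry in state as soon as it is discovered, using a marker
 * value that means "start from earliest" until a real offset is known.
 */
class EarliestMarkerSketch {
    /** Hypothetical marker; the comment only asks for a special "earliest" value of some sort. */
    static final long EARLIEST_MARKER = -2L;

    /** partition -> next offset to read (or the marker). */
    private final Map<Integer, Long> state = new HashMap<>();

    void onPartitionDiscovered(int partition) {
        state.putIfAbsent(partition, EARLIEST_MARKER); // tracked before any record is read
    }

    void onRecordEmitted(int partition, long offset) {
        state.put(partition, offset + 1);              // resume after the last emitted record
    }

    Map<Integer, Long> snapshot() {
        return new HashMap<>(state);                   // what would go into a checkpoint/savepoint
    }

    /** On restore, the marker resolves to the beginning offset, never to the current end. */
    static long resolveOnRestore(long stored, long beginningOffset) {
        return stored == EARLIEST_MARKER ? beginningOffset : stored;
    }

    public static void main(String[] args) {
        EarliestMarkerSketch source = new EarliestMarkerSketch();
        source.onPartitionDiscovered(0);
        source.onPartitionDiscovered(1);   // stays empty until the savepoint
        source.onRecordEmitted(0, 0L);
        Map<Integer, Long> savepoint = source.snapshot();
        // Partition 1 is in the savepoint and resumes from its beginning instead of "latest".
        System.out.println(savepoint + " -> p1 starts at " + resolveOnRestore(savepoint.get(1), 0L));
    }
}
```

In this model, records written to a previously empty partition while the job was stopped are read after restore, because the partition's state entry points at its beginning rather than its end.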



[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-29 Thread Sharon Xie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560591#comment-17560591
 ] 

Sharon Xie commented on FLINK-28303:


I posted this with logs in this thread: 
https://lists.apache.org/thread/wbppox04rrsqsb1m0wy2nrgwzjsrtf24
