[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863874#comment-15863874 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

    https://github.com/apache/flink/pull/3031

    Ok

> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
>
> The Kafka consumer stores only Kafka offsets in state and doesn't store the last
> emitted watermarks, which may lead to a wrong state when a checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue,
> results will differ between normal processing and processing after a checkpoint restore:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
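The scenario in the issue description can be replayed with a small stand-alone simulation. This is only a sketch under simplifying assumptions, not Flink code: the class `WatermarkRestoreDemo` and the method `lateTimestamps` are hypothetical, and an element is treated as late as soon as its timestamp is not greater than the current watermark (real window operators decide lateness per window).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, self-contained simulation of the issue scenario; not Flink code. */
class WatermarkRestoreDemo {

    /** Out-of-orderness bound from the issue description: watermark = timestamp - 10. */
    static final long MAX_OUT_OF_ORDERNESS = 10L;

    /**
     * Replays the given timestamps starting from an initial watermark and
     * returns the timestamps that arrive at or behind the watermark (late elements).
     */
    static List<Long> lateTimestamps(long initialWatermark, long... timestamps) {
        long watermark = initialWatermark;
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late.add(ts);
            }
            // the watermark never moves backwards
            watermark = Math.max(watermark, ts - MAX_OUT_OF_ORDERNESS);
        }
        return late;
    }

    public static void main(String[] args) {
        // normal processing: A, B, C, D in one run
        System.out.println(lateTimestamps(Long.MIN_VALUE, 30L, 35L, 15L, 60L));
        // restore after B without a stored watermark: only C, D are replayed
        System.out.println(lateTimestamps(Long.MIN_VALUE, 15L, 60L));
        // restore after B with the stored watermark (35 - 10 = 25)
        System.out.println(lateTimestamps(25L, 15L, 60L));
    }
}
```

In this simplified model, C is late during normal processing and after a restore that carries the watermark, but not after a restore that resets the watermark to `Long.MIN_VALUE`, which is the divergence the issue reports.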
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863875#comment-15863875 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman closed the pull request at:

    https://github.com/apache/flink/pull/3031
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863791#comment-15863791 ]

Tzu-Li (Gordon) Tai commented on FLINK-4616:

We decided that user function code should be free of the responsibility of checkpointing watermarks and should simply leave that to the streaming internals (checkpointed by window operators, and perhaps also by source operators).

Therefore, this is not needed in the Kafka consumer.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863789#comment-15863789 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3031

    Thanks. Can you please close this PR then :-D ?
    I'll close the JIRA.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863609#comment-15863609 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

    https://github.com/apache/flink/pull/3031

    Hello, Tzu-Li Tai!

    I think you made a good decision, and I agree that the PR and JIRA ticket can be closed.

    Do not worry about the work already done; I gained good experience in the process.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863299#comment-15863299 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

    https://github.com/apache/flink/pull/3031

    I hope I have finished with the other issue and can come back to this one.

    First, I want to ask a question that may make all the others unnecessary.

    Tzu-Li Tai, did I understand correctly that if the discussion about letting the window operators checkpoint watermarks leads to the decision to implement this functionality in the window operators, the need to preserve watermark state in the Kafka consumer will disappear?
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863326#comment-15863326 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3031

    Hi @MayerRoman! Thank you for coming back to this issue.

    I had a quick chat offline with @aljoscha about whether or not it would be reasonable to add this.
    Both your approach in this PR and letting window operators checkpoint watermarks would solve the issue of late elements after restore. We thought that user function code should be free of the responsibility of checkpointing watermarks and should simply leave that to the streaming internals (checkpointed by window operators, and perhaps also by source operators).

    So, essentially, the Kafka consumer should not need to checkpoint watermarks, and we can close this PR and the JIRA ticket. Very sorry for the late discussion on this, and for having you work on it already.

    Let us know what you think and whether or not you agree :-)
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862836#comment-15862836 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3031

    I think this PR should also include a test for the added feature.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839419#comment-15839419 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3031

    A re-clarification about backwards compatibility for the state type change:

    Currently, one limitation for keeping applications compatible across savepoint restores is that you can't change the type of a state; otherwise the state restore will fail, so the application is not compatible. The only workaround is to add another field as the new state with the new type, and somehow try to "encode" / "decode" the watermark state into / from the original `Tuple2`. I don't think this is easily possible ...

    At the same time, there was a recent discussion about letting the window operators also checkpoint watermarks. So perhaps we might not need to let the Kafka sources checkpoint watermarks in the end, if the window operators already take care of restoring the previous event time.

    What I'm curious about right now is whether, in the future, redistribution of Kafka partition states across source subtasks will work well with the checkpointed watermarks in the downstream window operators. I don't think there should be a problem. @aljoscha can you perhaps help clarify this?
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839321#comment-15839321 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97935971

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
    @@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
             return partitionWatermark;
         }

    +    void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
    --- End diff --

    The other methods seem to be `public` (although they can actually be package-private). Should we stay consistent with that here?
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839317#comment-15839317 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97935696

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -175,34 +176,115 @@ protected AbstractFetcher(
         //

         /**
    -     * Takes a snapshot of the partition offsets.
    +     * Takes a snapshot of the partition offsets and watermarks.
          *
          * Important: This method mus be called under the checkpoint lock.
          *
    -     * @return A map from partition to current offset.
    +     * @return A map from partition to current offset and watermark.
          */
    -    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
    +    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
             // this method assumes that the checkpoint lock is held
             assert Thread.holdsLock(checkpointLock);

    -        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
    -        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
    -            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
    +        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
    +
    +        switch (timestampWatermarkMode) {
    +
    +            case NO_TIMESTAMPS_WATERMARKS: {
    +
    +                for (KafkaTopicPartitionState partition : allPartitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
    +                }
    +
    +                return state;
    +            }
    +
    +            case PERIODIC_WATERMARKS: {
    +                KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions =
    +                    (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions;
    +
    +                for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
    +                }
    +
    +                return state;
    +            }
    +
    +            case PUNCTUATED_WATERMARKS: {
    +                KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions =
    +                    (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions;
    +
    +                for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
    +                }
    +
    +                return state;
    +            }
    +
    +            default:
    +                // cannot happen, add this as a guard for the future
    +                throw new RuntimeException();
    --- End diff --

    Would be good to have a reason message here.
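The reviewer's suggestion to give the guard a reason message could look roughly like the sketch below. It is not the PR's code: the class `WatermarkModeGuard`, the method `describe`, and the integer values of the mode constants are assumptions made for illustration; only the constant names are taken from the diff.

```java
/** Hypothetical stand-in for the fetcher's mode switch; not the actual AbstractFetcher code. */
class WatermarkModeGuard {

    // Assumed integer constants: the names follow the diff, the values are illustrative.
    static final int NO_TIMESTAMPS_WATERMARKS = 0;
    static final int PERIODIC_WATERMARKS = 1;
    static final int PUNCTUATED_WATERMARKS = 2;

    /** Returns a label for the mode, or fails with a message naming the unexpected value. */
    static String describe(int timestampWatermarkMode) {
        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                return "offsets only";
            case PERIODIC_WATERMARKS:
                return "offsets and periodic watermarks";
            case PUNCTUATED_WATERMARKS:
                return "offsets and punctuated watermarks";
            default:
                // guard for the future: state which value was unexpected
                throw new IllegalStateException(
                    "Unknown timestamp/watermark mode: " + timestampWatermarkMode);
        }
    }
}
```

Naming the offending value in the message makes a failure in this branch diagnosable from the stack trace alone.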
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839320#comment-15839320 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97758477

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
                 LOG.debug("snapshotState() called on closed source");
             } else {

    -            offsetsStateForCheckpoint.clear();
    +            offsetsAndWatermarksStateForCheckpoint.clear();

                 final AbstractFetcher fetcher = this.kafkaFetcher;
                 if (fetcher == null) {
                     // the fetcher has not yet been initialized, which means we need to return the
    -                // originally restored offsets or the assigned partitions
    +                // originally restored offsets and watermarks or the assigned partitions

    -                if (restoreToOffset != null) {
    +                if (restoreToOffsetAndWatermark != null) {

    -                    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
    -                        offsetsStateForCheckpoint.add(
    -                                Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
    +                    for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
    +                        offsetsAndWatermarksStateForCheckpoint.add(
    +                                Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
    --- End diff --

    Having a specific checkpoint state object will also be helpful for code readability in situations like this one (it's quite tricky to understand quickly what the key / value refers to, as well as some of the `f0`, `f1` calls in other parts of the PR. I know the previous code used `f0` and `f1` also, but I think it's a good opportunity to improve that).
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839316#comment-15839316 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97935720

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -175,34 +176,115 @@ protected AbstractFetcher(
         //

         /**
    -     * Takes a snapshot of the partition offsets.
    +     * Takes a snapshot of the partition offsets and watermarks.
          *
          * Important: This method mus be called under the checkpoint lock.
          *
    -     * @return A map from partition to current offset.
    +     * @return A map from partition to current offset and watermark.
          */
    -    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
    +    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
             // this method assumes that the checkpoint lock is held
             assert Thread.holdsLock(checkpointLock);

    -        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
    -        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
    -            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
    +        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
    +
    +        switch (timestampWatermarkMode) {
    +
    +            case NO_TIMESTAMPS_WATERMARKS: {
    +
    +                for (KafkaTopicPartitionState partition : allPartitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
    +                }
    +
    +                return state;
    +            }
    +
    +            case PERIODIC_WATERMARKS: {
    +                KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions =
    +                    (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions;
    +
    +                for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
    +                }
    +
    +                return state;
    +            }
    +
    +            case PUNCTUATED_WATERMARKS: {
    +                KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions =
    +                    (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions;
    +
    +                for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) {
    +                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
    +                }
    +
    +                return state;
    +            }
    +
    +            default:
    +                // cannot happen, add this as a guard for the future
    +                throw new RuntimeException();
             }
    -        return state;
         }

         /**
    -     * Restores the partition offsets.
    +     * Restores the partition offsets and watermarks.
          *
    -     * @param snapshotState The offsets for the partitions
    +     * @param snapshotState The offsets and watermarks for the partitions
          */
    -    public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
    -        for (KafkaTopicPartitionState partition : allPartitions) {
    -            Long offset = snapshotState.get(partition.getKafkaTopicPartition());
    -            if (offset != null) {
    -                partition.setOffset(offset);
    +    public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
    +
    +        switch (timestampWatermarkMode) {
    +
    +            case NO_TIMESTAMPS_WATERMARKS: {
    +                for (KafkaTopicPartitionState partition : allPartitions) {
    +                    Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
    +                    if (offset != null) {
    +
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839318#comment-15839318 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97757847

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -101,7 +101,7 @@
          * The assigner is kept in serialized form, to deserialize it into multiple copies */
         private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

    -    private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
    +    private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
    --- End diff --

    I think we should switch to have a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility for any future changes to the checkpointed state.
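A dedicated checkpoint state object, as suggested in the review comment, might be sketched as follows. Everything here is hypothetical: the class name `PartitionCheckpointState`, its fields, and the use of `Long.MIN_VALUE` as the "no watermark" marker (borrowed from the diff's `NO_TIMESTAMPS_WATERMARKS` branch) are illustrative assumptions, not part of the PR or of Flink's API.

```java
import java.io.Serializable;

/** Hypothetical per-partition checkpoint state with named fields instead of nested tuples. */
final class PartitionCheckpointState implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Assumed marker for "no watermark emitted yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long watermarkTimestamp;

    PartitionCheckpointState(String topic, int partition, long offset, long watermarkTimestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.watermarkTimestamp = watermarkTimestamp;
    }

    String getTopic() { return topic; }

    int getPartition() { return partition; }

    long getOffset() { return offset; }

    long getWatermarkTimestamp() { return watermarkTimestamp; }

    /** True if a watermark was recorded for this partition. */
    boolean hasWatermark() { return watermarkTimestamp != NO_WATERMARK; }
}
```

Call sites would then read `state.getOffset()` and `state.getWatermarkTimestamp()` rather than `f0` and `f1`, which addresses the readability concern raised in the review.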
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839319#comment-15839319 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3031#discussion_r97935636

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -175,34 +176,115 @@ protected AbstractFetcher(
         //

         /**
    -     * Takes a snapshot of the partition offsets.
    +     * Takes a snapshot of the partition offsets and watermarks.
          *
          * Important: This method mus be called under the checkpoint lock.
          *
    -     * @return A map from partition to current offset.
    +     * @return A map from partition to current offset and watermark.
          */
    -    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
    +    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
             // this method assumes that the checkpoint lock is held
             assert Thread.holdsLock(checkpointLock);

    -        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
    -        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
    -            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
    +        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
    +
    +        switch (timestampWatermarkMode) {
    +
    +            case NO_TIMESTAMPS_WATERMARKS: {
    +
    +                for (KafkaTopicPartitionState partition : allPartitions) {
    --- End diff --

    Excessive empty line above this line.
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834056#comment-15834056 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3031

    Thank you for the contribution @MayerRoman.
    Just want to let you know that I've noticed this PR, and I think the issue is definitely something we want to fix.
    I'll allocate some time this week to review the PR.

> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
> Fix For: 1.2.0
>
> The Kafka consumer stores only Kafka offsets in state and doesn't store the last
> emitted watermarks, which may lead to a wrong state when a checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue,
> results will differ between normal processing and processing after a checkpoint restore:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15769536#comment-15769536 ]

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

    https://github.com/apache/flink/pull/3031

    I think the changes I propose make it impossible to start from checkpoints created before my code changes,
    because the consumer now saves ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> (partition + offset + watermark),
    whereas before it saved ListState<Tuple2<KafkaTopicPartition, Long>> (partition + offset).

    (I mean checkpoints from versions later than 1.1. The recently added backward compatibility with 1.1 snapshots is taken into account in my commit; I think everything is OK with it.)

    Please advise me how to restore backward compatibility.

    I have some ideas for how to implement it:

    1) In the initializeState method, somehow check whether the object returned from stateStore.getSerializableListState(..)
    https://github.com/apache/flink/pull/3031/files?diff=unified#diff-06bf4a7f73d98ef91309154654563475R321
    is a ListState<Tuple2<KafkaTopicPartition, Long>> or a ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>>.

    2) Use a separate state object for saving watermarks.

    Or it may be necessary to implement this in a different way.

    I would be grateful for help.
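Option 2 from the comment above (keeping watermarks in a separate state next to the unchanged offsets state) could be sketched roughly as below. This is a plain-Java illustration under assumptions, not Flink code: the class `SeparateWatermarkStateSketch`, the method `merge`, the string partition keys, and the `Long.MIN_VALUE` default are all hypothetical. The idea is that a checkpoint written before the change simply has no watermark entries, so the offsets state keeps its original type and restores unchanged.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of restoring offsets and watermarks from two separate states. */
class SeparateWatermarkStateSketch {

    /** Assumed default when no watermark was checkpointed for a partition. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Combines the restored offsets with the (possibly missing) restored watermarks.
     * Each value is {offset, watermark}; partitions are keyed by an illustrative string.
     */
    static Map<String, long[]> merge(Map<String, Long> restoredOffsets,
                                     Map<String, Long> restoredWatermarks) {
        Map<String, long[]> merged = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            // an old checkpoint has no watermark state at all, or no entry for this partition
            Long watermark = restoredWatermarks == null
                ? null
                : restoredWatermarks.get(entry.getKey());
            merged.put(entry.getKey(),
                new long[] {entry.getValue(), watermark == null ? NO_WATERMARK : watermark});
        }
        return merged;
    }
}
```

With an old checkpoint, `merge(offsets, null)` yields the stored offsets paired with `NO_WATERMARK`, which matches the behavior before the change.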
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763551#comment-15763551 ]

ASF GitHub Bot commented on FLINK-4616:
---

GitHub user MayerRoman opened a pull request:

    https://github.com/apache/flink/pull/3031

    [FLINK-4616] Added functionality through which watermarks for each pa…

    …rtition are saved and loaded via checkpointing mechanism

    [FLINK-4616] Kafka consumer doesn't store last emmited watermarks per partition in state.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MayerRoman/flink FLINK_4616

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3031

commit bb9a54903dd445aa4d0750b1a0d6d1d592ab891f
Author: Roman Maier
Date: 2016-12-20T07:28:12Z

    [FLINK-4616] Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism
[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state
    [ https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510313#comment-15510313 ]

Aljoscha Krettek commented on FLINK-4616:
-

+1, I think this would be good to have.

> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Fix For: 1.2.0, 1.1.3
>
> The Kafka consumer stores only Kafka offsets in state and doesn't store the last
> emitted watermarks, which may lead to a wrong state when a checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue,
> results will differ between normal processing and processing after a checkpoint restore:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)