[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694736#comment-16694736 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-440670574

@mxm thanks for the review and merge!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Thomas Weise
> Priority: Major
> Labels: pull-request-available
>
> It would be nice to let the Kinesis consumer be on-par in functionality with
> the Kafka consumer, since they share very similar abstractions. Per-partition
> / shard watermarks is something we can add also to the Kinesis consumer.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694552#comment-16694552 ]

ASF GitHub Bot commented on FLINK-5697:
---

asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..3c5e3c7303f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
+ * Note that re-sharding of the Kinesis stream while an application (that relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late events.
+ * This depends on how shards are assigned to subtasks and applies regardless of whether watermarks
+ * are generated in the source or a downstream operator.
+ *
  * @param the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +125,8 @@
      */
     private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+    private AssignerWithPeriodicWatermarks periodicWatermarkAssigner;
+
     //
     // Runtime state
     //
@@ -213,13 +232,28 @@ public KinesisShardAssigner getShardAssigner() {
 
     /**
      * Provide a custom assigner to influence how shards are distributed over subtasks.
-     * @param shardAssigner
+     * @param shardAssigner shard assigner
      */
     public void setShardAssigner(KinesisShardAssigner shardAssigner) {
         this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
         ClosureCleaner.clean(shardAssigner, true);
     }
 
+    public AssignerWithPeriodicWatermarks getPeriodicWatermarkAssigner() {
+        return periodicWatermarkAssigner;
+    }
+
+    /**
+     * Set the assigner that will extract the timestamp from {@link T} and calculate the
+     * watermark.
+     * @param periodicWatermarkAssigner periodic watermark assigner
+     */
+    public void setPeriodicWatermarkAssigner(
+            AssignerWithPeriodicWatermarks periodicWatermarkAssigner) {
+        this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+        ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+    }
+
     //
     // Source life cycle
     //
@@ -414,7 +448,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693743#comment-16693743 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235154450

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);

Review comment:
So the watermark assigner is a serializable object, which is the only way it can be set on the client and deployed to the TM. So this is just creating the per thread instances using the same mechanism. Note that there is no provision for a factory method in the core interface.
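tweise's reply rests on the fact that the assigner must already be `Serializable` to be set on the client and shipped to the TaskManager, so one more serialize/deserialize round trip per shard yields independent instances. The sketch below illustrates that mechanism in plain Java under stated assumptions: `CloneSketch`, `MaxTimestampAssigner` and `deepCopy` are hypothetical names, and `deepCopy` only approximates Flink's `InstantiationUtil.clone` (the real utility may also handle class-loader details that this sketch leaves to the JDK defaults). The exception message is the one mxm proposes in his review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: one independent assigner per shard, created by serializing a
// prototype and deserializing it again.
class CloneSketch {

    // Hypothetical stateful assigner: remembers the highest timestamp seen.
    static final class MaxTimestampAssigner implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long maxOutOfOrdernessMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        MaxTimestampAssigner(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        }

        void onRecord(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        // Long.MIN_VALUE until the first record arrives (avoids underflow).
        long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrdernessMillis;
        }
    }

    // Deep copy via Java serialization; failures are wrapped with a descriptive message.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T prototype) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(prototype);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
        }
    }

    public static void main(String[] args) {
        MaxTimestampAssigner prototype = new MaxTimestampAssigner(10);
        MaxTimestampAssigner shard0 = deepCopy(prototype);
        MaxTimestampAssigner shard1 = deepCopy(prototype);

        shard0.onRecord(1_000);
        shard1.onRecord(500);

        // Each copy tracks only its own shard; the prototype is untouched.
        System.out.println(shard0.currentWatermark()); // 990
        System.out.println(shard1.currentWatermark()); // 490
        System.out.println(prototype.currentWatermark() == Long.MIN_VALUE); // true
    }
}
```

Because each copy carries its own `maxTimestamp`, a record on one shard cannot move another shard's watermark, which is what the per-shard minimum in `emitWatermark()` relies on.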
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693734#comment-16693734 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235149642

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -182,6 +189,28 @@
     private volatile boolean running = true;
 
+    private final AssignerWithPeriodicWatermarks periodicWatermarkAssigner;
+
+    /**
+     * The watermark related state for each shard consumer. Entries in this map will be created when shards
+     * are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+     * since those are transient and not part of checkpointed state.
+     */
+    private ConcurrentHashMap shardWatermarks = new ConcurrentHashMap<>();

Review comment:
The mapping can change after recovery. It is not necessary to checkpoint this though because the watermarks are derived information.
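The map keys come from `subscribedShardsState.size() - 1` in `registerNewSubscribedShardState`, so they reflect discovery order rather than shard identity. A minimal sketch of why a restored fetcher can end up with different keys, and why that is harmless when the entries hold only derived data that is rebuilt on discovery. `ShardIndexSketch` and its members are hypothetical, and `computeIfAbsent` stands in for the quoted `get`/`put` pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: watermark state keyed by a transient shard index (discovery order).
class ShardIndexSketch {

    // Hypothetical per-shard watermark state; derived data only, never checkpointed.
    static final class ShardWatermarkState {
        long lastUpdated;
        long lastRecordTimestamp = Long.MIN_VALUE;
    }

    private final List<String> subscribedShards = new ArrayList<>();
    private final ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks =
            new ConcurrentHashMap<>();

    // Index = position in the subscribed list; a fresh watermark state is
    // created the first time an index is seen.
    synchronized int registerShard(String shardId, long now) {
        subscribedShards.add(shardId);
        int shardStateIndex = subscribedShards.size() - 1;
        shardWatermarks.computeIfAbsent(shardStateIndex, i -> {
            ShardWatermarkState sws = new ShardWatermarkState();
            sws.lastUpdated = now;
            return sws;
        });
        return shardStateIndex;
    }

    int watermarkEntries() {
        return shardWatermarks.size();
    }

    public static void main(String[] args) {
        // Before a failure: shards discovered in one order.
        ShardIndexSketch before = new ShardIndexSketch();
        int a0 = before.registerShard("shardId-000000000000", 0L);
        int b0 = before.registerShard("shardId-000000000001", 0L);

        // After recovery: a new fetcher may discover the same shards in another order.
        ShardIndexSketch after = new ShardIndexSketch();
        int b1 = after.registerShard("shardId-000000000001", 100L);
        int a1 = after.registerShard("shardId-000000000000", 100L);

        System.out.println(a0 + " -> " + a1); // 0 -> 1
        System.out.println(b0 + " -> " + b1); // 1 -> 0
    }
}
```

The checkpointed per-shard sequence numbers are unaffected by this; the watermark entries simply start over and are refilled by the records read after recovery.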
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693731#comment-16693731 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235149070

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                sws.lastUpdated = getCurrentTimeMillis();
+                sws.lastRecordTimestamp = Long.MIN_VALUE;
+                shardWatermarks.put(shardStateIndex, sws);
+            }
+
+            return shardStateIndex;
+        }
+    }
+
+    /**
+     * Return the current system time. Allow tests to override this to simulate progress for watermark
+     * logic.
+     *
+     * @return
+     */
+    @VisibleForTesting
+    protected long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * Called periodically to emit a watermark. Checks all shards for the current event time
+     * watermark, and possibly emits the next watermark.
+     *
+     * Shards that have not received an update for a certain interval are considered inactive so as
+     * to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+     * marked as temporarily idle to not block downstream operators.
+     */
+    @VisibleForTesting
+    protected void emitWatermark() {
+        LOG.debug(
+            "###evaluating watermark for subtask {} time {}",
+            indexOfThisConsumerSubtask,
+            getCurrentTimeMillis());
+        long potentialWatermark = Long.MAX_VALUE;
+        long idleTime =
+            (shardIdleIntervalMillis > 0)
+                ? getCurrentTimeMillis() - shardIdleIntervalMillis
+                : Long.MAX_VALUE;
+
+        for (Map.Entry e : shardWatermarks.entrySet()) {
+            // consider only active shards, or those that would advance the watermark
+            Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+            if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+                potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+            }
+        }
+
+        // advance watermark if possible (watermarks can only be ascending)
+        if (potentialWatermark == Long.MAX_VALUE) {

Review comment:
The potential watermark depends on the logic in the prior loop. The idle condition should only be executed when there is no potential watermark.
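The selection rule in `emitWatermark()` can be exercised without Flink. The sketch below mirrors the quoted loop (a shard enters the minimum if it was updated within the idle interval, or if its watermark is ahead of the last emitted one) and the rule that watermarks only ascend. Assumptions not taken from the PR: each shard uses a bounded-out-of-orderness assigner, time is passed in explicitly, the code is single-threaded, and emission is modelled as a return value. Because the body of the `potentialWatermark == Long.MAX_VALUE` branch is cut off in the quoted hunk, the sketch only sets an idle flag there, following the javadoc's statement that the subtask is marked temporarily idle. `PerShardWatermarkSketch` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-shard watermark aggregation in the quoted emitWatermark().
class PerShardWatermarkSketch {

    static final class ShardState {
        long lastUpdated;                   // processing time of the last record
        long maxTimestamp = Long.MIN_VALUE; // highest event time seen on the shard
    }

    private final Map<Integer, ShardState> shards = new HashMap<>();
    private final long shardIdleIntervalMillis;
    private final long maxOutOfOrdernessMillis;
    private long lastWatermark = Long.MIN_VALUE;
    private boolean idle;

    PerShardWatermarkSketch(long shardIdleIntervalMillis, long maxOutOfOrdernessMillis) {
        this.shardIdleIntervalMillis = shardIdleIntervalMillis;
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void registerShard(int index, long now) {
        ShardState s = new ShardState();
        s.lastUpdated = now;
        shards.put(index, s);
    }

    void onRecord(int index, long eventTime, long now) {
        ShardState s = shards.get(index);
        s.maxTimestamp = Math.max(s.maxTimestamp, eventTime);
        s.lastUpdated = now;
    }

    // Bounded out-of-orderness; Long.MIN_VALUE before the first record.
    private long shardWatermark(ShardState s) {
        return s.maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : s.maxTimestamp - maxOutOfOrdernessMillis;
    }

    /** Returns the newly emitted watermark, or null if nothing was emitted. */
    Long emitWatermark(long now) {
        long potentialWatermark = Long.MAX_VALUE;
        long idleTime = shardIdleIntervalMillis > 0 ? now - shardIdleIntervalMillis : Long.MAX_VALUE;

        for (ShardState s : shards.values()) {
            long w = shardWatermark(s);
            // consider only active shards, or those that would advance the watermark
            if (s.lastUpdated >= idleTime || w > lastWatermark) {
                potentialWatermark = Math.min(potentialWatermark, w);
            }
        }

        if (potentialWatermark == Long.MAX_VALUE) {
            idle = true; // assumed: no qualifying shard, so mark the subtask idle
            return null;
        }
        if (potentialWatermark > lastWatermark) { // watermarks can only ascend
            lastWatermark = potentialWatermark;
            idle = false;
            return lastWatermark;
        }
        return null;
    }

    boolean isIdle() {
        return idle;
    }

    public static void main(String[] args) {
        PerShardWatermarkSketch fetcher = new PerShardWatermarkSketch(1_000, 10);
        fetcher.registerShard(0, 0);
        fetcher.registerShard(1, 0);
        fetcher.onRecord(0, 1_000, 100);
        fetcher.onRecord(1, 500, 100);
        System.out.println(fetcher.emitWatermark(200));   // 490: slowest shard wins
        fetcher.onRecord(0, 2_000, 300);
        System.out.println(fetcher.emitWatermark(400));   // null: shard 1 still active, holds 490
        System.out.println(fetcher.emitWatermark(1_200)); // 1990: shard 1 quiet for over 1000 ms
        System.out.println(fetcher.emitWatermark(5_000)); // null: no shard qualifies
        System.out.println(fetcher.isIdle());             // true
    }
}
```

The sketch keeps the quoted condition as-is: with the idle interval disabled (`<= 0`) the cutoff is `Long.MAX_VALUE`, so `lastUpdated >= idleTime` never holds and a shard only enters the minimum while its watermark is ahead of the last emitted one.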
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693518#comment-16693518 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234581523

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);

Review comment:
```suggestion
                    throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
```
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693521#comment-16693521 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235093375

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -182,6 +189,28 @@
     private volatile boolean running = true;
 
+    private final AssignerWithPeriodicWatermarks periodicWatermarkAssigner;
+
+    /**
+     * The watermark related state for each shard consumer. Entries in this map will be created when shards
+     * are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+     * since those are transient and not part of checkpointed state.
+     */
+    private ConcurrentHashMap shardWatermarks = new ConcurrentHashMap<>();

Review comment:
So it is not possible to checkpoint this and use it to restore the partition state upon recovery?
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693523#comment-16693523 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234660764

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);

Review comment:
That's a bit hacky, would be nicer to have a WatermarkAssigner which has a factory method.
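For comparison, a purely hypothetical sketch of the factory-based design mxm suggests. As tweise notes, Flink's core watermark assigner interface has no factory method, so `AssignerFactory` and `SimpleAssigner` below are invented for illustration and are not Flink API. The factory is a serializable lambda shipped once, and the fetcher calls `create()` for each newly discovered shard instead of cloning a prototype.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical alternative to cloning: ship a serializable factory and
// create one assigner per shard. None of these types exist in Flink.
class FactorySketch {

    interface SimpleAssigner {
        void onRecord(long timestamp);
        long currentWatermark();
    }

    @FunctionalInterface
    interface AssignerFactory extends Serializable {
        SimpleAssigner create();
    }

    // Example factory: bounded out-of-orderness, built from a serializable lambda.
    static AssignerFactory boundedOutOfOrderness(long maxOutOfOrdernessMillis) {
        return () -> new SimpleAssigner() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public void onRecord(long timestamp) {
                maxTimestamp = Math.max(maxTimestamp, timestamp);
            }

            @Override
            public long currentWatermark() {
                return maxTimestamp == Long.MIN_VALUE
                        ? Long.MIN_VALUE
                        : maxTimestamp - maxOutOfOrdernessMillis;
            }
        };
    }

    private final AssignerFactory factory;
    private final Map<Integer, SimpleAssigner> perShard = new HashMap<>();

    FactorySketch(AssignerFactory factory) {
        this.factory = factory;
    }

    // Called when a shard is discovered: no serialization round trip needed.
    SimpleAssigner assignerFor(int shardIndex) {
        return perShard.computeIfAbsent(shardIndex, i -> factory.create());
    }

    public static void main(String[] args) {
        FactorySketch fetcher = new FactorySketch(boundedOutOfOrderness(10));
        fetcher.assignerFor(0).onRecord(1_000);
        fetcher.assignerFor(1).onRecord(500);
        System.out.println(fetcher.assignerFor(0).currentWatermark()); // 990
        System.out.println(fetcher.assignerFor(1).currentWatermark()); // 490
    }
}
```

Adopting this would mean introducing a new interface next to the existing assigner type, which is the trade-off behind tweise's reply.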
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693520#comment-16693520 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234579413

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
##
@@ -220,6 +239,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) {
         ClosureCleaner.clean(shardAssigner, true);
     }
 
+    public AssignerWithPeriodicWatermarks getPeriodicWatermarkAssigner() {
+        return periodicWatermarkAssigner;
+    }
+
+    /**
+     * Set the assigner that will extract the timestamp from {@link T} and calculate the
+     * watermark.
+     *
+     * @param periodicWatermarkAssigner

Review comment:
Empty parameter doc. Add info or remove.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693519#comment-16693519 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234703941

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                sws.lastUpdated = getCurrentTimeMillis();
+                sws.lastRecordTimestamp = Long.MIN_VALUE;
+                shardWatermarks.put(shardStateIndex, sws);
+            }
+
+            return shardStateIndex;
+        }
+    }
+
+    /**
+     * Return the current system time. Allow tests to override this to simulate progress for watermark
+     * logic.
+     *
+     * @return

Review comment:
Remove or document.
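The `getCurrentTimeMillis()` hook marked `@VisibleForTesting` lets a test subclass replace the wall clock, so idle-interval logic can be checked deterministically without sleeping. A minimal sketch of that pattern in plain Java; `ClockOverrideSketch`, `IdleTracker` and `ManualClockIdleTracker` are hypothetical names, and the idle rule used here (`now - lastUpdated > interval`) is the complement of the quoted `lastUpdated >= idleTime` check.

```java
// Sketch of the overridable-clock pattern: production code reads time through
// a protected method, and a test subclass replaces it with a manual clock.
class ClockOverrideSketch {

    // Hypothetical stand-in for the fetcher: tracks when a shard last delivered a record.
    static class IdleTracker {
        private final long idleIntervalMillis;
        private long lastUpdated;

        IdleTracker(long idleIntervalMillis) {
            this.idleIntervalMillis = idleIntervalMillis;
        }

        // Production clock; tests override this.
        protected long getCurrentTimeMillis() {
            return System.currentTimeMillis();
        }

        // Like setting sws.lastUpdated when a shard is registered.
        void register() {
            lastUpdated = getCurrentTimeMillis();
        }

        void onRecord() {
            lastUpdated = getCurrentTimeMillis();
        }

        boolean isIdle() {
            return getCurrentTimeMillis() - lastUpdated > idleIntervalMillis;
        }
    }

    // Test double: time only moves when the test advances it.
    static class ManualClockIdleTracker extends IdleTracker {
        private long now;

        ManualClockIdleTracker(long idleIntervalMillis) {
            super(idleIntervalMillis);
        }

        void advance(long millis) {
            now += millis;
        }

        @Override
        protected long getCurrentTimeMillis() {
            return now;
        }
    }

    public static void main(String[] args) {
        ManualClockIdleTracker tracker = new ManualClockIdleTracker(1_000);
        tracker.register();
        tracker.advance(999);
        System.out.println(tracker.isIdle()); // false
        tracker.advance(2);
        System.out.println(tracker.isIdle()); // true
        tracker.onRecord();
        System.out.println(tracker.isIdle()); // false
    }
}
```

The tracker never calls the overridable method from its constructor, so the subclass's clock field is always initialized before it is read.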
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693522#comment-16693522 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235093986

##
File path: pom.xml
##
@@ -124,7 +124,7 @@ under the License.
 1.8.2
 4.12
 2.21.0
- 2.0.0-beta.5
+ 2.0.0-RC.1

Review comment:
`2.0.0-RC.4` is the latest :)
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693524#comment-16693524 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234580470

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
##
@@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
+ * Note that re-sharding of the Kinesis stream while an application (that relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late events.
+ * This depends on how shards are assigned to subtasks and applies regardless of whether watermarks
+ * are generated in the source or a downstream operator.

Review comment:
Good to mention this here. The re-sharding logic can corrupt the watermarking logic, but that is not unique to this change.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693517#comment-16693517 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234659839

##
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##
@@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                 this.numberOfActiveShards.incrementAndGet();
             }
 
-            return subscribedShardsState.size() - 1;
+            int shardStateIndex = subscribedShardsState.size() - 1;
+
+            // track all discovered shards for watermark determination
+            ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+            if (sws == null) {
+                sws = new ShardWatermarkState();
+                try {
+                    sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                sws.lastUpdated = getCurrentTimeMillis();
+                sws.lastRecordTimestamp = Long.MIN_VALUE;
+                shardWatermarks.put(shardStateIndex, sws);
+            }
+
+            return shardStateIndex;
+        }
+    }
+
+    /**
+     * Return the current system time. Allow tests to override this to simulate progress for watermark
+     * logic.
+     *
+     * @return
+     */
+    @VisibleForTesting
+    protected long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * Called periodically to emit a watermark. Checks all shards for the current event time
+     * watermark, and possibly emits the next watermark.
+     *
+     * Shards that have not received an update for a certain interval are considered inactive so as
+     * to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+     * marked as temporarily idle to not block downstream operators.
+     */
+    @VisibleForTesting
+    protected void emitWatermark() {
+        LOG.debug(
+            "###evaluating watermark for subtask {} time {}",

Review comment:
Remove the `###`?
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693525#comment-16693525 ] ASF GitHub Bot commented on FLINK-5697: --- mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#discussion_r234705383 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ## @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed this.numberOfActiveShards.incrementAndGet(); } - return subscribedShardsState.size() - 1; + int shardStateIndex = subscribedShardsState.size() - 1; + + // track all discovered shards for watermark determination + ShardWatermarkState sws = shardWatermarks.get(shardStateIndex); + if (sws == null) { + sws = new ShardWatermarkState(); + try { + sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner); + } catch (Exception e) { + throw new RuntimeException(e); + } + sws.lastUpdated = getCurrentTimeMillis(); + sws.lastRecordTimestamp = Long.MIN_VALUE; + shardWatermarks.put(shardStateIndex, sws); + } + + return shardStateIndex; + } + } + + /** +* Return the current system time. Allow tests to override this to simulate progress for watermark +* logic. +* +* @return +*/ + @VisibleForTesting + protected long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + + /** +* Called periodically to emit a watermark. Checks all shards for the current event time +* watermark, and possibly emits the next watermark. +* +* Shards that have not received an update for a certain interval are considered inactive so as +* to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be +* marked as temporarily idle to not block downstream operators. 
+*/ + @VisibleForTesting + protected void emitWatermark() { + LOG.debug( + "###evaluating watermark for subtask {} time {}", + indexOfThisConsumerSubtask, + getCurrentTimeMillis()); + long potentialWatermark = Long.MAX_VALUE; + long idleTime = + (shardIdleIntervalMillis > 0) + ? getCurrentTimeMillis() - shardIdleIntervalMillis + : Long.MAX_VALUE; + + for (Map.Entry e : shardWatermarks.entrySet()) { + // consider only active shards, or those that would advance the watermark + Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark(); + if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) { + potentialWatermark = Math.min(potentialWatermark, w.getTimestamp()); + } + } + + // advance watermark if possible (watermarks can only be ascending) + if (potentialWatermark == Long.MAX_VALUE) { Review comment: Will this not be executed repeatedly? Maybe move this block inside the `potentialWatermark > lastWatermark`?
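The loop above takes the minimum watermark over shards that are either active or would advance the watermark. The reviewer asks whether the idle branch runs again on every timer tick. The following is a simplified, hypothetical model of that logic in plain Java with no Flink types. `SubtaskWatermarkTracker`, `update`, `evaluate` and `idleSignals` are invented for illustration. It signals idleness only on the transition into the idle state. It is a sketch of the idea, not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of per-subtask watermark aggregation.
final class SubtaskWatermarkTracker {
    private static final class ShardState {
        final long watermark;   // watermark reported by the shard's own assigner
        final long lastUpdated; // processing time of the shard's last record

        ShardState(long watermark, long lastUpdated) {
            this.watermark = watermark;
            this.lastUpdated = lastUpdated;
        }
    }

    private final Map<Integer, ShardState> shards = new HashMap<>();
    private final long idleIntervalMillis; // <= 0 disables idle-shard detection
    private long lastWatermark = Long.MIN_VALUE;
    private boolean idle = false;
    int idleSignals = 0; // how often the subtask was marked idle

    SubtaskWatermarkTracker(long idleIntervalMillis) {
        this.idleIntervalMillis = idleIntervalMillis;
    }

    /** Record a shard's latest watermark and the time its last record arrived. */
    void update(int shardIndex, long shardWatermark, long now) {
        shards.put(shardIndex, new ShardState(shardWatermark, now));
    }

    /** Periodic timer callback: returns the new watermark if it advanced, else null. */
    Long evaluate(long now) {
        long potential = Long.MAX_VALUE;
        for (ShardState s : shards.values()) {
            boolean active = idleIntervalMillis <= 0 || s.lastUpdated >= now - idleIntervalMillis;
            // An idle shard still counts if it would move the watermark forward.
            if (active || s.watermark > lastWatermark) {
                potential = Math.min(potential, s.watermark);
            }
        }
        if (potential == Long.MAX_VALUE) {
            // Nothing constrains the watermark: signal idleness once, on the transition only.
            if (!idle) {
                idle = true;
                idleSignals++;
            }
            return null;
        }
        idle = false; // at least one shard constrains the watermark again
        if (potential > lastWatermark) {
            lastWatermark = potential; // watermarks only ascend
            return potential;
        }
        return null;
    }
}
```

With a 1000 ms idle interval, a shard that last delivered a record at time 0 stops holding back the watermark at time 1500. Once every shard has gone quiet, repeated timer ticks raise the idle signal only once.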
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676177#comment-16676177 ] ASF GitHub Bot commented on FLINK-5697: --- tzulitai commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-436137288 @tweise will put this on my backlog and try to get to this as soon as I'm finished with the remaining 1.7 work.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676173#comment-16676173 ] ASF GitHub Bot commented on FLINK-5697: --- tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-436137039 @tzulitai @aljoscha PTAL
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675628#comment-16675628 ] ASF GitHub Bot commented on FLINK-5697: --- jgrier commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-435998278 Looks good @tweise.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672295#comment-16672295 ] ASF GitHub Bot commented on FLINK-5697: --- tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-435206371 @EronWright that's correct and I will make sure to document this. Even our planned follow-up work won't be able to address such a resharding scenario. I think we will only be able to address that with the new source design that is currently under discussion (which should permit centralized discovery and more sophisticated splitting/shard distribution).
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672285#comment-16672285 ] ASF GitHub Bot commented on FLINK-5697: --- EronWright commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-435202809 There is a caveat with this implementation that the docs should perhaps mention: it may produce spurious late events when processing a backlog of data. Here's an example of when that may occur. Imagine that subtask 1 is processing shard A and subtask 2 is processing shard B. Shard A has reached 6:00 in event time (as per the assigner), and so subtask 1 emits the corresponding watermark. At this point, the subtask has made the irrevocable assertion that subsequent events will be past 6:00. Meanwhile, shard B is at 5:30 and undergoes a split into C/D. If either child shard is subsequently assigned to subtask 1, its events will be considered late due to the assertion made earlier.
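To make the scenario concrete: once a subtask emits a watermark it cannot retract it. Any record that later reaches that subtask with a timestamp at or before the watermark is therefore late. The following minimal, hypothetical model expresses event times as minutes since midnight. `SubtaskClock` is an invented name, not a Flink class.

```java
// Hypothetical model of one source subtask's output watermark.
final class SubtaskClock {
    private long watermark = Long.MIN_VALUE;

    /** Watermarks are monotonic: a lower candidate never moves the watermark back. */
    void emit(long candidate) {
        watermark = Math.max(watermark, candidate);
    }

    long watermark() {
        return watermark;
    }

    /** Watermark(t) asserts that no more records with timestamp <= t will follow. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= watermark;
    }

    /** Event time expressed as minutes since midnight. */
    static long at(int hours, int minutes) {
        return hours * 60L + minutes;
    }
}
```

After subtask 1 emits 6:00 for shard A, a 5:31 record from child shard C is classified as late once C is assigned to subtask 1. This happens even though the record is on time relative to its own shard.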
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672248#comment-16672248 ] ASF GitHub Bot commented on FLINK-5697: --- tweise closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 407a5a95524..f0852584ade 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; @@ -78,6 +79,17 @@ * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the * hash function or use static overrides to limit skew. 
* + * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link + * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit + * interval configured via {@link + * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}. + * + * Watermarks can only advance when all shards of a subtask continuously deliver records. To + * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be + * configured via configuration property {@link + * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered + * idle and watermark calculation will wait for newer records to arrive from all shards. + * * @param the type of data emitted */ @PublicEvolving @@ -108,6 +120,8 @@ */ private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER; + private AssignerWithPeriodicWatermarks periodicWatermarkAssigner; + // // Runtime state // @@ -220,6 +234,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) { ClosureCleaner.clean(shardAssigner, true); } + public AssignerWithPeriodicWatermarks getPeriodicWatermarkAssigner() { + return periodicWatermarkAssigner; + } + + /** +* Set the assigner that will extract the timestamp from {@link T} and calculate the +* watermark. 
+* +* @param periodicWatermarkAssigner +*/ + public void setPeriodicWatermarkAssigner( + AssignerWithPeriodicWatermarks periodicWatermarkAssigner) { + this.periodicWatermarkAssigner = periodicWatermarkAssigner; + ClosureCleaner.clean(this.periodicWatermarkAssigner, true); + } + // // Source life cycle // @@ -414,7 +444,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { Properties configProps, KinesisDeserializationSchema deserializationSchema) { - return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner); + return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner); } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672249#comment-16672249 ] ASF GitHub Bot commented on FLINK-5697: --- tweise opened a new pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980 ## What is the purpose of the change Adds support for periodic per-shard watermarks to the Kinesis consumer. This functionality is off by default and can be enabled by setting an optional watermark assigner on the consumer. When enabled, the watermarking also optionally supports idle shard detection based on a configurable interval of inactivity. ## Brief change log * Add watermark assigner to consumer * Modify data fetcher to track watermark state per shard * Modify emitRecordAndUpdateState to extract timestamp and update watermark * Timer-driven periodic watermark emit ## Verifying this change This change added tests and can be verified as follows: Added a unit test; more test coverage is planned with subsequent work for shared watermark state and emit queue, as discussed on the ML. This change is ported from the Lyft internal codebase, where it is used in production. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented)
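The change log splits the work into two paths. The per-record path extracts the timestamp in `emitRecordAndUpdateState`. The timer-driven path emits the watermark. The following hypothetical sketch shows per-shard state for those two paths in plain Java. It assumes a bounded-out-of-orderness policy in place of a user-supplied `AssignerWithPeriodicWatermarks`. `ShardTimestampTracker` and its members are invented names.

```java
// Hypothetical per-shard timestamp tracker; bounded out-of-orderness is an assumed policy.
final class ShardTimestampTracker {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    long lastUpdated;                          // processing time of the last record, used for idleness
    long lastRecordTimestamp = Long.MIN_VALUE; // event time of the last record

    ShardTimestampTracker(long maxOutOfOrdernessMillis, long now) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.lastUpdated = now;
    }

    /** Per-record path: extract the timestamp and refresh the shard's state. */
    void onRecord(long eventTimestamp, long now) {
        lastRecordTimestamp = eventTimestamp;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastUpdated = now;
    }

    /** Periodic path: the shard's current watermark, or null before its first record. */
    Long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? null : maxTimestamp - maxOutOfOrdernessMillis;
    }
}
```

In this design the per-record path only updates a few fields, and the comparatively expensive cross-shard minimum is computed on the timer.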
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671151#comment-16671151 ] ASF GitHub Bot commented on FLINK-5697: --- tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980#issuecomment-434940116 R: @jgrier @aljoscha @tzulitai
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671149#comment-16671149 ] ASF GitHub Bot commented on FLINK-5697: --- tweise opened a new pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support URL: https://github.com/apache/flink/pull/6980 ## What is the purpose of the change Adds support for periodic per-shard watermarks to the Kinesis consumer. This functionality is off by default and can be enabled by setting an optional watermark assigner on the consumer. When enabled, the watermarking also optionally supports idle shard detection based on a configurable interval of inactivity. ## Brief change log * Add watermark assigner to consumer * Modify data fetcher to track watermark state per shard * Modify emitRecordAndUpdateState to extract timestamp and update watermark * Timer-driven periodic watermark emit ## Verifying this change This change added tests and can be verified as follows: Added a unit test; more test coverage is planned with subsequent work for shared watermark state and emit queue, as discussed on the ML. This change is ported from the Lyft internal codebase, where it is used in production. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented)
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669566#comment-16669566 ] Thomas Weise commented on FLINK-5697: - The latest related discussion: [https://lists.apache.org/thread.html/2341ab54f5c9a4f43f123605d6b17e374c19587bfd72c22748e08652@%3Cdev.flink.apache.org%3E] I'm planning to first bring over the source watermarking that we implemented a while ago at Lyft to the Flink Kinesis consumer. Following that, we are planning to contribute the emit queue pattern as described in Jamie's document, and the state sharing mechanism. Those pieces will make it possible to back-pressure the shard consumer threads based on the shared watermark information.
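The emit queue and state sharing work is only named here, not specified. One hypothetical reading of back-pressuring shard consumer threads on a shared watermark is a gate that blocks a shard consumer whose next record is too far ahead of the shared watermark. `WatermarkGate` and `lookaheadMillis` are invented names, and this is a sketch rather than the planned design. Timestamps are assumed to be epoch milliseconds.

```java
// Hypothetical gate that throttles shard consumer threads against a shared watermark.
final class WatermarkGate {
    private final long lookaheadMillis;
    private long globalWatermark = Long.MIN_VALUE;

    WatermarkGate(long lookaheadMillis) {
        this.lookaheadMillis = lookaheadMillis;
    }

    /** A record may be emitted if it is at most lookaheadMillis ahead of the shared watermark. */
    synchronized boolean canEmit(long recordTimestamp) {
        // Before any shared watermark exists there is nothing to throttle against.
        return globalWatermark == Long.MIN_VALUE
                || recordTimestamp <= globalWatermark + lookaheadMillis;
    }

    /** Blocks the calling shard consumer thread until its record falls inside the window. */
    synchronized void awaitEmit(long recordTimestamp) throws InterruptedException {
        while (!canEmit(recordTimestamp)) {
            wait();
        }
    }

    /** Invoked when the shared watermark advances; wakes up blocked consumers. */
    synchronized void updateGlobalWatermark(long watermark) {
        if (watermark > globalWatermark) {
            globalWatermark = watermark;
            notifyAll();
        }
    }
}
```

Blocking inside the consumer thread leaves the records in Kinesis rather than buffering them in memory. That is the main appeal of back-pressuring at the shard level.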
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390365#comment-16390365 ] Thomas Weise commented on FLINK-5697: - For idleness detection see: [https://github.com/apache/flink/pull/5634]
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815 ] Thomas Weise commented on FLINK-5697: - We have implemented periodic watermark support in a customization of FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance of AssignerWithPeriodicWatermarks and uses the configuration from ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also addresses the issue described in FLINK-5479 with an (optional) interval property that lets the user specify how long after its last record a shard is considered idle and should no longer hold back the watermark. If there is interest, I would contribute these changes to the current Flink Kinesis connector.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357640#comment-16357640 ] Thomas Weise commented on FLINK-5697: - Since shards are immutable with respect to their hash key range and records cannot move between shards, we should be able to use the parent shard IDs and the last read sequence number to determine when a newly discovered shard can be read from. Child shards are not necessarily assigned to the same subtask as their parent, in which case we would need a way to know the last read offset of the parent shard from a different subtask, for comparison with its EndingSequenceNumber. Is it possible to retrieve the last checkpointed offsets from other subtasks outside of restore to perform such a check? (It would still imply that consumption from a new child shard cannot start until the parent was checkpointed, and would therefore add latency, but would it provide the ordering guarantee we are looking for?)
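The following is a hypothetical sketch of the proposed ordering rule. It is limited to a single subtask, so it sidesteps the open question of reading another subtask's offsets. A shard becomes readable only once every parent has been consumed up to its EndingSequenceNumber. The sketch assumes that sequence numbers are decimal digit strings compared numerically. `ShardLineageGate` and its methods are invented names.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical gate that orders reads from child shards after their parents.
final class ShardLineageGate {
    private final Map<String, List<String>> parentsOf = new HashMap<>();
    private final Set<String> finished = new HashSet<>();

    /** Register a discovered shard with its parents (none for an original shard, two after a merge). */
    void registerShard(String shardId, List<String> parentShardIds) {
        parentsOf.put(shardId, new ArrayList<>(parentShardIds));
    }

    /** Record read progress on a closed shard; it is finished once its ending sequence number is reached. */
    void onProgress(String shardId, String lastReadSequenceNumber, String endingSequenceNumber) {
        if (reachedEnd(lastReadSequenceNumber, endingSequenceNumber)) {
            finished.add(shardId);
        }
    }

    /** A shard may be read only after all of its parents are fully consumed. */
    boolean canStartReading(String shardId) {
        return finished.containsAll(parentsOf.getOrDefault(shardId, Collections.emptyList()));
    }

    /** Assumes sequence numbers are decimal digit strings, compared as arbitrary-precision integers. */
    static boolean reachedEnd(String lastRead, String ending) {
        return new BigInteger(lastRead).compareTo(new BigInteger(ending)) >= 0;
    }
}
```

For a split of shard B into C and D, both children stay blocked until B is drained. A later merge of C and D waits for both parents.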
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970738#comment-15970738 ] Tzu-Li (Gordon) Tai commented on FLINK-5697: I'd say it still makes sense. If I understand how Kinesis resharding works correctly, merging or splitting shards always results in new empty shards with new key ranges, and the previous shards that were merged or split are simply closed (i.e., records that were collected by Kinesis before the resharding are consumed from the old closed shards). Therefore, we can still assume simple event time patterns per shard, as no merging or splitting of the Kinesis records themselves takes place.
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970149#comment-15970149 ] Eron Wright commented on FLINK-5697: - My understanding is that per-partition watermarks make sense due to ordering guarantees and the assumption of strictly ascending timestamps per partition in some Kafka app architectures. Given how shards are dynamic in Kinesis, does this functionality make sense here?