[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694736#comment-16694736 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-440670574
 
 
   @mxm thanks for the review and merge!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to let the Kinesis consumer be on par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks are something we can also add to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694552#comment-16694552 ]

ASF GitHub Bot commented on FLINK-5697:
---

asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..3c5e3c7303f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
+ * Note that re-sharding of the Kinesis stream while an application (that relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late events.
+ * This depends on how shards are assigned to subtasks and applies regardless of whether watermarks
+ * are generated in the source or a downstream operator.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +125,8 @@
 */
private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+   private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
// 
//  Runtime state
// 
@@ -213,13 +232,28 @@ public KinesisShardAssigner getShardAssigner() {
 
/**
 * Provide a custom assigner to influence how shards are distributed over subtasks.
-* @param shardAssigner
+* @param shardAssigner shard assigner
 */
public void setShardAssigner(KinesisShardAssigner shardAssigner) {
this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
ClosureCleaner.clean(shardAssigner, true);
}
 
+   public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+   return periodicWatermarkAssigner;
+   }
+
+   /**
+* Set the assigner that will extract the timestamp from {@link T} and calculate the
+* watermark.
+* @param periodicWatermarkAssigner periodic watermark assigner
+*/
+   public void setPeriodicWatermarkAssigner(
+   AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+   this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+   ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+   }
+
// 
//  Source life cycle
// 
@@ -414,7 +448,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
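For reference, the Javadoc above expects users to supply a periodic timestamp assigner via `setPeriodicWatermarkAssigner(...)`. The sketch below shows one common shape for such an assigner, bounded out-of-orderness, written in plain Java so it compiles without Flink. `PeriodicTimestampAssigner`, `SensorReading` and `BoundedOutOfOrdernessAssigner` are hypothetical stand-ins: a real job would implement Flink's `AssignerWithPeriodicWatermarks<T>`, whose `getCurrentWatermark()` returns a `Watermark` (possibly null) rather than a `long`, and Flink's own `BoundedOutOfOrdernessTimestampExtractor` follows the same idea.

```java
import java.io.Serializable;

/**
 * Hypothetical stand-in for Flink's AssignerWithPeriodicWatermarks<T>, reduced to plain Java
 * so this sketch compiles without Flink. The real interface returns a Watermark object
 * (which may be null) from getCurrentWatermark() instead of a long.
 */
interface PeriodicTimestampAssigner<T> extends Serializable {
    long extractTimestamp(T element, long previousElementTimestamp);

    long getCurrentWatermark();
}

/** Illustrative record type carrying an event-time timestamp in epoch milliseconds. */
final class SensorReading implements Serializable {
    private static final long serialVersionUID = 1L;
    final String sensorId;
    final long timestampMillis;

    SensorReading(String sensorId, long timestampMillis) {
        this.sensorId = sensorId;
        this.timestampMillis = timestampMillis;
    }
}

/**
 * Bounded out-of-orderness: the watermark trails the largest timestamp seen so far by a fixed
 * bound, so records arriving up to that much out of order are not treated as late.
 */
final class BoundedOutOfOrdernessAssigner implements PeriodicTimestampAssigner<SensorReading> {
    private static final long serialVersionUID = 1L;
    private final long maxOutOfOrdernessMillis;
    // Start offset by the bound so that subtracting the bound cannot underflow.
    private long maxSeenTimestamp;

    BoundedOutOfOrdernessAssigner(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        // Out-of-order records never move the high-water mark backwards.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.timestampMillis);
        return element.timestampMillis;
    }

    @Override
    public long getCurrentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
```

Per the Javadoc, the connector only polls such an assigner when the auto watermark interval is configured; because the fetcher keeps a separate copy per shard index, the `maxSeenTimestamp` state above would be tracked per shard.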

[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693743#comment-16693743 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235154450
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
 
 Review comment:
   So the watermark assigner is a serializable object, which is the only way it 
can be set on the client and deployed to the TM. So this is just creating the 
per thread instances using the same mechanism. Note that there is no provision 
for a factory method in the core interface.
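The cloning approach described in this comment can be illustrated with a Java serialization round trip: the configured assigner acts as a prototype and every newly discovered shard receives its own deep copy, so per-shard state is never shared. This is a minimal sketch assuming `InstantiationUtil.clone` behaves like such a round trip; `SerializationCopier`, `MaxTimestampTracker` and `PerShardTrackers` are hypothetical names, and the exception message follows the wording suggested elsewhere in this review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Deep copy through a Java serialization round trip (the idea behind cloning a prototype). */
final class SerializationCopier {
    private SerializationCopier() {}

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T prototype) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(prototype);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}

/** Stateful stand-in for a watermark assigner: remembers the largest timestamp it has seen. */
final class MaxTimestampTracker implements Serializable {
    private static final long serialVersionUID = 1L;
    long maxSeen = Long.MIN_VALUE;

    void observe(long timestampMillis) {
        maxSeen = Math.max(maxSeen, timestampMillis);
    }
}

/** Lazily gives every shard index its own copy of the configured prototype. */
final class PerShardTrackers {
    private final MaxTimestampTracker prototype;
    private final Map<Integer, MaxTimestampTracker> byShard = new ConcurrentHashMap<>();

    PerShardTrackers(MaxTimestampTracker prototype) {
        this.prototype = prototype;
    }

    MaxTimestampTracker forShard(int shardIndex) {
        return byShard.computeIfAbsent(shardIndex, index -> {
            try {
                return SerializationCopier.deepCopy(prototype);
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Failed to instantiate new watermark assigner", e);
            }
        });
    }
}
```

The prototype itself is never mutated by shard processing, which is what makes it safe to ship once from the client and copy on the task side.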


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693734#comment-16693734 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235149642
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -182,6 +189,28 @@
 
private volatile boolean running = true;
 
+   private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+   /**
+* The watermark related state for each shard consumer. Entries in this map will be created when shards
+* are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+* since those are transient and not part of checkpointed state.
+*/
+   private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap<>();
 
 Review comment:
   The mapping can change after recovery. It is not necessary to checkpoint 
this though because the watermarks are derived information.


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693731#comment-16693731 ]

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235149070
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   sws.lastUpdated = getCurrentTimeMillis();
+   sws.lastRecordTimestamp = Long.MIN_VALUE;
+   shardWatermarks.put(shardStateIndex, sws);
+   }
+
+   return shardStateIndex;
+   }
+   }
+
+   /**
+* Return the current system time. Allow tests to override this to simulate progress for watermark
+* logic.
+*
+* @return
+*/
+   @VisibleForTesting
+   protected long getCurrentTimeMillis() {
+   return System.currentTimeMillis();
+   }
+
+   /**
+* Called periodically to emit a watermark. Checks all shards for the current event time
+* watermark, and possibly emits the next watermark.
+*
+* Shards that have not received an update for a certain interval are considered inactive so as
+* to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+* marked as temporarily idle to not block downstream operators.
+*/
+   @VisibleForTesting
+   protected void emitWatermark() {
+   LOG.debug(
+   "###evaluating watermark for subtask {} time {}",
+   indexOfThisConsumerSubtask,
+   getCurrentTimeMillis());
+   long potentialWatermark = Long.MAX_VALUE;
+   long idleTime =
+   (shardIdleIntervalMillis > 0)
+   ? getCurrentTimeMillis() - shardIdleIntervalMillis
+   : Long.MAX_VALUE;
+
+   for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+   // consider only active shards, or those that would advance the watermark
+   Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+   if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+   potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+   }
+   }
+
+   // advance watermark if possible (watermarks can only be ascending)
+   if (potentialWatermark == Long.MAX_VALUE) {
 
 Review comment:
   The potential watermark depends on the logic in the prior loop. The idle 
condition should only be executed when there is no potential watermark.
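The aggregation this comment refers to can be sketched without Flink: the subtask watermark is the minimum over shards that are still active, a shard silent for longer than the idle interval is skipped unless its watermark is already ahead of the last emitted one, and the idle branch runs only when no potential watermark was found. `ShardWatermarkAggregator` and its methods are hypothetical, time is passed in explicitly for determinism, and the `markedIdle` flag stands in for marking the source temporarily idle. When idle detection is disabled, the sketch uses `Long.MIN_VALUE` as the cutoff so every shard counts as active, following the Javadoc's description of the default; that cutoff is a choice made for the sketch, not a transcription of the quoted code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free sketch of per-shard watermark aggregation for one source subtask. */
final class ShardWatermarkAggregator {
    private static final class ShardState {
        long watermark = Long.MIN_VALUE; // latest watermark reported for this shard
        long lastUpdated;                // wall-clock millis of the last record from this shard
    }

    private final Map<Integer, ShardState> shards = new HashMap<>();
    private final long shardIdleIntervalMillis; // <= 0 disables idle detection
    private final List<Long> emitted = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;
    private boolean markedIdle;

    ShardWatermarkAggregator(long shardIdleIntervalMillis) {
        this.shardIdleIntervalMillis = shardIdleIntervalMillis;
    }

    /** A newly discovered shard holds the watermark back until it reports or goes idle. */
    void registerShard(int shardIndex, long nowMillis) {
        ShardState state = new ShardState();
        state.lastUpdated = nowMillis;
        shards.putIfAbsent(shardIndex, state);
    }

    /** Records that a shard delivered a record and its assigner now reports shardWatermark. */
    void onRecord(int shardIndex, long shardWatermark, long nowMillis) {
        ShardState state = shards.computeIfAbsent(shardIndex, k -> new ShardState());
        state.watermark = Math.max(state.watermark, shardWatermark);
        state.lastUpdated = nowMillis;
    }

    /** Invoked periodically, e.g. once per auto-watermark interval. */
    void emitWatermark(long nowMillis) {
        // Shards updated at or after this cutoff count as active; with idle detection off, all do.
        long idleCutoff = shardIdleIntervalMillis > 0 ? nowMillis - shardIdleIntervalMillis : Long.MIN_VALUE;
        long potentialWatermark = Long.MAX_VALUE;
        for (ShardState state : shards.values()) {
            boolean active = state.lastUpdated >= idleCutoff;
            // An idle shard is still considered if it would not hold the watermark back.
            if (active || state.watermark > lastWatermark) {
                potentialWatermark = Math.min(potentialWatermark, state.watermark);
            }
        }
        if (potentialWatermark == Long.MAX_VALUE) {
            // No shard constrains the watermark: signal downstream not to wait for this subtask.
            markedIdle = true;
        } else if (potentialWatermark > lastWatermark) {
            // Watermarks must be ascending, so emit only when the minimum actually advanced.
            lastWatermark = potentialWatermark;
            markedIdle = false;
            emitted.add(potentialWatermark);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    boolean isMarkedIdle() {
        return markedIdle;
    }
}
```

For example, with a 1000 ms idle interval, a shard that stops delivering records holds the watermark back only until the interval elapses; with idle detection disabled, a shard that never reports keeps the subtask watermark at `Long.MIN_VALUE`.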


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693518#comment-16693518 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234581523
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
 
 Review comment:
   ```suggestion
throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
   ```


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693521#comment-16693521 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235093375
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -182,6 +189,28 @@
 
private volatile boolean running = true;
 
+   private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+   /**
+* The watermark related state for each shard consumer. Entries in this map will be created when shards
+* are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+* since those are transient and not part of checkpointed state.
+*/
+   private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap<>();
 
 Review comment:
   So it is not possible to checkpoint this and use it to restore the partition 
state upon recovery?


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693523#comment-16693523 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234660764
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
 
 Review comment:
   That's a bit hacky, would be nicer to have a WatermarkAssigner which has a 
factory method.
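For comparison, the factory-based alternative suggested here could look roughly like the sketch below. `SerializableAssignerFactory` is a hypothetical interface (as the reply elsewhere in this thread notes, the core interface has no such provision), and `ShardTimestampState` stands in for a per-shard assigner. Only the factory has to be serializable; each shard obtains a fresh instance by calling `create()` instead of deep-copying a prototype.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical factory contract; not part of the Flink interfaces discussed in this thread. */
@FunctionalInterface
interface SerializableAssignerFactory<A> extends Serializable {
    A create();
}

/** Per-shard state that each shard must own exclusively (stand-in for an assigner). */
final class ShardTimestampState {
    long maxSeen = Long.MIN_VALUE;
}

/** Creates one fresh instance per shard by calling the factory instead of cloning. */
final class FactoryBackedShardStates {
    private final SerializableAssignerFactory<ShardTimestampState> factory;
    private final Map<Integer, ShardTimestampState> byShard = new ConcurrentHashMap<>();

    FactoryBackedShardStates(SerializableAssignerFactory<ShardTimestampState> factory) {
        this.factory = factory;
    }

    ShardTimestampState forShard(int shardIndex) {
        // No serialization round trip: the factory builds a new instance on first use.
        return byShard.computeIfAbsent(shardIndex, index -> factory.create());
    }
}
```

A caller would pass a serializable constructor reference such as `ShardTimestampState::new`; the trade-off against cloning is an extra user-facing interface in exchange for not requiring the per-shard object itself to be serializable.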


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693520#comment-16693520 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234579413
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ##
 @@ -220,6 +239,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) {
ClosureCleaner.clean(shardAssigner, true);
}
 
+   public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+   return periodicWatermarkAssigner;
+   }
+
+   /**
+* Set the assigner that will extract the timestamp from {@link T} and calculate the
+* watermark.
+*
+* @param periodicWatermarkAssigner
 
 Review comment:
   Empty parameter doc. Add info or remove.


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693519#comment-16693519 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234703941
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   sws.lastUpdated = getCurrentTimeMillis();
+   sws.lastRecordTimestamp = Long.MIN_VALUE;
+   shardWatermarks.put(shardStateIndex, sws);
+   }
+
+   return shardStateIndex;
+   }
+   }
+
+   /**
+* Return the current system time. Allow tests to override this to simulate progress for watermark
+* logic.
+*
+* @return
 
 Review comment:
   Remove or document.
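One way to address this comment while keeping the test hook is to document the return value and override the clock method in a test subclass. The sketch below is hypothetical (`IdleShardDetector` and `ManualClockIdleShardDetector` are illustrative names, not connector classes) and shows the pattern the quoted Javadoc describes: every wall-clock read goes through a protected `getCurrentTimeMillis()`, so a test can advance time deterministically.

```java
/**
 * Hypothetical idle check whose only dependency on wall-clock time is getCurrentTimeMillis(),
 * so tests can substitute a controllable clock by overriding that one method.
 */
class IdleShardDetector {
    private final long idleIntervalMillis;
    private long lastRecordMillis;

    IdleShardDetector(long idleIntervalMillis) {
        this.idleIntervalMillis = idleIntervalMillis;
    }

    /** Call when the shard is discovered and again for every record it delivers. */
    void touch() {
        lastRecordMillis = getCurrentTimeMillis();
    }

    /** True once no record has arrived for longer than the idle interval. */
    boolean isIdle() {
        return getCurrentTimeMillis() - lastRecordMillis > idleIntervalMillis;
    }

    /**
     * Returns the current time; tests override this to simulate progress.
     *
     * @return current time in epoch milliseconds
     */
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }
}

/** Test double whose clock only moves when advance() is called. */
class ManualClockIdleShardDetector extends IdleShardDetector {
    private long nowMillis;

    ManualClockIdleShardDetector(long idleIntervalMillis, long startMillis) {
        super(idleIntervalMillis);
        this.nowMillis = startMillis;
    }

    void advance(long millis) {
        nowMillis += millis;
    }

    @Override
    protected long getCurrentTimeMillis() {
        return nowMillis;
    }
}
```

The base constructor deliberately does not call the overridable clock method, since the subclass field would not yet be initialized at that point. Injecting a `java.time.Clock` would be an alternative design with the same effect.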


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693522#comment-16693522 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235093986
 
 

 ##
 File path: pom.xml
 ##
 @@ -124,7 +124,7 @@ under the License.
1.8.2
4.12
2.21.0
-   2.0.0-beta.5
+   2.0.0-RC.1
 
 Review comment:
   `2.0.0-RC.4` is the latest :)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693524#comment-16693524 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234580470
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ##
 @@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
+ * Note that re-sharding of the Kinesis stream while an application (that relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late events.
+ * This depends on how shards are assigned to subtasks and applies regardless of whether watermarks
+ * are generated in the source or a downstream operator.
 
 Review comment:
   Good to mention this here. The re-sharding logic can corrupt the 
watermarking logic, but that is not unique to this change. 


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693517#comment-16693517 ]

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234659839
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark 
determination
+   ShardWatermarkState sws = 
shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = 
InstantiationUtil.clone(periodicWatermarkAssigner);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   sws.lastUpdated = getCurrentTimeMillis();
+   sws.lastRecordTimestamp = Long.MIN_VALUE;
+   shardWatermarks.put(shardStateIndex, sws);
+   }
+
+   return shardStateIndex;
+   }
+   }
+
+   /**
+* Return the current system time. Allow tests to override this to 
simulate progress for watermark
+* logic.
+*
+* @return
+*/
+   @VisibleForTesting
+   protected long getCurrentTimeMillis() {
+   return System.currentTimeMillis();
+   }
+
+   /**
+* Called periodically to emit a watermark. Checks all shards for the current event time
+* watermark, and possibly emits the next watermark.
+*
+* Shards that have not received an update for a certain interval are considered inactive so as
+* to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+* marked as temporarily idle to not block downstream operators.
+*/
+   @VisibleForTesting
+   protected void emitWatermark() {
+   LOG.debug(
+   "###evaluating watermark for subtask {} time {}",
 
 Review comment:
   Remove the `###`?
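The hunk above gives each newly registered shard its own copy of the configured assigner via `InstantiationUtil.clone`, so timestamps seen on one shard cannot advance another shard's watermark. A minimal stdlib-only sketch of that idea, assuming a hypothetical `BoundedLagAssigner`; a factory stands in for Flink's serialization-based cloning, and none of these names come from the PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch: one independent watermark assigner per shard (hypothetical names throughout). */
public class PerShardAssignerSketch {

    /** Hypothetical assigner: watermark = highest timestamp seen minus a fixed lag. */
    static final class BoundedLagAssigner {
        private final long maxLagMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedLagAssigner(long maxLagMillis) {
            this.maxLagMillis = maxLagMillis;
        }

        void onRecord(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        /** Null until the first record arrives, similar to an assigner returning no watermark. */
        Long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE ? null : maxTimestamp - maxLagMillis;
        }
    }

    private final Supplier<BoundedLagAssigner> assignerFactory;
    private final Map<Integer, BoundedLagAssigner> assignersByShardIndex = new HashMap<>();

    PerShardAssignerSketch(Supplier<BoundedLagAssigner> assignerFactory) {
        this.assignerFactory = assignerFactory;
    }

    /** Creates a fresh assigner the first time a shard index is registered. */
    void registerShard(int shardIndex) {
        assignersByShardIndex.computeIfAbsent(shardIndex, i -> assignerFactory.get());
    }

    void onRecord(int shardIndex, long timestamp) {
        assignersByShardIndex.get(shardIndex).onRecord(timestamp);
    }

    Long watermarkOf(int shardIndex) {
        return assignersByShardIndex.get(shardIndex).currentWatermark();
    }

    public static void main(String[] args) {
        PerShardAssignerSketch sketch = new PerShardAssignerSketch(() -> new BoundedLagAssigner(1_000L));
        sketch.registerShard(0);
        sketch.registerShard(1);
        sketch.onRecord(0, 10_000L);
        sketch.onRecord(1, 4_000L);
        // Each shard keeps its own progress: prints 9000 and 3000.
        System.out.println(sketch.watermarkOf(0));
        System.out.println(sketch.watermarkOf(1));
    }
}
```

Sharing a single assigner instance across shards would let the fastest shard's timestamps define everyone's watermark, which is what the per-shard copy avoids.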




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693525#comment-16693525
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

mxm commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add 
periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r234705383
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ##
 @@ -609,7 +667,115 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
 
-   return subscribedShardsState.size() - 1;
+   int shardStateIndex = subscribedShardsState.size() - 1;
+
+   // track all discovered shards for watermark determination
+   ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+   if (sws == null) {
+   sws = new ShardWatermarkState();
+   try {
+   sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   sws.lastUpdated = getCurrentTimeMillis();
+   sws.lastRecordTimestamp = Long.MIN_VALUE;
+   shardWatermarks.put(shardStateIndex, sws);
+   }
+
+   return shardStateIndex;
+   }
+   }
+
+   /**
+* Return the current system time. Allow tests to override this to simulate progress for watermark
+* logic.
+*
+* @return
+*/
+   @VisibleForTesting
+   protected long getCurrentTimeMillis() {
+   return System.currentTimeMillis();
+   }
+
+   /**
+* Called periodically to emit a watermark. Checks all shards for the current event time
+* watermark, and possibly emits the next watermark.
+*
+* Shards that have not received an update for a certain interval are considered inactive so as
+* to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+* marked as temporarily idle to not block downstream operators.
+*/
+   @VisibleForTesting
+   protected void emitWatermark() {
+   LOG.debug(
+   "###evaluating watermark for subtask {} time {}",
+   indexOfThisConsumerSubtask,
+   getCurrentTimeMillis());
+   long potentialWatermark = Long.MAX_VALUE;
+   long idleTime =
+   (shardIdleIntervalMillis > 0)
+   ? getCurrentTimeMillis() - shardIdleIntervalMillis
+   : Long.MAX_VALUE;
+
+   for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+   // consider only active shards, or those that would advance the watermark
+   Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+   if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+   potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+   }
+   }
+
+   // advance watermark if possible (watermarks can only be ascending)
+   if (potentialWatermark == Long.MAX_VALUE) {
 
 Review comment:
   Will this not be executed repeatedly? Maybe move this block inside the
`potentialWatermark > lastWatermark` check?
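mxm appears to be asking whether the block under `if (potentialWatermark == Long.MAX_VALUE)` runs on every timer tick while no shard is active. Assuming that block marks the subtask as temporarily idle (as the Javadoc of `emitWatermark()` in the hunk describes), one way to avoid re-signalling is to remember whether the subtask is already idle and act only on transitions. A sketch under that assumption; `SourceSignals` and `IdleGuardSketch` are hypothetical stand-ins for the source context and for `emitWatermark()`, not the code that was merged:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: signal idleness once per idle period instead of on every timer tick. */
public class IdleGuardSketch {

    /** Hypothetical stand-in for the two source-context calls used here. */
    interface SourceSignals {
        void emitWatermark(long timestamp);

        void markAsTemporarilyIdle();
    }

    private final SourceSignals signals;
    private long lastWatermark = Long.MIN_VALUE;
    private boolean idle = false;

    IdleGuardSketch(SourceSignals signals) {
        this.signals = signals;
    }

    /**
     * Called on each timer tick with the minimum watermark over the active shards,
     * or Long.MAX_VALUE when no shard is currently active.
     */
    void onTick(long potentialWatermark) {
        if (potentialWatermark == Long.MAX_VALUE) {
            // Only act on the active -> idle transition.
            if (!idle) {
                signals.markAsTemporarilyIdle();
                idle = true;
            }
        } else if (potentialWatermark > lastWatermark) {
            // Watermarks only ever ascend. Assumption: emitting one ends the idle period.
            signals.emitWatermark(potentialWatermark);
            lastWatermark = potentialWatermark;
            idle = false;
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        IdleGuardSketch guard = new IdleGuardSketch(new SourceSignals() {
            @Override
            public void emitWatermark(long timestamp) {
                calls.add("wm:" + timestamp);
            }

            @Override
            public void markAsTemporarilyIdle() {
                calls.add("idle");
            }
        });
        guard.onTick(100L);
        guard.onTick(Long.MAX_VALUE);
        guard.onTick(Long.MAX_VALUE); // already idle: no second signal
        guard.onTick(100L);           // not above the last watermark: nothing emitted
        guard.onTick(200L);
        System.out.println(calls);    // [wm:100, idle, wm:200]
    }
}
```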



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676177#comment-16676177
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tzulitai commented on issue #6980: [FLINK-5697] [kinesis] Add periodic 
per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-436137288
 
 
   @tweise I'll put this on my backlog and try to get to it as soon as I'm
finished with the remaining 1.7 work.




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676173#comment-16676173
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-436137039
 
 
   @tzulitai @aljoscha PTAL




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675628#comment-16675628
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

jgrier commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-435998278
 
 
   Looks good @tweise.




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672295#comment-16672295
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-435206371
 
 
   @EronWright that's correct, and I will make sure to document this. Even our
planned follow-up work won't be able to address such a resharding scenario. I
think we will only be able to address that with the new source design that is
currently under discussion (which should permit centralized discovery and more
sophisticated splitting/shard distribution).




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672285#comment-16672285
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

EronWright commented on issue #6980: [FLINK-5697] [kinesis] Add periodic 
per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-435202809
 
 
   There is a caveat with this implementation that the docs should perhaps
mention: it may produce spurious late events when processing a backlog of
data.
   
   Here's an example of when that may occur. Imagine that subtask 1 is
processing shard A and subtask 2 is processing shard B. Shard A has reached
6:00 in event time (as per the assigner), so subtask 1 emits the
corresponding watermark. At this point, the subtask has made the irrevocable
assertion that subsequent events will be past 6:00. Meanwhile, shard B is at
5:30 and undergoes a split into C/D. If either child shard is subsequently
assigned to subtask 1, its events will be considered late due to the assertion
made earlier.
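Eron's scenario reduced to a worked example, as a hedged sketch: it models event time as `LocalTime` and uses the simplified rule that a record is late once its timestamp is at or below the watermark its subtask has already emitted (real operators apply their own window and allowed-lateness rules). `SpuriousLateEventSketch` and the event times for shard C are hypothetical:

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/** Worked example of the resharding caveat: a child shard lands on a subtask whose watermark is ahead. */
public class SpuriousLateEventSketch {

    /** Simplified lateness test: a watermark at t asserts no more events with timestamp <= t. */
    static boolean isLate(LocalTime eventTime, LocalTime currentWatermark) {
        return !eventTime.isAfter(currentWatermark);
    }

    public static void main(String[] args) {
        // Subtask 1 reads only shard A, which reached 6:00, so subtask 1 emitted a 6:00 watermark.
        LocalTime subtask1Watermark = LocalTime.of(6, 0);

        // Shard B (on subtask 2) was at 5:30 when it split into C and D.
        // Assume child shard C is assigned to subtask 1 and resumes shortly after 5:30.
        List<LocalTime> shardCEvents = List.of(LocalTime.of(5, 31), LocalTime.of(5, 45), LocalTime.of(6, 5));

        List<LocalTime> late = new ArrayList<>();
        for (LocalTime eventTime : shardCEvents) {
            if (isLate(eventTime, subtask1Watermark)) {
                late.add(eventTime);
            }
        }
        // Events from C before 6:00 are already behind the watermark: prints [05:31, 05:45]
        System.out.println(late);
    }
}
```

No per-shard bookkeeping on subtask 1 can undo this, because the 6:00 watermark was emitted before shard C existed.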
   
   




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672248#comment-16672248
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..f0852584ade 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,17 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * avoid an inactive or closed shard to block the watermark progress, the idle timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +120,8 @@
 */
private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+   private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
// 

//  Runtime state
// 

@@ -220,6 +234,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) {
ClosureCleaner.clean(shardAssigner, true);
}
 
+   public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+   return periodicWatermarkAssigner;
+   }
+
+   /**
+* Set the assigner that will extract the timestamp from {@link T} and calculate the
+* watermark.
+*
+* @param periodicWatermarkAssigner
+*/
+   public void setPeriodicWatermarkAssigner(
+   AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+   this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+   ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+   }
+
// 

//  Source life cycle
// 

@@ -414,7 +444,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
 
-   return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner);
+   return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
}
 
@VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 

[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672249#comment-16672249
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise opened a new pull request #6980: [FLINK-5697] [kinesis] Add periodic 
per-shard watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   
   
   ## What is the purpose of the change
   
   Adds support for periodic per-shard watermarks to the Kinesis consumer. This
functionality is off by default and can be enabled by setting an optional
watermark assigner on the consumer. When enabled, the watermarking also
optionally supports idle shard detection based on a configurable interval of
inactivity.
   
   ## Brief change log
   
   * Add watermark assigner to consumer
   * Modify data fetcher to track watermark state per shard
   * Modify emitRecordAndUpdateState to extract timestamp and update watermark
   * Timer driven periodic watermark emit
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   Added a unit test; more test coverage is planned with the subsequent work on
shared watermark state and the emit queue, as discussed on the mailing list.
This change is ported from the Lyft internal codebase, where it is used in
production.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671151#comment-16671151
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise commented on issue #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980#issuecomment-434940116
 
 
   R:  @jgrier @aljoscha @tzulitai 




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671149#comment-16671149
 ] 

ASF GitHub Bot commented on FLINK-5697:
---

tweise opened a new pull request #6980: [FLINK-5697] [kinesis] Add periodic 
per-shard watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   
   
   ## What is the purpose of the change
   
   Adds support for periodic per-shard watermarks to the Kinesis consumer. This
functionality is off by default and can be enabled by setting an optional
watermark assigner on the consumer. When enabled, the watermarking also
optionally supports idle shard detection based on a configurable interval of
inactivity.
   
   ## Brief change log
   
   * Add watermark assigner to consumer
   * Modify data fetcher to track watermark state per shard
   * Modify emitRecordAndUpdateState to extract timestamp and update watermark
   * Timer driven periodic watermark emit
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   Added a unit test; more test coverage is planned with the subsequent work on
shared watermark state and the emit queue, as discussed on the mailing list.
This change is ported from the Lyft internal codebase, where it is used in
production.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-10-30 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669566#comment-16669566
 ] 

Thomas Weise commented on FLINK-5697:
-

The latest related discussion: 
[https://lists.apache.org/thread.html/2341ab54f5c9a4f43f123605d6b17e374c19587bfd72c22748e08652@%3Cdev.flink.apache.org%3E]

I'm planning to first bring over the source watermarking that we implemented a
while ago at Lyft to the Flink Kinesis consumer. Following that, we are
planning to contribute the emit queue pattern as described in Jamie's document
and the state sharing mechanism. Those pieces will make it possible to apply
backpressure to the shard consumer threads based on the shared watermark
information.

 



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-03-07 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390365#comment-16390365
 ] 

Thomas Weise commented on FLINK-5697:
-

For idleness detection see: [https://github.com/apache/flink/pull/5634]

 



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-02-27 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815
 ] 

Thomas Weise commented on FLINK-5697:
-

We have implemented periodic watermark support in a customization of
FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance
of AssignerWithPeriodicWatermarks and uses the configuration from
ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also
addresses the issue described in FLINK-5479 with an optional interval
property that lets the user specify how long after its last record a shard is
considered idle, so that it no longer holds back the watermark. If there is
interest, I would contribute these changes to the current Flink Kinesis
connector.
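The idle-interval behaviour described above amounts to taking the minimum watermark over only those shards that received a record recently. A simplified sketch, assuming hypothetical names (`ShardState`, `potentialWatermark`) and treating a non-positive interval as "idle detection disabled"; the `emitWatermark()` hunk in PR #6980 additionally keeps shards whose watermark would advance past the last emitted one, which this sketch omits:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: minimum watermark across shards, ignoring shards that have been silent too long. */
public class IdleAwareWatermarkSketch {

    /** Hypothetical per-shard state: current watermark and time of the last record. */
    static final class ShardState {
        final long watermark;
        final long lastUpdatedMillis;

        ShardState(long watermark, long lastUpdatedMillis) {
            this.watermark = watermark;
            this.lastUpdatedMillis = lastUpdatedMillis;
        }
    }

    /**
     * Returns the minimum watermark over shards updated within idleIntervalMillis, or
     * Long.MAX_VALUE if none qualify. An interval <= 0 disables idle detection, so every
     * shard holds back the watermark.
     */
    static long potentialWatermark(Map<String, ShardState> shards, long nowMillis, long idleIntervalMillis) {
        long idleCutoff = idleIntervalMillis > 0 ? nowMillis - idleIntervalMillis : Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (ShardState shard : shards.values()) {
            if (shard.lastUpdatedMillis >= idleCutoff) {
                min = Math.min(min, shard.watermark);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        Map<String, ShardState> shards = new LinkedHashMap<>();
        shards.put("shard-A", new ShardState(95_000L, 99_000L)); // updated 1s ago
        shards.put("shard-B", new ShardState(40_000L, 10_000L)); // silent for 90s

        // With a 60s idle interval shard-B is ignored: prints 95000
        System.out.println(potentialWatermark(shards, now, 60_000L));
        // With idle detection disabled shard-B holds the watermark back: prints 40000
        System.out.println(potentialWatermark(shards, now, 0L));
    }
}
```

A result of `Long.MAX_VALUE` means no shard qualifies, which is the case where a source would consider marking itself temporarily idle rather than emitting a watermark.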



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-02-08 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357640#comment-16357640
 ] 

Thomas Weise commented on FLINK-5697:
-

Since shards are immutable with respect to their hash key range and records
cannot move between shards, we should be able to use the parent shard IDs and
the last read sequence number to determine when a newly discovered shard can be
read from. Child shards are not necessarily assigned to the same subtask as
their parent, in which case we would need a way to know the last read offset of
the parent shard from a different subtask for comparison with
EndingSequenceNumber. Is it possible to retrieve the last checkpointed offsets
from other subtasks outside of restore to perform such a check? (It would still
imply that consumption from a new child shard cannot start until the parent was
checkpointed, and therefore add latency, but would it provide the ordering
guarantee we are looking for?)
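The gating idea above can be sketched as a check that every parent of a newly discovered shard has been read to its end before the child is started. This is a hedged sketch: instead of comparing the parent's last read sequence number with EndingSequenceNumber, it assumes an explicit end-of-shard marker in a progress map, and `SHARD_END`, `canStartChild`, and the sample values are hypothetical. How progress from other subtasks would populate that map is exactly the open question in the comment:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: gate reading a child shard on its parent shard(s) having been fully consumed. */
public class ChildShardGateSketch {

    /** Hypothetical marker stored as a shard's progress once it has been read to its end. */
    static final String SHARD_END = "SHARD_END";

    /**
     * True when every parent shard is marked as fully read in the progress map. The map could
     * be fed from checkpointed offsets of other subtasks (an assumption, not an existing API).
     */
    static boolean canStartChild(List<String> parentShardIds, Map<String, String> progressByShardId) {
        for (String parentId : parentShardIds) {
            if (!SHARD_END.equals(progressByShardId.get(parentId))) {
                return false; // parent unknown or still being read
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> progress = new HashMap<>();

        // Split: shard B is the single parent of children C and D.
        progress.put("shard-B", "12345"); // hypothetical last-read sequence number
        System.out.println(canStartChild(List.of("shard-B"), progress)); // false
        progress.put("shard-B", SHARD_END);
        System.out.println(canStartChild(List.of("shard-B"), progress)); // true

        // Merge: both parents must be finished before the merged child is read.
        progress.put("shard-E", "67890");
        System.out.println(canStartChild(List.of("shard-B", "shard-E"), progress)); // false
    }
}
```

As the comment notes, waiting on parent completion trades latency for per-key ordering across the resharding boundary.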



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2017-04-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970738#comment-15970738
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5697:


I'd say it still makes sense. If I understand Kinesis resharding correctly,
merging or splitting shards always results in new empty shards with new key
ranges, and the previous shards that were merged or split are simply closed
(i.e., records that were collected by Kinesis before the resharding are
consumed from the old closed shards). Therefore, we can still assume simple
event time patterns per shard, as there is no merging or splitting of the
Kinesis records taking place.



[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2017-04-15 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15970149#comment-15970149
 ] 

Eron Wright  commented on FLINK-5697:
-

My understanding is that per-partition watermarks make sense due to ordering
guarantees and the assumption of strictly ascending timestamps per partition in
some Kafka app architectures. Given that shards are dynamic in Kinesis, does
this functionality make sense here?
