[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369031#comment-15369031 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2214

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when
> Kinesis successfully receives the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the
> deserialization schema.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
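The issue asks for the arrival timestamp to reach user code through the deserialization schema. A minimal sketch of what that could look like, assuming the schema method receives the approximate arrival timestamp as a `long` in epoch milliseconds alongside the raw bytes and record metadata. `ArrivalAwareDeserializationSchema`, `TimestampedEvent` and `TimestampedEventSchema` are hypothetical names for this example; the exact signature of the real `KinesisDeserializationSchema` should be checked in the connector's Javadoc.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the connector's deserialization interface;
// the real KinesisDeserializationSchema may differ in its parameter list.
interface ArrivalAwareDeserializationSchema<T> {
    T deserialize(byte[] recordValue, String partitionKey, String seqNum,
                  long approxArrivalTimestamp, String stream, String shardId);
}

// Example record type that keeps the Kinesis server-side arrival time.
final class TimestampedEvent {
    final String payload;
    final long arrivalMillis;

    TimestampedEvent(String payload, long arrivalMillis) {
        this.payload = payload;
        this.arrivalMillis = arrivalMillis;
    }
}

// Decodes the payload as UTF-8 and copies the approximate arrival timestamp into the event.
final class TimestampedEventSchema implements ArrivalAwareDeserializationSchema<TimestampedEvent> {
    @Override
    public TimestampedEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                        long approxArrivalTimestamp, String stream, String shardId) {
        String payload = new String(recordValue, StandardCharsets.UTF_8);
        return new TimestampedEvent(payload, approxArrivalTimestamp);
    }
}
```

Carrying the timestamp inside the user's own record type keeps it available downstream even if the stream's event-time timestamp is later overridden.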
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369001#comment-15369001 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164370

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Oh I see, right :) Okay! Thanks for fixing it for me!
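The documentation text quoted above states that approximate arrival timestamps may not always be ascending, so any watermark derived from them has to tolerate some disorder. A plain-Java sketch of the common bounded-out-of-orderness idea follows; `BoundedOutOfOrdernessTracker` is a hypothetical class written for this example, not one of Flink's predefined extractors, and the bound is whatever the user assumes the maximum disorder to be.

```java
// Plain-Java model (not a Flink class) of a bounded-out-of-orderness watermark:
// the watermark trails the largest timestamp seen so far by a fixed bound,
// so records that arrive slightly out of order are still ahead of the watermark.
final class BoundedOutOfOrdernessTracker {
    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records a timestamp (e.g. an approximate arrival timestamp) and returns it unchanged. */
    long extractTimestamp(long recordTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, recordTimestamp);
        return recordTimestamp;
    }

    /** Current watermark; Long.MIN_VALUE until the first record has been seen. */
    long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
```

For example, with a bound of 500 ms, seeing timestamps 1000 and then 900 leaves the watermark at 500, so the late record with timestamp 900 is still ahead of it.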
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368993#comment-15368993 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164121

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Ah, sorry, it's not about the method name. I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`. The problem is that `assignTimestampsAndWatermarks()` returns a new stream with the assigned timestamps; it does not modify `kinesis` itself. So doing

```java
kinesis.assignTimestampsAndWatermarks();
kinesis.timeWindow(); // <-- this time window won't get the watermarks
```

won't work.
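In the job itself, the fix from this thread is `kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());`. The reason it matters can be shown with a plain-Java model: `ToyStream` below is a hypothetical immutable class, not Flink's `DataStream`, whose timestamp-assigning method returns a new instance, just as the comment describes for `assignTimestampsAndWatermarks()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable stream model (not Flink's DataStream): like Flink's
// transformations, assignTimestamps() returns a NEW stream and leaves the
// receiver unchanged, so its result must be kept.
final class ToyStream<T> {
    private final List<T> elements;
    private final boolean timestamped;

    ToyStream(List<T> elements) {
        this(elements, false);
    }

    private ToyStream(List<T> elements, boolean timestamped) {
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
        this.timestamped = timestamped;
    }

    /** Returns a new, timestamped stream over the same elements; {@code this} is not modified. */
    ToyStream<T> assignTimestamps() {
        return new ToyStream<>(elements, true);
    }

    boolean isTimestamped() {
        return timestamped;
    }

    int size() {
        return elements.size();
    }
}
```

Calling `kinesis.assignTimestamps();` on its own drops the returned stream, and `kinesis.isTimestamped()` stays `false`; only `kinesis = kinesis.assignTimestamps();` leaves the variable pointing at the timestamped stream.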
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368921#comment-15368921 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70161839

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Thanks Robert. I'm not quite sure about the problem with using `assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, and as far as I understand there's not much difference from `assignTimestamps()`, except that `assignTimestamps()` is deprecated.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367948#comment-15367948 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70103527

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

There is one minor thing here: you have to do `kinesis = kinesis.assignTS()` for this to work properly. But I'll fix it while merging.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367942#comment-15367942 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214

+1 to merge once Travis is green.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367586#comment-15367586 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214

@rmetzger Thank you for the review. Comments addressed :)
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367583#comment-15367583 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70063862

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Good point. Added!
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367544#comment-15367544 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059358

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
      * This method is called by {@link ShardConsumer}s.
      *
      * @param record the record to collect
+     * @param recordTimestamp timestamp to attach to the collected record
      * @param shardStateIndex index of the shard to update in subscribedShardsState;
      *                        this index should be the returned value from
      *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
      *                        when the shard state was registered.
      * @param lastSequenceNumber the last sequence number value to update
      */
-    protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+    protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
         synchronized (checkpointLock) {
             sourceContext.collect(record);
--- End diff --

Ah, sorry, this is actually wrong. It should be using `sourceContext.collectWithTimestamp()`; I missed this.
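The fix discussed here, emitting through `collectWithTimestamp()` instead of `collect()`, can be sketched with stand-in types. `MiniSourceContext` mirrors only the two emit methods of Flink's `SourceFunction.SourceContext` mentioned in the thread; `FetcherSketch`, `RecordingContext` and the `String`-based sequence numbers are hypothetical simplifications, not the actual `KinesisDataFetcher` code. The sketch assumes, as the quoted diff suggests, that checkpointing synchronizes on the same `checkpointLock`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the two SourceContext emit methods discussed in the review;
// Flink's real SourceFunction.SourceContext has more methods than this.
interface MiniSourceContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
}

// Hypothetical, simplified fetcher: emits a record with its timestamp and updates
// the shard's last sequence number inside the same synchronized block.
final class FetcherSketch<T> {
    private final Object checkpointLock = new Object();
    private final MiniSourceContext<T> sourceContext;
    private final String[] lastSequenceNumbers; // indexed by shard state index

    FetcherSketch(MiniSourceContext<T> sourceContext, int numShards) {
        this.sourceContext = sourceContext;
        this.lastSequenceNumbers = new String[numShards];
    }

    void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, String lastSequenceNumber) {
        synchronized (checkpointLock) {
            // collectWithTimestamp (not collect), so the timestamp is actually attached
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            lastSequenceNumbers[shardStateIndex] = lastSequenceNumber;
        }
    }

    String lastSequenceNumber(int shardStateIndex) {
        synchronized (checkpointLock) {
            return lastSequenceNumbers[shardStateIndex];
        }
    }
}

// Test double that records every emitted element and its timestamp (null if none).
final class RecordingContext<T> implements MiniSourceContext<T> {
    final List<T> elements = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();

    @Override
    public void collect(T element) {
        elements.add(element);
        timestamps.add(null);
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        elements.add(element);
        timestamps.add(timestamp);
    }
}
```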
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367542#comment-15367542 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059282

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
-    protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+    protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
         synchronized (checkpointLock) {
             sourceContext.collect(record);
--- End diff --

Did you also consider passing the timestamp to the `sourceContext.collectWithTimestamp()` method, in addition to passing it to the deserialization schema?
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367536#comment-15367536 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70058907

--- Diff: docs/apis/streaming/connectors/kinesis.md ---
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Maybe we can also add a sentence saying that users can override the timestamp if they want to extract their own event-time timestamp.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367533#comment-15367533 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214

Thx for rebasing.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367512#comment-15367512 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214

Rebasing ...
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367069#comment-15367069 ]

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214

@rmetzger can you also help review this PR for the Kinesis connector? Thanks!
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367031#comment-15367031 ]

ASF GitHub Bot commented on FLINK-4019:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2214

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

This Kinesis-provided timestamp is used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time timestamp

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4019

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2214

commit c71666f53dd666d11f85b0303b586cbb38175963
Author: Gordon Tai
Date: 2016-07-08T00:46:31Z

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

Used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333858#comment-15333858 ]

Tzu-Li (Gordon) Tai commented on FLINK-4019:
---

OK. I'll add this, then, along with the upcoming PR for reshard handling.
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316225#comment-15316225 ]

Tzu-Li (Gordon) Tai commented on FLINK-4019:
---

Good suggestion! For now, the connector doesn't let users supply timestamp / watermark assigners. Perhaps we should expand the scope of this JIRA to include this as well?
[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
[ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316048#comment-15316048 ]

Stephan Ewen commented on FLINK-4019:
---

That would also be great to attach as a default timestamp. So if you pick Kinesis and event time, and do not manually assign timestamps and watermarks, you actually get "Kinesis Store Time".
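The proposed default can be summarized as a precedence rule: every record carries the Kinesis approximate arrival ("store") timestamp unless the user supplies their own extraction. A plain-Java sketch of that rule follows; `EventTimeResolver` and its `Optional`-based extractor parameter are hypothetical. In Flink itself the override happens by applying a timestamp assigner downstream of the source, as in the documentation diff above, not through a helper like this.

```java
import java.util.Optional;
import java.util.function.ToLongFunction;

// Hypothetical model of the proposed default: use Kinesis "store time"
// (the approximate arrival timestamp) unless the user supplies an extractor.
final class EventTimeResolver {
    private EventTimeResolver() {}

    static <T> long resolveEventTime(T record, long approxArrivalTimestamp,
                                     Optional<ToLongFunction<T>> userExtractor) {
        // A user-supplied extractor wins; otherwise fall back to the arrival timestamp.
        return userExtractor
                .map(extractor -> extractor.applyAsLong(record))
                .orElse(approxArrivalTimestamp);
    }
}
```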