[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369031#comment-15369031
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2214


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> --------------------------------------------------------------------------------------
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also provides the timestamp at which Kinesis 
> successfully received the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This is useful information for users and should be exposed through the 
> deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369001#comment-15369001
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164370
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Oh I see, right :) Okay! Thanks for fixing it for me!




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368993#comment-15368993
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164121
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Ah, sorry, it's not about the method name. 
I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`.
The problem is that `assignTimestampsAndWatermarks()` returns a new stream 
with assigned timestamps; it does not modify the stream it is called on.
So doing
```java
kinesis.assignTimestampsAndWatermarks();
kinesis.timeWindow(); // <-- this time window won't get the watermarks
```
won't work.
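
To make the point above concrete, here is a minimal, self-contained sketch. It does not use Flink: `MiniStream` and its `assignTimestamps` method are hypothetical stand-ins that only model the behaviour described in this comment, namely that a transformation returns a new stream object and leaves the receiver untouched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

public class ReassignStreamSketch {

    /** Toy immutable stream (hypothetical, not Flink's DataStream). */
    static final class MiniStream<T> {
        private final List<T> elements;
        // null means that no timestamp assigner has been attached yet
        private final ToLongFunction<T> timestampAssigner;

        MiniStream(List<T> elements, ToLongFunction<T> timestampAssigner) {
            this.elements = elements;
            this.timestampAssigner = timestampAssigner;
        }

        /** Returns a NEW stream carrying the assigner; 'this' is not modified. */
        MiniStream<T> assignTimestamps(ToLongFunction<T> assigner) {
            return new MiniStream<>(elements, assigner);
        }

        boolean hasTimestamps() {
            return timestampAssigner != null;
        }
    }

    public static void main(String[] args) {
        MiniStream<String> kinesis = new MiniStream<>(Arrays.asList("a", "bb"), null);

        // Result discarded: 'kinesis' still refers to the stream without timestamps.
        kinesis.assignTimestamps(s -> (long) s.length());
        System.out.println("after discarded call: " + kinesis.hasTimestamps());

        // Result kept: anything built on 'kinesis' from here on sees the timestamps.
        kinesis = kinesis.assignTimestamps(s -> (long) s.length());
        System.out.println("after reassignment:   " + kinesis.hasTimestamps());
    }
}
```

In the real documentation snippet the same idea would mean keeping the return value of `assignTimestampsAndWatermarks()` and building later operators on it.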




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368921#comment-15368921
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70161839
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Thanks Robert.
I'm not quite sure what the problem is with using 
`assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, 
and from my understanding it does not differ much from 
`assignTimestamps()`, except that `assignTimestamps()` is deprecated.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367948#comment-15367948
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70103527
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

There is one minor thing here: you have to do `kinesis = kinesis.assignTS()` 
for this to work properly.
But I'll fix it while merging.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367942#comment-15367942
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214
  
+1 to merge once travis is green.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367586#comment-15367586
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214
  
@rmetzger Thank you for the review. Comments addressed :)




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367583#comment-15367583
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70063862
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This 
timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Good point. Added!




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367544#comment-15367544
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059358
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
 * This method is called by {@link ShardConsumer}s.
 *
 * @param record the record to collect
+* @param recordTimestamp timestamp to attach to the collected record
 * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
 *this index should be the returned value from
 *{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
 *when the shard state was registered.
 * @param lastSequenceNumber the last sequence number value to update
 */
-   protected void emitRecordAndUpdateState(T record, int shardStateIndex, 
SequenceNumber lastSequenceNumber) {
+   protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collect(record);
--- End diff --

Ah, sorry, this is actually wrong. It should be using 
`sourceContext.collectWithTimestamp()`; I missed this.
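
As a rough illustration of the fix mentioned above, the following self-contained sketch emits a record together with its timestamp while holding the checkpoint lock. `MiniSourceContext`, `RecordingContext`, and `MiniFetcher` are hypothetical stand-ins written for this example; only the method names `collect`, `collectWithTimestamp`, and `emitRecordAndUpdateState` and the field names `checkpointLock` and `sourceContext` are taken from the diff, and the sequence number is simplified to a `String`.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectWithTimestampSketch {

    /** Stand-in for a source context with the two emit methods discussed here. */
    interface MiniSourceContext<T> {
        void collect(T element);
        void collectWithTimestamp(T element, long timestamp);
    }

    /** Records every emission as text so the attached timestamp is visible. */
    static final class RecordingContext<T> implements MiniSourceContext<T> {
        final List<String> emitted = new ArrayList<>();

        @Override
        public void collect(T element) {
            emitted.add(element + "@none");
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            emitted.add(element + "@" + timestamp);
        }
    }

    /** Simplified fetcher: emits a record and updates state under one lock. */
    static final class MiniFetcher<T> {
        private final Object checkpointLock = new Object();
        private final MiniSourceContext<T> sourceContext;
        private String lastSequenceNumber;

        MiniFetcher(MiniSourceContext<T> sourceContext) {
            this.sourceContext = sourceContext;
        }

        void emitRecordAndUpdateState(T record, long recordTimestamp, String sequenceNumber) {
            synchronized (checkpointLock) {
                // Attach the timestamp instead of calling collect(record).
                sourceContext.collectWithTimestamp(record, recordTimestamp);
                lastSequenceNumber = sequenceNumber;
            }
        }

        String lastSequenceNumber() {
            return lastSequenceNumber;
        }
    }

    public static void main(String[] args) {
        RecordingContext<String> context = new RecordingContext<>();
        MiniFetcher<String> fetcher = new MiniFetcher<>(context);
        fetcher.emitRecordAndUpdateState("r1", 1467938791000L, "seq-1");
        System.out.println(context.emitted);
    }
}
```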




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367542#comment-15367542
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059282
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
 * This method is called by {@link ShardConsumer}s.
 *
 * @param record the record to collect
+* @param recordTimestamp timestamp to attach to the collected record
 * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
 *this index should be the returned value from
 *{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
 *when the shard state was registered.
 * @param lastSequenceNumber the last sequence number value to update
 */
-   protected void emitRecordAndUpdateState(T record, int shardStateIndex, 
SequenceNumber lastSequenceNumber) {
+   protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collect(record);
--- End diff --

Did you also consider passing the timestamp to the 
`sourceContext.collectWithTimestamp()` method, in addition to passing it to the 
deserialization schema?




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367536#comment-15367536
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70058907
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This 
timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Maybe we can also add a sentence saying that users can override the 
timestamp if they want to extract their own event-time timestamp.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367533#comment-15367533
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214
  
Thx for rebasing.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367512#comment-15367512
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214
  
Rebasing ...




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367069#comment-15367069
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2214
  
@rmetzger can you also help review this PR for the Kinesis connector? 
Thanks!




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367031#comment-15367031
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2214

[FLINK-4019][kinesis-connector] Use Kinesis records' 
approximateArrivalTimestamp

This Kinesis-provided timestamp is used in the following ways:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time timestamp
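
For point 1, a user-side schema could look roughly like the sketch below. This is only an illustration under stated assumptions: `MiniDeserializationSchema` and its three-argument `deserialize` method are hypothetical and much smaller than the connector's actual `KinesisDeserializationSchema`, whose exact signature is not shown in this thread.

```java
import java.nio.charset.StandardCharsets;

public class ArrivalTimestampSchemaSketch {

    /** Hypothetical, reduced schema interface that receives the arrival timestamp. */
    interface MiniDeserializationSchema<T> {
        T deserialize(byte[] recordValue, String partitionKey, long approxArrivalTimestamp);
    }

    /** Simple value type pairing the payload with the Kinesis-provided timestamp. */
    static final class TimestampedString {
        final String value;
        final long arrivalTimestamp;

        TimestampedString(String value, long arrivalTimestamp) {
            this.value = value;
            this.arrivalTimestamp = arrivalTimestamp;
        }
    }

    /** Keeps the timestamp next to the decoded payload so later operators can read it. */
    static final class TimestampedStringSchema
            implements MiniDeserializationSchema<TimestampedString> {

        @Override
        public TimestampedString deserialize(
                byte[] recordValue, String partitionKey, long approxArrivalTimestamp) {
            String value = new String(recordValue, StandardCharsets.UTF_8);
            return new TimestampedString(value, approxArrivalTimestamp);
        }
    }

    public static void main(String[] args) {
        TimestampedString out = new TimestampedStringSchema()
                .deserialize("hello".getBytes(StandardCharsets.UTF_8), "key-1", 1467938791000L);
        System.out.println(out.value + " arrived at " + out.arrivalTimestamp);
    }
}
```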

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4019

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2214


commit c71666f53dd666d11f85b0303b586cbb38175963
Author: Gordon Tai 
Date:   2016-07-08T00:46:31Z

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

Used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time









[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333858#comment-15333858
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4019:


Ok. I'll add this then along with the upcoming PR for reshard handling.

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316225#comment-15316225
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4019:


Good suggestion! For now, the connector doesn't let users supply timestamp 
/ watermark assigners. Perhaps we should expand the scope of this JIRA to 
include that as well?

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-05 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316048#comment-15316048
 ] 

Stephan Ewen commented on FLINK-4019:
-

It would also be great to attach this as the default timestamp. So, if you pick 
Kinesis and Event Time and do not manually assign timestamps and watermarks, 
you actually get "Kinesis Store Time".
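
The fallback described here can be sketched roughly as follows. This is only an illustration under assumptions: `DefaultEventTime` and `resolve` are hypothetical names, and the sketch says nothing about how the connector or Flink's runtime actually wires timestamps internally.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the "default timestamp" idea:
// use the user-assigned event time if there is one, otherwise fall back
// to the Kinesis approximate arrival timestamp ("Kinesis Store Time").
final class DefaultEventTime {
    static long resolve(OptionalLong userAssignedTimestamp, long approxArrivalTimestamp) {
        return userAssignedTimestamp.orElse(approxArrivalTimestamp);
    }
}

final class DefaultEventTimeDemo {
    public static void main(String[] args) {
        long arrival = 1467938791000L; // arbitrary sample value

        // No manual assignment: the arrival timestamp is used.
        System.out.println(DefaultEventTime.resolve(OptionalLong.empty(), arrival));

        // Manual assignment present: it takes precedence.
        System.out.println(DefaultEventTime.resolve(OptionalLong.of(42L), arrival));
    }
}
```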



