[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-03-28 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946622#comment-15946622
 ] 

Wei-Che Wei commented on FLINK-4523:


[~tsriharsha]

I have opened a task (FLINK-6211) to solve it. Thank you again for finding this 
problem.

> Allow Kinesis Consumer to start from specific timestamp / Date
> --
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.3.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946592#comment-15946592
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r108599820
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
             }
         }
     }
+
+    private static void validateOptionalDateProperty(Properties config, String key, String message) {
+        if (config.containsKey(key)) {
+            try {
+                initTimestampDateFormat.parse(config.getProperty(key));
--- End diff --

@tsriharsha Thanks for pointing out this problem. I will fix it soon.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946588#comment-15946588
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tsriharsha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r108598516
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
             }
         }
     }
+
+    private static void validateOptionalDateProperty(Properties config, String key, String message) {
+        if (config.containsKey(key)) {
+            try {
+                initTimestampDateFormat.parse(config.getProperty(key));
--- End diff --

Hey, this line throws a parse exception if a double (epoch time) is passed, but 
the next line throws a NumberFormatException when a timestamp string is passed. 
@tzulitai @tony810430




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-03-28 Thread Sriharsha Tikkireddy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946584#comment-15946584
 ] 

Sriharsha Tikkireddy commented on FLINK-4523:
-

[~tonywei]
Hey, I tried to run the consumer with a timestamp as the start position, but the 
following KinesisConfigUtil function always throws an error. If the input is an 
epoch time, the date formatter throws a parse exception; if the input is a date 
string, Double.parseDouble throws a NumberFormatException:

    private static void validateOptionalDateProperty(Properties config, String key, String message) {
        if (config.containsKey(key)) {
            try {
                initTimestampDateFormat.parse(config.getProperty(key)); // <---
                double value = Double.parseDouble(config.getProperty(key)); // <---
                if (value < 0) {
                    throw new NumberFormatException();
                }
            } catch (ParseException | NumberFormatException e) {
                throw new IllegalArgumentException(message);
            }
        }
    }
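
One possible way to avoid the double failure described above is to try the date format first and fall back to the numeric check only when date parsing fails. The following is a minimal sketch, not the fix that was eventually made for this issue. The class name and the date pattern are assumptions for illustration; the real `initTimestampDateFormat` in the connector may use a different pattern.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

class DatePropertyValidation {
    // Assumed pattern for illustration only; the connector's own format may differ.
    private static final String ASSUMED_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Accepts either a date string in the assumed pattern or a finite, non-negative epoch value. */
    static void validateOptionalDateProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return; // the property is optional
        }
        String raw = config.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException(message);
        }
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            new SimpleDateFormat(ASSUMED_PATTERN).parse(raw);
        } catch (ParseException notADate) {
            // Not a date string: fall back to interpreting the value as epoch time.
            double value;
            try {
                value = Double.parseDouble(raw);
            } catch (NumberFormatException notANumber) {
                throw new IllegalArgumentException(message);
            }
            if (Double.isNaN(value) || Double.isInfinite(value) || value < 0) {
                throw new IllegalArgumentException(message);
            }
        }
    }
}
```

With this shape, a value only has to satisfy one of the two formats, instead of both at once as in the snippet quoted above.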



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15811373#comment-15811373
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2916
  
Hi @tony810430, I've finished adding the documentation for this new feature, and 
opened a pull request against your `FLINK-4523` branch.

Can you take a look (would be great if you can review it too!), and merge 
the pull request to your `FLINK-4523`, so that it gets included in this PR too? 
Thanks!


> Allow Kinesis Consumer to start from specific timestamp / Date
> --
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750659#comment-15750659
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2916
  
Thanks for addressing the final comments. I'll add the docs and merge this 
by the end of the day :)




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714743#comment-15714743
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/2916
  
I'm not familiar with how to update the documentation. I would be grateful if 
you could take over this work. Thanks.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714567#comment-15714567
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90607682
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
      * {@inheritDoc}
      */
     @Override
-    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+            .withStreamName(shard.getStreamName())
+            .withShardId(shard.getShard().getShardId())
+            .withShardIteratorType(shardIteratorType);
+
+        switch (ShardIteratorType.fromValue(shardIteratorType)) {
+            case TRIM_HORIZON:
+            case LATEST:
+                break;
+            case AT_TIMESTAMP:
+                getShardIteratorRequest.setTimestamp((Date) startingMarker);
+                break;
+            case AT_SEQUENCE_NUMBER:
+            case AFTER_SEQUENCE_NUMBER:
+                getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
--- End diff --

Same here, see above: consider handling type cast exceptions?




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714566#comment-15714566
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90607651
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
      * {@inheritDoc}
      */
     @Override
-    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+            .withStreamName(shard.getStreamName())
+            .withShardId(shard.getShard().getShardId())
+            .withShardIteratorType(shardIteratorType);
+
+        switch (ShardIteratorType.fromValue(shardIteratorType)) {
+            case TRIM_HORIZON:
+            case LATEST:
+                break;
+            case AT_TIMESTAMP:
+                getShardIteratorRequest.setTimestamp((Date) startingMarker);
--- End diff --

For the new implementation of this method, we should probably handle type 
cast exceptions and wrap them as IllegalArgumentExceptions. Since we're 
validating the property config locally, this really shouldn't happen, but 
it would be good to handle type cast errors here to make this class 
self-contained.
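
A minimal sketch of the wrapping suggested above, written without the AWS SDK so that it stands alone. The enum and the `Request` holder are hypothetical stand-ins for `ShardIteratorType` and `GetShardIteratorRequest`, and the error messages are illustrative, not taken from the pull request.

```java
import java.util.Date;

class StartingMarkerCasts {
    // Hypothetical stand-in for the SDK's ShardIteratorType enum.
    enum IteratorType { TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

    // Hypothetical stand-in for GetShardIteratorRequest; holds only the two fields of interest.
    static final class Request {
        Date timestamp;
        String startingSequenceNumber;
    }

    /** Builds a request, turning a wrongly typed starting marker into an IllegalArgumentException. */
    static Request buildRequest(IteratorType type, Object startingMarker) {
        Request request = new Request();
        switch (type) {
            case TRIM_HORIZON:
            case LATEST:
                break; // no starting marker is needed
            case AT_TIMESTAMP:
                try {
                    request.timestamp = (Date) startingMarker;
                } catch (ClassCastException e) {
                    throw new IllegalArgumentException(
                        "Starting marker must be a Date for AT_TIMESTAMP.", e);
                }
                break;
            case AT_SEQUENCE_NUMBER:
            case AFTER_SEQUENCE_NUMBER:
                try {
                    request.startingSequenceNumber = (String) startingMarker;
                } catch (ClassCastException e) {
                    throw new IllegalArgumentException(
                        "Starting marker must be a String sequence number.", e);
                }
                break;
        }
        return request;
    }
}
```

A `null` marker passes both casts unchanged, so callers that want to reject it would need an explicit null check in addition to this.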




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714565#comment-15714565
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90608161
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
 ---
@@ -34,14 +34,16 @@
      *
      * @param shard the shard to get the iterator
      * @param shardIteratorType the iterator type, defining how the shard is to be iterated
-     *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-     * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+     *                          (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+     * @param startingMarker is null if shardIteratorType is TRIM_HORIZON or LATEST,
+     *                       is as a timestamp if shardIteratorType is AT_TIMESTAMP,
+     *                       is as a sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
--- End diff --

I would suggest the following changes to the Javadoc here to be more 
specific:

is null --> "should be {@code null} if ..."
is as a timestamp --> "should be a {@code Date} value if ..."
is as a sequence number --> "should be a {@code String} representing the 
sequence number if ..."





[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714544#comment-15714544
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90607632
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
      * {@inheritDoc}
      */
     @Override
-    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+            .withStreamName(shard.getStreamName())
+            .withShardId(shard.getShard().getShardId())
+            .withShardIteratorType(shardIteratorType);
+
+        switch (ShardIteratorType.fromValue(shardIteratorType)) {
+            case TRIM_HORIZON:
+            case LATEST:
+                break;
+            case AT_TIMESTAMP:
+                getShardIteratorRequest.setTimestamp((Date) startingMarker);
--- End diff --

For the new implementation of this method, we should probably handle type 
cast exceptions and wrap them as `IllegalArgumentException`s. Since we're 
validating the property config locally, this really shouldn't happen, but 
it would be good to handle type cast errors here to make this class 
self-contained.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714531#comment-15714531
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90606695
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
         this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
             ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
             Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+        if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
--- End diff --

Ah I see, sorry for missing that. The fix for this seems good.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711604#comment-15711604
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90420855
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
         this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
             ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
             Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+        if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
--- End diff --

Because consumerConfig can't be accessed in 'run()'




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711605#comment-15711605
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90420861
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
     public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
+    /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
+    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";
--- End diff --

ok
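
For illustration, the two keys in the diff above could be set together as in the sketch below. It uses the literal key strings from the diff so that it needs no Flink dependency; the class name is hypothetical, and the timestamp value (an epoch value in seconds) is an assumption about the accepted format, not something the diff specifies.

```java
import java.util.Properties;

class AtTimestampConfigExample {
    /** Builds consumer properties that ask for an AT_TIMESTAMP start position. */
    static Properties consumerConfig() {
        Properties config = new Properties();
        // Key strings copied from ConsumerConfigConstants in the diff above.
        config.setProperty("flink.stream.initpos", "AT_TIMESTAMP");
        // Assumed value format: non-negative epoch time as a decimal string.
        config.setProperty("flink.stream.init.timestamp", "1459799926.480");
        return config;
    }
}
```

In a real job these properties would be passed to the Kinesis consumer along with the AWS region and credential settings.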




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711598#comment-15711598
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90420737
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -122,12 +138,14 @@ public void run() {
                 if (subscribedShard.isClosed()) {
                     nextShardItr = null;
                 } else {
-                    nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+                    nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null);
--- End diff --

I will fix this together with the problem below.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711592#comment-15711592
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90420632
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
      */
     @Override
     public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+            .withStreamName(shard.getStreamName())
+            .withShardId(shard.getShard().getShardId())
+            .withShardIteratorType(shardIteratorType)
+            .withStartingSequenceNumber(startingSeqNum);
+        return getShardIterator(getShardIteratorRequest);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
+        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+            .withStreamName(shard.getStreamName())
+            .withShardId(shard.getShard().getShardId())
+            .withShardIteratorType(shardIteratorType)
+            .withTimestamp(startingTimestamp);
--- End diff --

I will check the shard iterator type in the new method by merging these two 
'getShardIterator' methods.




[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711587#comment-15711587
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tony810430 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90420204
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
@@ -145,6 +145,39 @@ public void 
testUnrecognizableStreamInitPositionTypeInConfig() {
}
 
@Test
+   public void 
testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+   exception.expect(IllegalArgumentException.class);
+   exception.expectMessage("Please set value for initial timestamp 
('"
+   + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + 
"') when using AT_TIMESTAMP initial position.");
+
+   Properties testConfig = new Properties();
+   testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+   
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+
+   KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+   }
+
+   @Test
+   public void testUnparsableDateForInitialTimestampInConfig() {
+   exception.expect(IllegalArgumentException.class);
+   exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
--- End diff --

ok



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711498#comment-15711498
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90410520
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
 */
@Override
public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException 
{
+   GetShardIteratorRequest getShardIteratorRequest = new 
GetShardIteratorRequest()
+   .withStreamName(shard.getStreamName())
+   .withShardId(shard.getShard().getShardId())
+   .withShardIteratorType(shardIteratorType)
+   .withStartingSequenceNumber(startingSeqNum);
+   return getShardIterator(getShardIteratorRequest);
+   }
+
+   /**
+* {@inheritDoc}
+*/
+   @Override
+   public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nonnull final Date startingTimestamp) throws 
InterruptedException {
+   GetShardIteratorRequest getShardIteratorRequest = new 
GetShardIteratorRequest()
+   .withStreamName(shard.getStreamName())
+   .withShardId(shard.getShard().getShardId())
+   .withShardIteratorType(shardIteratorType)
+   .withTimestamp(startingTimestamp);
--- End diff --

How does the Kinesis API behave when you set both a non-matching shard 
iterator type and a timestamp?
In other words, what happens when the shard iterator type is, say, 
`TRIM_HORIZON`, but a timestamp is also provided?

If the API call fails because Kinesis doesn't allow such combinations, we 
should probably make this method implementation robust against such situations.



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711497#comment-15711497
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90410957
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -57,6 +60,17 @@ public static void 
validateConsumerConfiguration(Properties config) {
}
throw new IllegalArgumentException("Invalid 
initial position in stream set in config. Valid values are: " + sb.toString());
}
+
+   // specified initial timestamp in stream when using 
AT_TIMESTAMP
+   if (InitialPosition.valueOf(initPosType) == 
InitialPosition.AT_TIMESTAMP) {
+   if 
(!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+   throw new 
IllegalArgumentException("Please set value for initial timestamp ('"
+   + 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP 
initial position.");
+   }
+   validateOptionalDateProperty(config, 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+   "Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
+   + "Must be a valid format: 
yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 
2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
--- End diff --

That's really good error messaging for the user :) Like I mentioned above, 
we need to describe this format in the documentation as well.
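
For reference, a rough sketch of how the two accepted forms named in the error message (the date pattern, or a non-negative double of epoch seconds) could be parsed. `InitTimestampParserSketch` and `parse` are hypothetical names; this is not the PR's `validateOptionalDateProperty`, only an illustration under the assumption that the double is interpreted as seconds since the epoch.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

final class InitTimestampParserSketch {
    static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
    static final String ERROR_PREFIX =
        "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.";

    /** Accepts either the date pattern or a non-negative number of epoch seconds. */
    static Date parse(String value) {
        try {
            // SimpleDateFormat is not thread-safe, so create one per call.
            SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
            format.setLenient(false);
            return format.parse(value);
        } catch (ParseException notADate) {
            // Not in date form; try epoch seconds next.
        }
        double epochSeconds;
        try {
            epochSeconds = Double.parseDouble(value);
        } catch (NumberFormatException notANumber) {
            throw new IllegalArgumentException(ERROR_PREFIX + " Got: " + value);
        }
        if (Double.isNaN(epochSeconds) || Double.isInfinite(epochSeconds) || epochSeconds < 0) {
            throw new IllegalArgumentException(ERROR_PREFIX + " Got: " + value);
        }
        // Round rather than truncate to avoid losing a millisecond to floating-point error.
        return new Date(Math.round(epochSeconds * 1000.0));
    }
}
```

Both examples from the message, `2016-04-04T19:58:46.480-00:00` and `1459799926.480`, denote the same instant under these assumptions.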



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711499#comment-15711499
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90410148
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -122,12 +138,14 @@ public void run() {
if (subscribedShard.isClosed()) {
nextShardItr = null;
} else {
-   nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
+   nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
(String) null);
--- End diff --

Is the `(String)` cast necessary?
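
On the Java side, a bare `null` argument is ambiguous once a method has both a `String` and a `Date` overload in the same parameter position, which is the situation the `KinesisProxy` diff in this thread appears to create. A small sketch with hypothetical names (`OverloadAmbiguitySketch`, `describe`):

```java
import java.util.Date;

final class OverloadAmbiguitySketch {
    static String describe(String startingSeqNum) {
        return "sequence-number overload";
    }

    static String describe(Date startingTimestamp) {
        return "timestamp overload";
    }

    static String callWithNullSequenceNumber() {
        // describe(null) would not compile here: the call is ambiguous because
        // null is assignable to both String and Date, and neither is more specific.
        return describe((String) null);
    }
}
```

So the cast is needed only for as long as both overloads exist; merging them into one method would remove the need for it.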



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711501#comment-15711501
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90410106
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher 
fetcherRef,
this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(

ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,

Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+   if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
--- End diff --

I think it might be cleaner to extract / parse the `initTimestamp` where the 
sentinel sequence number case is determined, at the beginning of run().

As for checking whether the timestamp is parseable, we should do that when 
validating the user-provided property configs locally at the job client. You 
can take a look at the constructor code of `FlinkKinesisConsumer` - that's 
where we validate the properties. This validation is done at the job client, so 
that if the properties are erroneous / unparseable in any way, we can handle 
that _before_ a Flink job is actually launched.
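
To illustrate the fail-fast idea, here is a minimal sketch of a constructor that validates its properties before anything is launched. `ClientSideValidationSketch` is a hypothetical class, not `FlinkKinesisConsumer`; the key names and the `LATEST` default are taken from the diffs quoted in this thread.

```java
import java.util.Properties;

final class ClientSideValidationSketch {
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";

    private final Properties config;

    /** Validation runs in the constructor, i.e. where the job is assembled. */
    ClientSideValidationSketch(Properties config) {
        validate(config);
        this.config = config;
    }

    Properties getConfig() {
        return config;
    }

    static void validate(Properties config) {
        // LATEST is used when no initial position is configured.
        String initPos = config.getProperty(STREAM_INITIAL_POSITION, "LATEST");
        if ("AT_TIMESTAMP".equals(initPos) && !config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
            throw new IllegalArgumentException("Please set value for initial timestamp ('"
                + STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
        }
    }
}
```

A misconfigured instance then fails with `IllegalArgumentException` at construction time rather than inside a running task.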



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711504#comment-15711504
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90408543
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
 
+   /** The initial timestamp to start reading Kinesis stream from (when 
AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
+   public static final String STREAM_INITIAL_TIMESTAMP = 
"flink.stream.init.timestamp";
--- End diff --

What do you think about renaming this to `flink.stream.initpos.timestamp`, 
instead of `flink.stream.init.timestamp`?
I personally think it's a good idea to do so, because essentially 
`STREAM_INITIAL_TIMESTAMP` is a sub-setting of `STREAM_INITIAL_POSITION`. In 
other words, `STREAM_INITIAL_TIMESTAMP` will be meaningless if 
`STREAM_INITIAL_POSITION = AT_TIMESTAMP` isn't set in the config.
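
A small sketch of the sub-setting relationship, assuming the proposed key name is adopted (the thread does not say whether it was). `SubSettingSketch` and `timestampIsIgnored` are hypothetical names; the helper flags the "meaningless" case where a timestamp is configured but the initial position is not `AT_TIMESTAMP`.

```java
import java.util.Properties;

final class SubSettingSketch {
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    // Proposed name from the review comment: the timestamp key nests under the position key.
    static final String STREAM_INITIAL_TIMESTAMP = STREAM_INITIAL_POSITION + ".timestamp";

    /** True when a timestamp is configured but would have no effect. */
    static boolean timestampIsIgnored(Properties config) {
        return config.containsKey(STREAM_INITIAL_TIMESTAMP)
            && !"AT_TIMESTAMP".equals(config.getProperty(STREAM_INITIAL_POSITION));
    }
}
```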



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711503#comment-15711503
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90411420
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
@@ -145,6 +145,39 @@ public void 
testUnrecognizableStreamInitPositionTypeInConfig() {
}
 
@Test
+   public void 
testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+   exception.expect(IllegalArgumentException.class);
+   exception.expectMessage("Please set value for initial timestamp 
('"
+   + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + 
"') when using AT_TIMESTAMP initial position.");
+
+   Properties testConfig = new Properties();
+   testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+   
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+   
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+
+   KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+   }
+
+   @Test
+   public void testUnparsableDateForInitialTimestampInConfig() {
+   exception.expect(IllegalArgumentException.class);
+   exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
--- End diff --

I would suggest not making the expected message this verbose in tests.
It increases the likelihood that the tests will need to be altered 
whenever we want to tweak the messages a bit.
I think `Invalid value given for initial timestamp for AT_TIMESTAMP initial 
position in stream` is enough.
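
JUnit 4's `ExpectedException.expectMessage(String)` matches on a substring, so a short stable prefix is sufficient. The sketch below shows the same idea in plain Java, without JUnit, so that it stands alone; `MessagePrefixAssertionSketch` and its toy `validateTimestamp` are hypothetical and only stand in for the real validator.

```java
final class MessagePrefixAssertionSketch {
    static final String STABLE_PREFIX =
        "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream";

    /** Toy validator: accepts only plain non-negative decimal numbers. */
    static void validateTimestamp(String value) {
        if (!value.matches("\\d+(\\.\\d+)?")) {
            // Everything after the prefix is detail that may be reworded later.
            throw new IllegalArgumentException(
                STABLE_PREFIX + ". Must be a non-negative number; got '" + value + "'.");
        }
    }

    /** The check a test would make: only the stable prefix is asserted. */
    static boolean failsWithStablePrefix(String value) {
        try {
            validateTimestamp(value);
            return false;
        } catch (IllegalArgumentException e) {
            return e.getMessage().contains(STABLE_PREFIX);
        }
    }
}
```

The tail of the message can then be reworded without touching the test.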



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711502#comment-15711502
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90411159
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -215,4 +229,18 @@ private static void 
validateOptionalPositiveDoubleProperty(Properties config, St
}
}
}
+
+   private static void validateOptionalDateProperty(Properties config, 
String key, String message) {
--- End diff --

Ah, you've already checked the parsing of Date format here.
I'd say we don't really need to do the `ParseException` handling in 
`ShardConsumer` then, because that should never fail.



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711500#comment-15711500
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2916#discussion_r90410691
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -35,6 +37,7 @@
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+   public static SimpleDateFormat initTimestampDateFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
--- End diff --

We probably need to update the Kinesis connector documentation as well, to 
inform users of this format.



[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711248#comment-15711248
 ] 

ASF GitHub Bot commented on FLINK-4523:
---

GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/2916

[FLINK-4523] [kinesis] Allow Kinesis Consumer to start from specific 
timestamp / Date



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-4523

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2916


commit e2d2cf2e604e329523c53ebfb1b4c4597687211b
Author: 魏偉哲 
Date:   2016-12-01T03:40:46Z

[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date


