[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946622#comment-15946622 ]

Wei-Che Wei commented on FLINK-4523:
------------------------------------

[~tsriharsha] I have opened a task (FLINK-6211) to solve it. Thank you again for finding this problem.

> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.3.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add this functionality with fairly low overhead: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946592#comment-15946592 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r108599820

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                 }
             }
         }
    +
    +    private static void validateOptionalDateProperty(Properties config, String key, String message) {
    +        if (config.containsKey(key)) {
    +            try {
    +                initTimestampDateFormat.parse(config.getProperty(key));
    --- End diff --

    @tsriharsha Thanks for pointing out this problem. I will fix it soon.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946588#comment-15946588 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tsriharsha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r108598516

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                 }
             }
         }
    +
    +    private static void validateOptionalDateProperty(Properties config, String key, String message) {
    +        if (config.containsKey(key)) {
    +            try {
    +                initTimestampDateFormat.parse(config.getProperty(key));
    --- End diff --

    Hey this throws an error if a double is passed but the next line throws an exception when a timestamp string is passed. @tzulitai @tony810430
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946584#comment-15946584 ]

Sriharsha Tikkireddy commented on FLINK-4523:
---------------------------------------------

[~tonywei] Hey, I tried to run with a timestamp as the start position, but the following KinesisConfigUtil function always throws an error. If the input is an epoch time, the date formatter throws a parse exception; if the input is a date string, the double parsing on the next line throws an exception instead:

private static void validateOptionalDateProperty(Properties config, String key, String message) {
    if (config.containsKey(key)) {
        try {
            initTimestampDateFormat.parse(config.getProperty(key));
            --- double value = Double.parseDouble(config.getProperty(key)); ---
            if (value < 0) {
                throw new NumberFormatException();
            }
        } catch (ParseException | NumberFormatException e) {
            throw new IllegalArgumentException(message);
        }
    }
}
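A minimal sketch of one way the check reported above could accept either form: try the date format first, and fall back to the numeric check only when date parsing fails. This is not the fix that went into FLINK-6211. The class name and the date pattern are assumptions for illustration, since the thread does not show how initTimestampDateFormat is defined.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

class InitTimestampValidationSketch {

    // Assumed pattern (hypothetical): the thread does not show the real format.
    static final String ASSUMED_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    static void validateOptionalDateProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return; // the property is optional
        }
        String raw = config.getProperty(key);
        if (raw == null) {
            // The key is present but its value is not a String.
            throw new IllegalArgumentException(message);
        }
        try {
            // SimpleDateFormat is not thread-safe, so a new instance is created per call.
            SimpleDateFormat format = new SimpleDateFormat(ASSUMED_DATE_PATTERN);
            format.setLenient(false);
            format.parse(raw);
        } catch (ParseException notADate) {
            // Fall back to the numeric check only when the value is not a date string.
            try {
                double value = Double.parseDouble(raw);
                if (value < 0) {
                    throw new IllegalArgumentException(message);
                }
            } catch (NumberFormatException notANumber) {
                throw new IllegalArgumentException(message);
            }
        }
    }
}
```

With this shape a value only has to satisfy one of the two parsers, whereas the quoted code requires it to satisfy both.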
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15811373#comment-15811373 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2916

    Hi @tony810430,
    I've finished adding documents for this new feature, and opened a pull request at your local `FLINK-4523` branch.
    Can you take a look (would be great if you can review it too!), and merge the pull request to your `FLINK-4523`, so that it gets included in this PR too? Thanks!

> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add this functionality with fairly low overhead: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750659#comment-15750659 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2916

    Thanks for addressing the final comments. I'll add the docs and merge this by the end of the day :)
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714743#comment-15714743 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on the issue:

    https://github.com/apache/flink/pull/2916

    I'm not familiar with how to update the documentation. I would be grateful if you could take over this work. Thanks.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714567#comment-15714567 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90607682

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
          * {@inheritDoc}
          */
         @Override
    -    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
    +        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +            .withStreamName(shard.getStreamName())
    +            .withShardId(shard.getShard().getShardId())
    +            .withShardIteratorType(shardIteratorType);
    +
    +        switch (ShardIteratorType.fromValue(shardIteratorType)) {
    +            case TRIM_HORIZON:
    +            case LATEST:
    +                break;
    +            case AT_TIMESTAMP:
    +                getShardIteratorRequest.setTimestamp((Date) startingMarker);
    +                break;
    +            case AT_SEQUENCE_NUMBER:
    +            case AFTER_SEQUENCE_NUMBER:
    +                getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
    --- End diff --

    Same here, see above: consider handling type cast exceptions?
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714566#comment-15714566 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90607651

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
          * {@inheritDoc}
          */
         @Override
    -    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
    +        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +            .withStreamName(shard.getStreamName())
    +            .withShardId(shard.getShard().getShardId())
    +            .withShardIteratorType(shardIteratorType);
    +
    +        switch (ShardIteratorType.fromValue(shardIteratorType)) {
    +            case TRIM_HORIZON:
    +            case LATEST:
    +                break;
    +            case AT_TIMESTAMP:
    +                getShardIteratorRequest.setTimestamp((Date) startingMarker);
    --- End diff --

    For the new implementation of this method, we probably should handle type casting exceptions, and wrap them as IllegalArgumentExceptions. Since we're doing the property config validating locally, this really shouldn't happen, but it would be good to add handling type cast errors here to make this class self-contained.
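The review suggestion above (surface a wrong marker type as an IllegalArgumentException rather than a raw ClassCastException) could be sketched as two small helpers. The class and method names are hypothetical and are not part of the pull request; the sketch uses instanceof checks instead of catching ClassCastException, which also rejects a null marker.

```java
import java.util.Date;

// Hypothetical helper class, not taken from the pull request.
class StartingMarkerCasts {

    /** Returns the marker as a Date, or throws IllegalArgumentException if it is anything else. */
    static Date asTimestamp(Object startingMarker) {
        if (startingMarker instanceof Date) {
            return (Date) startingMarker;
        }
        throw new IllegalArgumentException(
            "AT_TIMESTAMP expects a java.util.Date starting marker, but got: " + describe(startingMarker));
    }

    /** Returns the marker as a String, or throws IllegalArgumentException if it is anything else. */
    static String asSequenceNumber(Object startingMarker) {
        if (startingMarker instanceof String) {
            return (String) startingMarker;
        }
        throw new IllegalArgumentException(
            "Sequence-number iterator types expect a String starting marker, but got: " + describe(startingMarker));
    }

    private static String describe(Object marker) {
        return marker == null ? "null" : marker.getClass().getName();
    }
}
```

Under these assumptions the AT_TIMESTAMP branch of the switch would call something like getShardIteratorRequest.setTimestamp(StartingMarkerCasts.asTimestamp(startingMarker)) in place of the bare cast.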
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714565#comment-15714565 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90608161

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java ---
    @@ -34,14 +34,16 @@
          *
          * @param shard the shard to get the iterator
          * @param shardIteratorType the iterator type, defining how the shard is to be iterated
    -     *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
    -     * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
    +     *                          (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
    +     * @param startingMarker is null if shardIteratorType is TRIM_HORIZON or LATEST,
    +     *                       is as a timestamp if shardIteratorType is AT_TIMESTAMP,
    +     *                       is as a sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
    --- End diff --

    I would suggest the following changes to the Javadoc here to be more specific:

    is null --> "should be {@code null} if ..."
    is as a timestamp --> "should be a {@code Date} value if ..."
    is as a sequence number --> "should be a {@code String} representing the sequence number if ..."
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714544#comment-15714544 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90607632

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
          * {@inheritDoc}
          */
         @Override
    -    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
    +        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +            .withStreamName(shard.getStreamName())
    +            .withShardId(shard.getShard().getShardId())
    +            .withShardIteratorType(shardIteratorType);
    +
    +        switch (ShardIteratorType.fromValue(shardIteratorType)) {
    +            case TRIM_HORIZON:
    +            case LATEST:
    +                break;
    +            case AT_TIMESTAMP:
    +                getShardIteratorRequest.setTimestamp((Date) startingMarker);
    --- End diff --

    For the new implementation of this method, we probably should handle type casting exceptions, and wrap them as `IllegalArgumentException`s. Since we're doing the property config validating locally, this really shouldn't happen, but it would be good to add handling type cast errors here to make this class self-contained.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714531#comment-15714531 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90606695

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
             this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
                 ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                 Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
    +
    +        if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
    --- End diff --

    Ah I see, sorry for missing that. The fix for this seems good.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711604#comment-15711604 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420855

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
             this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
                 ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                 Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
    +
    +        if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
    --- End diff --

    Because consumerConfig can't be accessed in 'run()'
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711605#comment-15711605 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420861

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
         public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    +    /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
    +    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";
    --- End diff --

    ok
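As a rough illustration of how the two keys quoted in this message fit together, a consumer configuration for AT_TIMESTAMP might be assembled as below. The class and method names are hypothetical, the accepted format of the timestamp value is not stated in this thread, and the AWS region and credential properties a real job needs are left out.

```java
import java.util.Properties;

// Hypothetical helper; only the two property keys come from the quoted diff.
class AtTimestampConfigSketch {

    static Properties atTimestampConfig(String initialTimestamp) {
        Properties config = new Properties();
        // String value of ConsumerConfigConstants.STREAM_INITIAL_POSITION
        config.setProperty("flink.stream.initpos", "AT_TIMESTAMP");
        // String value of ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
        // the caller is assumed to supply the value in whatever format the connector accepts.
        config.setProperty("flink.stream.init.timestamp", initialTimestamp);
        return config;
    }
}
```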
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711598#comment-15711598 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420737

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -122,12 +138,14 @@ public void run() {
                     if (subscribedShard.isClosed()) {
                         nextShardItr = null;
                     } else {
    -                    nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
    +                    nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null);
    --- End diff --

    I will solve this problem together with the problem below.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711592#comment-15711592 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420632

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe
          */
         @Override
         public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +            .withStreamName(shard.getStreamName())
    +            .withShardId(shard.getShard().getShardId())
    +            .withShardIteratorType(shardIteratorType)
    +            .withStartingSequenceNumber(startingSeqNum);
    +        return getShardIterator(getShardIteratorRequest);
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
    +        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +            .withStreamName(shard.getStreamName())
    +            .withShardId(shard.getShard().getShardId())
    +            .withShardIteratorType(shardIteratorType)
    +            .withTimestamp(startingTimestamp);
    --- End diff --

    I will check the shard iterator type in the new method by merging these two 'getShardIterator' methods.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
    [ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711587#comment-15711587 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420204

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
    @@ -145,6 +145,39 @@ public void testUnrecognizableStreamInitPositionTypeInConfig() {
         }

         @Test
    +    public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
    +        exception.expect(IllegalArgumentException.class);
    +        exception.expectMessage("Please set value for initial timestamp ('"
    +            + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
    +
    +        Properties testConfig = new Properties();
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    +        testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
    +
    +        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +    }
    +
    +    @Test
    +    public void testUnparsableDateForInitialTimestampInConfig() {
    +        exception.expect(IllegalArgumentException.class);
    +        exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
    --- End diff --

    ok
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711498#comment-15711498 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90410520 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map streamNamesWithLastSe */ @Override public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { + GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(shard.getStreamName()) + .withShardId(shard.getShard().getShardId()) + .withShardIteratorType(shardIteratorType) + .withStartingSequenceNumber(startingSeqNum); + return getShardIterator(getShardIteratorRequest); + } + + /** +* {@inheritDoc} +*/ + @Override + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException { + GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(shard.getStreamName()) + .withShardId(shard.getShard().getShardId()) + .withShardIteratorType(shardIteratorType) + .withTimestamp(startingTimestamp); --- End diff -- How does the Kinesis API behave when you set both a non-matching shard iterator type and a timestamp? In other words, what happens when the shard iterator type is, say, `TRIM_HORIZON`, but a timestamp is also provided? If the API call fails because Kinesis doesn't allow such combinations, we should probably make this method implementation fail-proof against such situations.
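One way to make the method robust against the mismatch raised in this comment is to check the combination before any request is built. The following is only a minimal sketch under stated assumptions: `ShardIteratorArgs` is a hypothetical stand-in for the arguments that would go into a `GetShardIteratorRequest`, so that the example runs without the AWS SDK. It does not settle how Kinesis itself reacts to a mismatched combination.

```java
import java.util.Date;

/** Hypothetical holder for shard iterator arguments; not part of Flink or the AWS SDK. */
final class ShardIteratorArgs {
    final String shardIteratorType;
    final Date timestamp; // null when the iterator type does not use a timestamp

    private ShardIteratorArgs(String shardIteratorType, Date timestamp) {
        this.shardIteratorType = shardIteratorType;
        this.timestamp = timestamp;
    }

    /** Rejects combinations where the timestamp and the iterator type disagree. */
    static ShardIteratorArgs of(String shardIteratorType, Date timestamp) {
        boolean atTimestamp = "AT_TIMESTAMP".equals(shardIteratorType);
        if (atTimestamp && timestamp == null) {
            throw new IllegalArgumentException("AT_TIMESTAMP requires a starting timestamp");
        }
        if (!atTimestamp && timestamp != null) {
            throw new IllegalArgumentException(
                "A timestamp was given, but the shard iterator type is " + shardIteratorType);
        }
        return new ShardIteratorArgs(shardIteratorType, timestamp);
    }

    public static void main(String[] args) {
        System.out.println(of("AT_TIMESTAMP", new Date(0L)).timestamp.getTime());
        try {
            of("TRIM_HORIZON", new Date(0L));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing locally keeps the behavior independent of whatever the remote API does with such a request.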
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711497#comment-15711497 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90410957 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -57,6 +60,17 @@ public static void validateConsumerConfiguration(Properties config) { } throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString()); } + + // specified initial timestamp in stream when using AT_TIMESTAMP + if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) { + if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) { + throw new IllegalArgumentException("Please set value for initial timestamp ('" + + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); + } + validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); --- End diff -- That's really good error messaging for the user :) Like I mentioned above, we need to mention this in the documentation as well.
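The two accepted input forms named in the error message can be illustrated with a small parser. This is a sketch rather than the PR's code: `InitTimestampParser` is a hypothetical name, the year field of the pattern is assumed to be `yyyy` (which the example value suggests), and the double form is assumed to mean seconds since the epoch.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper; accepts either an ISO-style date string or epoch seconds. */
final class InitTimestampParser {
    // Assumed pattern: the leading "yyyy" is inferred from the example value.
    static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    static Date parse(String value) {
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
            format.setLenient(false);
            return format.parse(value);
        } catch (ParseException notADate) {
            // Not in date form; try the numeric form next.
        }
        try {
            double seconds = Double.parseDouble(value);
            // The negated comparison also rejects NaN.
            if (!(seconds >= 0) || Double.isInfinite(seconds)) {
                throw new IllegalArgumentException("Initial timestamp must be non-negative: " + value);
            }
            return new Date(Math.round(seconds * 1000.0));
        } catch (NumberFormatException notANumber) {
            throw new IllegalArgumentException("Invalid value given for initial timestamp: " + value, notANumber);
        }
    }

    public static void main(String[] args) {
        // Both example values from the error message denote the same instant.
        System.out.println(parse("2016-04-04T19:58:46.480-00:00").getTime());
        System.out.println(parse("1459799926.480").getTime());
    }
}
```

Note that `DateFormat.parse(String)` may stop before the end of the input, so a stricter check would use a `ParsePosition` and compare it with the string length.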
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711499#comment-15711499 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90410148 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -122,12 +138,14 @@ public void run() { if (subscribedShard.isClosed()) { nextShardItr = null; } else { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null); --- End diff -- Is the String type casting necessary?
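On the casting question: once a second overload taking a `Date` sits next to the one taking a `String`, a bare `null` argument matches both, and since neither type is more specific than the other the Java compiler rejects the call as ambiguous. The sketch below reproduces that with two hypothetical overloads named `start`, which only mirror the shape of the two `getShardIterator` signatures in the diff.

```java
import java.util.Date;

/** Hypothetical demo of overload resolution with a null argument. */
final class NullOverloadDemo {
    static String start(String startingSeqNum) {
        return "sequence-number overload";
    }

    static String start(Date startingTimestamp) {
        return "timestamp overload";
    }

    public static void main(String[] args) {
        // start(null); // does not compile: the reference to start is ambiguous
        System.out.println(start((String) null)); // the cast selects the String overload
        System.out.println(start((Date) null));   // the cast selects the Date overload
    }
}
```

So with both overloads present, the cast (or a typed variable holding `null`) is what tells the compiler which method is meant.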
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711501#comment-15711501 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90410106 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { --- End diff -- I think it might be cleaner if we extract / parse the `initTimestamp` where the sentinel sequence number case is determined, at the beginning of run(). As for checking whether the timestamp is parseable, we should do that when validating the user-provided property configs locally at the job client. You can take a look at the constructor code of `FlinkKinesisConsumer` - that's where we validate the properties. This validation is done at the job client, so that if the properties are erroneous or unparseable, we can handle that _before_ a Flink job is actually launched.
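The fail-fast idea in this comment can be sketched as constructor-time validation. `EagerlyValidatedSource` below is a hypothetical class, not `FlinkKinesisConsumer`; the two property keys and the `LATEST` default are taken from the quoted diffs, and everything else is an assumption made for illustration.

```java
import java.util.Properties;

/** Hypothetical source; only the constructor-time validation pattern is the point. */
final class EagerlyValidatedSource {
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";

    private final Properties config;

    EagerlyValidatedSource(Properties config) {
        // Validating here means a bad config fails where the source object is created.
        validate(config);
        this.config = config;
    }

    static void validate(Properties config) {
        // LATEST is used when no initial position is set.
        String initPos = config.getProperty(STREAM_INITIAL_POSITION, "LATEST");
        if ("AT_TIMESTAMP".equals(initPos) && !config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
            throw new IllegalArgumentException("Please set value for initial timestamp ('"
                + STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
        }
        // A format check on the timestamp value would belong here as well.
    }

    /** Stands in for run(): it can read the value without validating it again. */
    String startingPoint() {
        String initPos = config.getProperty(STREAM_INITIAL_POSITION, "LATEST");
        return "AT_TIMESTAMP".equals(initPos)
            ? initPos + "@" + config.getProperty(STREAM_INITIAL_TIMESTAMP)
            : initPos;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        try {
            new EagerlyValidatedSource(config);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected before launch: " + e.getMessage());
        }
    }
}
```

With the check in the constructor, the code that later consumes the value has no error path of its own for a missing timestamp.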
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711504#comment-15711504 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90408543 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp"; --- End diff -- What do you think about renaming this to `flink.stream.initpos.timestamp`, instead of `flink.stream.init.timestamp`? I personally think it's a good idea, because `STREAM_INITIAL_TIMESTAMP` is essentially a sub-setting of `STREAM_INITIAL_POSITION`. In other words, `STREAM_INITIAL_TIMESTAMP` is meaningless if `STREAM_INITIAL_POSITION = AT_TIMESTAMP` isn't set in the config.
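The sub-setting relationship described in this comment can be made concrete with a small lookup. This is a sketch under assumptions: `InitPosConfigSketch` and `effectiveTimestamp` are hypothetical names, and the key `flink.stream.initpos.timestamp` is only the name proposed in the review, while the diff as quoted still uses `flink.stream.init.timestamp`.

```java
import java.util.Optional;
import java.util.Properties;

/** Hypothetical helper showing the timestamp as a sub-setting of the initial position. */
final class InitPosConfigSketch {
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    // Proposed name from the review comment, nested under the initpos prefix.
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    /** The timestamp only has an effect when the initial position is AT_TIMESTAMP. */
    static Optional<String> effectiveTimestamp(Properties config) {
        if (!"AT_TIMESTAMP".equals(config.getProperty(STREAM_INITIAL_POSITION))) {
            return Optional.empty();
        }
        return Optional.ofNullable(config.getProperty(STREAM_INITIAL_TIMESTAMP));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_TIMESTAMP, "1459799926.480");
        System.out.println(effectiveTimestamp(config).isPresent()); // position not set

        config.setProperty(STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        System.out.println(effectiveTimestamp(config).isPresent()); // position set
    }
}
```

Sharing the `flink.stream.initpos` prefix lets the key name itself signal which setting the timestamp depends on.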
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711503#comment-15711503 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90411420 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java --- @@ -145,6 +145,39 @@ public void testUnrecognizableStreamInitPositionTypeInConfig() { } @Test + public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Please set value for initial timestamp ('" + + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testUnparsableDateForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " --- End diff -- I would suggest not making the expected message this verbose in tests. It increases the likelihood that the tests will need to be altered whenever we want to tweak the messages a bit. I think `Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream` is enough.
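The suggestion works because JUnit 4's `ExpectedException.expectMessage(String)` checks that the exception message contains the given text rather than equals it. The sketch below shows the same substring check in plain Java so it runs without JUnit; `MessagePrefixCheck` and its simplified `validate` method are hypothetical stand-ins for the real validator.

```java
/** Hypothetical demo: assert only on the stable part of an error message. */
final class MessagePrefixCheck {
    static final String STABLE_PREFIX =
        "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream";

    /** Simplified validator: accepts only non-negative numbers, to keep the sketch short. */
    static void validate(String timestamp) {
        try {
            if (Double.parseDouble(timestamp) >= 0) {
                return;
            }
        } catch (NumberFormatException ignored) {
            // Fall through to the error below.
        }
        throw new IllegalArgumentException(
            STABLE_PREFIX + ". The details after this point can be reworded freely.");
    }

    /** Mirrors a substring-based expectation on the exception message. */
    static boolean rejectsWithStablePrefix(String timestamp) {
        try {
            validate(timestamp);
            return false;
        } catch (IllegalArgumentException e) {
            return e.getMessage().contains(STABLE_PREFIX);
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsWithStablePrefix("unparsable"));
        System.out.println(rejectsWithStablePrefix("1459799926.480"));
    }
}
```

Only the prefix is pinned down by the check, so the format hints and examples in the rest of the message can change without touching the test.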
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711502#comment-15711502 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90411159 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St } } } + + private static void validateOptionalDateProperty(Properties config, String key, String message) { --- End diff -- Ah, you've already checked the parsing of Date format here. I'd say we don't really need to do the `ParseException` handling in `ShardConsumer` then, because that should never fail.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711500#comment-15711500 ] ASF GitHub Bot commented on FLINK-4523: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2916#discussion_r90410691 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -35,6 +37,7 @@ * Utilities for Flink Kinesis connector configuration. */ public class KinesisConfigUtil { + public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); --- End diff -- We probably need to update the Kinesis documentation as well, to inform users of this format.
[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date
[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711248#comment-15711248 ] ASF GitHub Bot commented on FLINK-4523: --- GitHub user tony810430 opened a pull request: https://github.com/apache/flink/pull/2916 [FLINK-4523] [kinesis] Allow Kinesis Consumer to start from specific timestamp / Date You can merge this pull request into a Git repository by running: $ git pull https://github.com/tony810430/flink FLINK-4523 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2916.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2916 commit e2d2cf2e604e329523c53ebfb1b4c4597687211b Author: 魏偉哲 Date: 2016-12-01T03:40:46Z [FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date