[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305762#comment-15305762 ]

Tzu-Li (Gordon) Tai commented on FLINK-3229:


Great working with the community :) Thanks for merging!

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for the Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE,
>     "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288916#comment-15288916 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1911




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15272584#comment-15272584 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-217202359
  
Yep, that's the right branch.
I tried working on different approaches, but it's just an annoying problem 
with protobuf.
I'll probably work on it again tomorrow.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-05 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15272578#comment-15272578 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-217201356
  
Thanks Robert. I'll keep an eye on your FLINK-3229-review branch for the 
changes (I'm assuming you're working on FLINK-3229-review for the protobuf 
problem; please tell me if I'm wrong :))

Also, if there is anything I can do or help with (e.g. tests on other 
environments) to further improve the PR, please don't hesitate to let me know :)




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-03 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15268620#comment-15268620 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-216512837
  
I'm currently working on a custom branch based on this pull request.
We are running into some dependency issues when using the kinesis-connector 
in AWS EMR: there is a clash between the protobuf versions (Kinesis needs 
2.6.x, but Flink has 2.5.0 in its classpath).

I'll keep you posted.
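
A common way to resolve this kind of version clash (a hedged sketch of the general technique, not necessarily what was done for this PR; the `shadedPattern` package name is hypothetical) is to shade and relocate protobuf inside the connector jar with the Maven shade plugin, so the connector's 2.6.x copy cannot collide with the 2.5.0 already on Flink's classpath:

```xml
<!-- Hypothetical fragment for the connector's pom.xml -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Rewrite the connector's protobuf classes into a private package -->
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the relocation in place, both protobuf versions can coexist because the connector's classes reference the renamed package.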




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262706#comment-15262706 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r61480982
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:</p>
+ *
+ * <ul>
+ *     <li>TRIM_HORIZON: read from the earliest record available in each shard</li>
+ *     <li>LATEST: read only records added after the consumer starts</li>
+ * </ul>
+ *
+ * <p><b>NOTE:</b> The current implementation does not correctly handle resharding of AWS Kinesis streams.</p>
+ * <p><b>NOTE:</b> Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.</p>
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+	implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+	// ------------------------------------------------------------------------
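
The checkpoint integration described in the Javadoc above boils down to snapshotting a map of shard to last-processed sequence number. The following is an illustrative plain-Java sketch of that idea (class and method names are hypothetical; in the PR the real hooks are the CheckpointedAsynchronously interface's snapshot/restore methods):

```java
import java.util.HashMap;

// Sketch only, no Flink dependencies: the consumer's checkpoint state is
// essentially a map of shard -> last processed sequence number.
public class ShardStateSketch {
    private final HashMap<String, String> lastSequenceNums = new HashMap<>();

    // Called as records from a shard are emitted downstream
    public void advance(String shardId, String sequenceNum) {
        lastSequenceNums.put(shardId, sequenceNum);
    }

    // Analogous to a snapshot: hand the framework an independent copy of the offsets
    public HashMap<String, String> snapshotState() {
        return new HashMap<>(lastSequenceNums);
    }

    // Analogous to restore: on recovery, resume reading from the snapshotted offsets
    public void restoreState(HashMap<String, String> snapshot) {
        lastSequenceNums.clear();
        lastSequenceNums.putAll(snapshot);
    }

    public static void main(String[] args) {
        ShardStateSketch state = new ShardStateSketch();
        state.advance("shardId-000000000000", "49545");
        HashMap<String, String> checkpoint = state.snapshotState();
        state.advance("shardId-000000000000", "49999");
        state.restoreState(checkpoint); // roll back to the checkpointed offset
        System.out.println(state.snapshotState());
    }
}
```

Because the snapshot is a copy rather than a reference, in-flight progress after the checkpoint cannot corrupt it, which is what makes replay from the checkpointed sequence numbers safe.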
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262033#comment-15262033 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-215402825
  
I'm currently busy with some other ongoing tasks. I hope to get back to 
this PR soon.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254633#comment-15254633 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213581650
  
Regarding the Jackson / dependency issue: you don't need to worry about it 
now. I'll take another look at the problem and make sure it'll work once we 
merge it.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254608#comment-15254608 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60799066
  

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254583#comment-15254583 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60797751
  

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254332#comment-15254332 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213530765
  
Hi,

Cool, if you have time to address them, go ahead :)
Thanks a lot for doing this by the way! I really like the work you did so 
far on the connector!

Sent from my iPhone

> On 22.04.2016, at 18:01, Tzu-Li Tai  wrote:
> 
> @rmetzger 
> Thank you very much for your detailed review on the PR :)
> I've replied to the comments you added, please .
> I can address the issues and follow up with corresponding commits within 
> the next 36 hours. I am pretty much free for the next 3 days, and would 
> very much like to get the consumer ready for merging by the end of this week :)
> 
> If it still isn't ready by the end of 4/25, I'm afraid I will have to 
leave any remaining issues for you to address since after then I temporarily 
won't be able to work on code until June.
> 
> —
> You are receiving this because you were mentioned.
> Reply to this email directly or view it on GitHub
> 





[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254221#comment-15254221 ]

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60770825
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ *
+ */
+public class KinesisConfigConstants {
+
+   // 

+   //  Configuration Keys
+   // 

+
+   /** The max retries to retrieve metadata from a Kinesis stream using 
describeStream API
+* (Note: describeStream attempts may be temporarily blocked due to AWS 
capping 5 attempts per sec)  */
+   public static final String CONFIG_STREAM_DESCRIBE_RETRIES = 
"flink.stream.describe.retry";
+
+   /** The backoff time between each describeStream attempt */
+   public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = 
"flink.stream.describe.backoff";
+
+   /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
+   public static final String CONFIG_STREAM_INIT_POSITION_TYPE = 
"flink.stream.initpos.type";
+
+   /** The credential provider type to use when AWS credentials are 
required (BASIC is used if not set)*/
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = 
"aws.credentials.provider";
+
+   /** The AWS access key ID to use when setting credentials provider type 
to BASIC */
+   public static final String 
CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = 
"aws.credentials.provider.basic.accesskeyid";
+
+   /** The AWS secret key to use when setting credentials provider type to 
BASIC */
+   public static final String 
CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = 
"aws.credentials.provider.basic.secretkey";
+
+   /** Optional configuration for profile path if credential provider type 
is set to be PROFILE */
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH 
= "aws.credentials.provider.profile.path";
+
+   /** Optional configuration for profile name if credential provider type 
is set to be PROFILE */
+   public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME 
= "aws.credentials.provider.profile.name";
+
+   /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is 
used if not set) */
+   public static final String CONFIG_AWS_REGION = "aws.region";
+
+
+   // ------------------------------------------------------------------------
+   //  Default configuration values
+   // ------------------------------------------------------------------------
+
+   public static final String DEFAULT_AWS_REGION = "us-east-1";
--- End diff --

I think it's reasonable to make the region a required argument. As a user, more 
than once I've mistakenly thought the AWS SDK was failing to find resources, 
only to realize that it was defaulting to another region because none was 
specified.
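To illustrate the suggestion above, here is a minimal sketch of how the consumer could fail fast when no region is configured. The key string follows the CONFIG_AWS_REGION constant shown in the diff; the helper class itself is hypothetical, not part of the connector:

```java
import java.util.Properties;

// Hypothetical helper: fails fast when aws.region is absent, instead of
// silently falling back to a default region.
class RegionCheck {
    static final String CONFIG_AWS_REGION = "aws.region";

    static String requireRegion(Properties config) {
        String region = config.getProperty(CONFIG_AWS_REGION);
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "No AWS region set in config. Please set '" + CONFIG_AWS_REGION + "' explicitly.");
        }
        return region;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(CONFIG_AWS_REGION, "us-east-1");
        System.out.println(requireRegion(config)); // prints us-east-1
    }
}
```

With this, a misconfigured job fails at construction time rather than quietly reading from the wrong region.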


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}





[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254132#comment-15254132
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213491263
  
@rmetzger 
Thank you very much for your detailed review of the PR :)
I've replied to the comments you added; please take a look.
I can address the issues and follow up with corresponding commits within 
the next 36 hours. I am pretty much free for the next 3 days and would very 
much like to get the consumer ready for merging by the end of this week :)

If it still isn't ready by the end of 4/25, I'm afraid I will have to leave 
any remaining issues for you to address, since after that I temporarily won't be 
able to work on code until June.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254110#comment-15254110
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60760294
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
+ * The fetcher spawns a single thread for connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task that this fetcher was instantiated in */
+   private final String taskName;
+
+   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+* Creates a new Kinesis Data Fetcher for the specified set of shards
+*
+* @param assignedShards the shards that this fetcher will pull data 
from
+* @param configProps the configuration properties of this Flink 
Kinesis Consumer
+* @param taskName the task name of this consumer task
+*/
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+   this.configProps = checkNotNull(configProps);
+   this.credentials = AWSUtil.getCredentialsProvider(configProps);
+   this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+   for (KinesisStreamShard shard : assignedShards) {
+   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+   }
+   this.taskName = taskName;
+   this.error = new AtomicReference<>();
+   }
+
+   /**
+* Advance a 
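The thread-per-shard model described in the KinesisDataFetcher Javadoc above can be sketched roughly as follows. Shards are simplified to plain strings here, and the record-consuming loop is stubbed; this is a stand-in for the idea, not the connector's actual class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the fetcher's threading model: one consumer thread
// per assigned shard, with the first error captured in an AtomicReference so
// the main thread can rethrow it after all threads finish.
class PerShardFetcher {
    private final AtomicReference<RuntimeException> error = new AtomicReference<>();

    void run(List<String> assignedShards) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (String shard : assignedShards) {
            Thread t = new Thread(() -> {
                try {
                    // A real fetcher would loop on getRecords() for this shard here.
                    System.out.println("consuming " + shard);
                } catch (RuntimeException e) {
                    error.compareAndSet(null, e); // remember only the first error
                }
            }, "shard-consumer-" + shard);
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            t.join();
        }
        RuntimeException first = error.get();
        if (first != null) {
            throw first; // surface the first shard-thread failure to the caller
        }
    }
}
```

The AtomicReference mirrors the `error` field in the quoted diff: compareAndSet ensures that only the first failure from any shard thread is kept and rethrown.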

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254107#comment-15254107
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60760085
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // ------------------------------------------------------------------------

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254106#comment-15254106
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60759925
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a AWS Kinesis Stream shard. It is 
basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, 
with some extra utility methods to
+ * determine whether or not a shard is closed and whether or not the shard 
is a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

The main reason the consumer needs its own shard representation is that 
com.amazonaws.services.kinesis.model.Shard doesn't carry the shard's associated 
stream name as a field. We need the stream name when getting a shard iterator 
for a particular shard, via 
com.amazonaws.services.kinesis.AmazonKinesisClient#getShardIterator(streamName,
 shardId, iteratorType). Moreover, since the consumer's implementation 
supports reading from multiple Kinesis streams, we must carry the associated 
stream name along with each shard representation (I guess that's why Amazon's 
Shard implementation doesn't have a field for the stream name).

Our implementation, 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard, 
currently has regionName (which I don't think we actually need, since the 
consumer is limited to reading from Kinesis streams within the same region) and 
streamName as fields besides the ones already supplied by Amazon's Shard. So, 
to reduce duplicate implementation, we could include Amazon's Shard as a field 
within our KinesisStreamShard and keep streamName as an extra field. What do 
you think?
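A rough sketch of the composition proposed above. Amazon's Shard is stubbed here so the example is self-contained (the real class lives in com.amazonaws.services.kinesis.model and carries the shard ID, hash key range, and more):

```java
import java.io.Serializable;

// Stub standing in for com.amazonaws.services.kinesis.model.Shard,
// reduced to a single field for illustration.
class Shard implements Serializable {
    private final String shardId;
    Shard(String shardId) { this.shardId = shardId; }
    String getShardId() { return shardId; }
}

// Proposed shape: wrap Amazon's Shard and add only the stream name,
// instead of duplicating all of Shard's fields.
class KinesisStreamShard implements Serializable {
    private final String streamName;
    private final Shard shard;

    KinesisStreamShard(String streamName, Shard shard) {
        this.streamName = streamName;
        this.shard = shard;
    }

    String getStreamName() { return streamName; }
    Shard getShard() { return shard; }
}
```

With this shape, getShardIterator(streamName, shardId, iteratorType) can be driven entirely from one KinesisStreamShard instance.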




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254090#comment-15254090
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60756732
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param  the type of data emitted
+ */
+public class FlinkKinesisConsumer extends RichParallelSourceFunction
+   implements CheckpointListener, 
CheckpointedAsynchronously>, 
ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // 

   

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254080#comment-15254080
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60755162
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param  the type of data emitted
+ */
+public class FlinkKinesisConsumer extends RichParallelSourceFunction
+   implements CheckpointListener, 
CheckpointedAsynchronously>, 
ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // 

   

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254068#comment-15254068
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60753879
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+   /** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+   private final AmazonKinesisClient kinesisClient;
+
+   /** The AWS region that this proxy will be making calls to */
+   private final String regionId;
+
+   /** Configuration properties of this Flink Kinesis Connector */
+   private final Properties configProps;
+
+   /**
+    * Create a new KinesisProxy based on the supplied configuration properties
+    *
+    * @param configProps configuration properties containing AWS credential and AWS region info
+    */
+   public KinesisProxy(Properties configProps) {
+      this.configProps = checkNotNull(configProps);
+
+      this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+      AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+      client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+
+      this.kinesisClient = client;
+   }
+
+   /**
+    * Get the next batch of data records using a specific shard iterator
+    *
+    * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
+    * @param maxRecordsToGet the maximum number of records to retrieve for this batch
+    * @return the batch of retrieved records
+    */
+   public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+      final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+      getRecordsRequest.setShardIterator(shardIterator);
+   

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254063#comment-15254063
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213465308
  
@rmetzger Regarding the need to upgrade Jackson to get the example working:
After updating the code to Flink 1.0.x to prepare for the PR, I only tested
the consumer with manual tests within the Flink project code.
However, when the consumer was first implemented against Flink 0.10.1, I did
package the consumer and use it as a separate dependency at the time.
I'm wondering whether this could have anything to do with changes between the
older and newer Flink versions?


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE,
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254055#comment-15254055
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60752259
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
+ * The fetcher spawns a single thread for the connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in the config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task that this fetcher was instantiated in */
+   private final String taskName;
+
+   /** Information about the shards that this fetcher handles, along with the sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+    * Creates a new Kinesis Data Fetcher for the specified set of shards
+    *
+    * @param assignedShards the shards that this fetcher will pull data from
+    * @param configProps the configuration properties of this Flink Kinesis Consumer
+    * @param taskName the task name of this consumer task
+    */
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+      this.configProps = checkNotNull(configProps);
+      this.credentials = AWSUtil.getCredentialsProvider(configProps);
+      this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+      for (KinesisStreamShard shard : assignedShards) {
+         assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+      }
+      this.taskName = taskName;
+      this.error = new AtomicReference<>();
+   }
+
+   /**
+    * Advance a 
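The `error` field in the fetcher above follows a common first-error-wins pattern: each spawned shard-consumer thread reports failures into a shared `AtomicReference`, and only the first reported error is retained for the main thread to rethrow. A minimal self-contained sketch of that pattern (class and method names here are illustrative, not the connector's actual API):

```java
import java.util.concurrent.atomic.AtomicReference;

// First-error-wins error propagation: many worker threads may fail, but only
// the first failure is kept, via an atomic compareAndSet from null.
public class FirstErrorWins {
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Called by any worker thread; later errors are silently dropped. */
    public void stopWithError(Throwable t) {
        error.compareAndSet(null, t);
    }

    /** Returns the first reported error, or null if none occurred. */
    public Throwable firstError() {
        return error.get();
    }

    public static void main(String[] args) throws Exception {
        FirstErrorWins fetcher = new FirstErrorWins();
        Thread worker = new Thread(() -> fetcher.stopWithError(new RuntimeException("shard-0 failed")));
        worker.start();
        worker.join();
        // a later error from another thread is ignored
        fetcher.stopWithError(new IllegalStateException("shard-1 failed"));
        System.out.println(fetcher.firstError().getMessage()); // prints "shard-0 failed"
    }
}
```

Presumably the fetcher's main loop periodically checks this reference and rethrows, which is why the field sits next to the `mainThread` handle.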

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253899#comment-15253899
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213425285
  
Overall, I really like the work you did for the Kinesis Consumer! As you can
see, I've added some comments on the PR.
Please let me know your opinion on my comments.

When do you think you'll have time to address the issues? I would like to get
the code merged as soon as possible, because Amazon is asking to have it in
Flink soon. If you already know that you won't have time to work on this in
the upcoming days, I can also address the comments myself. Just let me know; I
think we can find a solution.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253871#comment-15253871
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60733759
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // 

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253870#comment-15253870
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60733595
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
--- End diff --

I don't think we need to implement the `CheckpointListener` currently 
because we are not committing the offsets anywhere.



[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253868#comment-15253868
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60733038
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
+ * The fetcher spawns a single thread for the connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in the config properties */
+   private final AWSCredentialsProvider credentials;
+
+   /** The name of the consumer task that this fetcher was instantiated in */
+   private final String taskName;
+
+   /** Information about the shards that this fetcher handles, along with the sequence numbers that they should start from */
+   private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+   /** Reference to the thread that executed run() */
+   private volatile Thread mainThread;
+
+   /** Reference to the first error thrown by any of the spawned shard connection threads */
+   private final AtomicReference<Throwable> error;
+
+   private volatile boolean running = true;
+
+   /**
+    * Creates a new Kinesis Data Fetcher for the specified set of shards
+    *
+    * @param assignedShards the shards that this fetcher will pull data from
+    * @param configProps the configuration properties of this Flink Kinesis Consumer
+    * @param taskName the task name of this consumer task
+    */
+   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+      this.configProps = checkNotNull(configProps);
+      this.credentials = AWSUtil.getCredentialsProvider(configProps);
+      this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+      for (KinesisStreamShard shard : assignedShards) {
+         assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+      }
+      this.taskName = taskName;
+      this.error = new AtomicReference<>();
+   }
+
+   /**
+    * Advance a 

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253857#comment-15253857
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60732384
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
+ * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

Also, with my suggestion to move the shard list out of the constructor, we
wouldn't rely on serializability anymore.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253856#comment-15253856
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60732321
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
+ * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
+ */
+public class KinesisStreamShard implements Serializable {
--- End diff --

The SDK's `com.amazonaws.services.kinesis.model.Shard` is also 
`Serializable`. I wonder if we really need to create a copy of their Shard 
within Flink.
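One quick way to settle whether a type is safe to ship around in state is a Java serialization round trip. The sketch below uses a hypothetical stand-in class (`KinesisStreamShardStandIn`), not the actual connector type, just to show the check:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Round-trips a Serializable value through Java serialization, returning the
// reconstructed copy. Useful for verifying a wrapper like the one discussed above.
public class SerializationRoundTrip {

    /** Hypothetical stand-in for the KinesisStreamShard wrapper under discussion. */
    public static class KinesisStreamShardStandIn implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String streamName;
        public final String shardId;

        public KinesisStreamShardStandIn(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value); // serialize to an in-memory buffer
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject(); // deserialize the copy back
        }
    }
}
```

Whether Flink keeps its own wrapper or reuses the SDK's `Shard` directly, this kind of round trip is what checkpointing ultimately depends on.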




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253852#comment-15253852
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60732133
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+	implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // 

   
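The class javadoc above says each parallel consumer instance is responsible for one or more shards. The diff is truncated before the assignment logic, so as an illustration only (a hypothetical sketch, not the PR's actual code), the simplest scheme consistent with that description distributes shard indices over parallel subtasks by modulo:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: distributes totalShards shard indices over `parallelism`
// subtasks so that subtask i handles every shard whose index % parallelism == i.
public class ShardAssignment {

    /** Returns the shard indices a given parallel subtask would consume. */
    public static List<Integer> shardsForSubtask(int totalShards, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int shard = 0; shard < totalShards; shard++) {
            if (shard % parallelism == subtaskIndex) {
                assigned.add(shard);
            }
        }
        return assigned;
    }
}
```

A scheme like this is deterministic, which matters for checkpoint restore: every subtask can recompute its own shard set without coordination.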

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253843#comment-15253843
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-213407614
  
I could not get the example to work with the current Jackson version; only after upgrading it to `2.7.3` did it work.
Did you test the Kinesis consumer in a separate project (adding the flink-kinesis-consumer as a dependency)?




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253838#comment-15253838
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60731184
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253839#comment-15253839
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60731207
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ * Keys and default values used to configure the Flink Kinesis Consumer.
+ */
+public class KinesisConfigConstants {
+
+	// ------------------------------------------------------------------------
+	//  Configuration Keys
+	// ------------------------------------------------------------------------
+
+	/** The max retries to retrieve metadata from a Kinesis stream using describeStream API
+	 * (Note: describeStream attempts may be temporarily blocked due to AWS capping 5 attempts per sec) */
+	public static final String CONFIG_STREAM_DESCRIBE_RETRIES = "flink.stream.describe.retry";
+
+	/** The backoff time between each describeStream attempt */
+	public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = "flink.stream.describe.backoff";
+
+	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+	public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type";
+
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set) */
+	public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider";
+
+	/** The AWS access key ID to use when setting the credentials provider type to BASIC */
+	public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid";
+
+	/** The AWS secret key to use when setting the credentials provider type to BASIC */
+	public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey";
+
+	/** Optional configuration for the profile path if the credential provider type is set to PROFILE */
+	public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path";
+
+	/** Optional configuration for the profile name if the credential provider type is set to PROFILE */
+	public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name";
+
+	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
+	public static final String CONFIG_AWS_REGION = "aws.region";
+
+
+	// ------------------------------------------------------------------------
+	//  Default configuration values
+	// ------------------------------------------------------------------------
+
+   public static final String DEFAULT_AWS_REGION = "us-east-1";
--- End diff --

If we make the region a required argument, we won't need this anymore.
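A sketch of what making the region required could look like (a hypothetical helper for illustration; the actual change would land in the consumer's config validation):

```java
import java.util.Properties;

// Hypothetical helper: fail fast when the "aws.region" key (the literal value
// of CONFIG_AWS_REGION) is absent, instead of silently falling back to a default.
public class RegionCheck {

    /** Returns the configured region, throwing when "aws.region" was not set. */
    public static String requireRegion(Properties config) {
        String region = config.getProperty("aws.region");
        if (region == null || region.isEmpty()) {
            throw new IllegalArgumentException(
                "The AWS region ('aws.region') must be set in the configuration.");
        }
        return region;
    }
}
```

With a required region there is no hidden default, so the mistake described in the later review comment (forgetting to set the region and unknowingly reading from us-east-1) becomes an immediate, explicit error.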




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253836#comment-15253836
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60731025
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+   /** The AWS region that this proxy will be making calls to */
+   private final String regionId;
+
+   /** Configuration properties of this Flink Kinesis Connector */
+   private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+
+		this.kinesisClient = client;
+	}
+
+	/**
+	 * Get the next batch of data records using a specific shard iterator
+	 *
+	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
+	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
+	 * @return the batch of retrieved records
+	 */
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+		getRecordsRequest.setShardIterator(shardIterator);
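The config constants quoted earlier (`CONFIG_STREAM_DESCRIBE_RETRIES`, `CONFIG_STREAM_DESCRIBE_BACKOFF`) imply a retry loop around throttled describeStream calls. A minimal sketch of such a loop (an illustration under stated assumptions, not the connector's actual code; the callback and generic retry helper are hypothetical):

```java
import java.util.concurrent.Callable;

// Illustrative retry-with-backoff in the spirit of CONFIG_STREAM_DESCRIBE_RETRIES
// and CONFIG_STREAM_DESCRIBE_BACKOFF: retry a throttled call up to maxRetries
// additional times, sleeping backoffMillis between attempts.
public class DescribeStreamRetry {

    public static <T> T callWithRetries(Callable<T> call, int maxRetries, long backoffMillis) throws Exception {
        Exception lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) { // in the connector this would target LimitExceededException
                lastError = e;
                Thread.sleep(backoffMillis);
            }
        }
        throw lastError; // exhausted all attempts
    }
}
```

Bounding the retries keeps a permanently throttled or missing stream from hanging the source forever, while the backoff stays under the AWS cap of roughly 5 describeStream attempts per second mentioned in the constant's javadoc.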

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253835#comment-15253835
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60730891
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
--- End diff --

The maven build is failing because this file is missing a license header




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253831#comment-15253831
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60730773
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
+ * The fetcher spawns a single thread for connection to each shard.
+ */
+public class KinesisDataFetcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+   /** Config properties for the Flink Kinesis Consumer */
+   private final Properties configProps;
+
+   /** The AWS credentials provider as specified in config properties */
+   private final AWSCredentialsProvider credentials;
+
+	/** The name of the consumer task in which this fetcher was instantiated */
+	private final String taskName;
+
+	/** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
+	private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+
+	/** Reference to the thread that executed run() */
+	private volatile Thread mainThread;
+
+	/** Reference to the first error thrown by any of the spawned shard connection threads */
+	private final AtomicReference<Throwable> error;
+
+	private volatile boolean running = true;
+
+	/**
+	 * Creates a new Kinesis Data Fetcher for the specified set of shards
+	 *
+	 * @param assignedShards the shards that this fetcher will pull data from
+	 * @param configProps the configuration properties of this Flink Kinesis Consumer
+	 * @param taskName the task name of this consumer task
+	 */
+	public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+		this.configProps = checkNotNull(configProps);
+		this.credentials = AWSUtil.getCredentialsProvider(configProps);
+		this.assignedShardsWithStartingSequenceNum = new HashMap<>();
+		for (KinesisStreamShard shard : assignedShards) {
+			assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+		}
+		this.taskName = taskName;
+		this.error = new AtomicReference<>();
+	}
+
+   /**
+* Advance a 
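The fetcher's fields above (`error`, `mainThread`, one thread per shard) suggest a first-error-wins propagation pattern between the shard connection threads and the main loop. A self-contained sketch of that pattern (illustrative only; the fetcher's actual handling is in parts of the diff not quoted here):

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch of the error-propagation pattern implied by the fetcher's
// fields: worker threads record only the FIRST failure, and the main thread
// periodically checks for it and rethrows.
public class FirstErrorHolder {

    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Called from shard consumer threads; compareAndSet keeps only the first error. */
    public void reportError(Throwable t) {
        error.compareAndSet(null, t);
    }

    /** Called from the main fetch loop; rethrows the recorded failure, if any. */
    public void checkAndThrow() throws Exception {
        Throwable t = error.get();
        if (t != null) {
            throw new Exception("Error in shard consumer thread", t);
        }
    }
}
```

Using `compareAndSet(null, t)` means concurrent failures race safely: whichever thread reports first wins, and later errors are dropped rather than overwriting the root cause.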

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253779#comment-15253779
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60726049
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -0,0 +1,211 @@
+   /**
+* Create a new KinesisProxy based on the supplied configuration 
properties
+*
+* @param configProps configuration properties containing AWS 
credential and AWS region info
+*/
+   public KinesisProxy(Properties configProps) {
+   this.configProps = checkNotNull(configProps);
+
+   this.regionId = 
configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, 
KinesisConfigConstants.DEFAULT_AWS_REGION);
+   AmazonKinesisClient client = new 
AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+   
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I found out what I was doing wrong. The code was using the default region 
ID because I forgot to set it.
I'm currently fixing some issues in the consumer and I'll make the region a 
required argument.



[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251355#comment-15251355
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60531075
  
--- Diff: flink-streaming-connectors/pom.xml ---
@@ -45,6 +45,7 @@ under the License.
flink-connector-rabbitmq
flink-connector-twitter
flink-connector-nifi
+   flink-connector-kinesis
--- End diff --

Thanks, I missed the "include-kinesis" profile defined below. We'll probably need a more general profile name in the future, though (e.g. include-aws-connectors), for when we start including more Amazon-licensed libraries for other connectors, such as DynamoDB.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250214#comment-15250214
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60443897
  
--- Diff: flink-streaming-connectors/pom.xml ---
@@ -45,6 +45,7 @@ under the License.
flink-connector-rabbitmq
flink-connector-twitter
flink-connector-nifi
+   flink-connector-kinesis
--- End diff --

I think we have to remove this line again. The module is included in the 
profile below (you have to activate the "include-kinesis" maven build profile)




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250211#comment-15250211
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212504465
  
Great, thank you. I'll review the PR soon.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249627#comment-15249627
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212376531
  
Quick update:
1. Rebased and integrated the consumer code into the Maven module that came 
with the producer merge.
2. Added documentation for the consumer.
3. Moved the producer's KinesisSerializationSchema into the 
org.apache.flink.streaming.connectors.kinesis.serialization package, where I 
originally placed the deserialization-related classes for the consumer.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249537#comment-15249537
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212341465
  
The problem was that the GitHub mirror needed some time to sync with the 
commit. But now it's there.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249532#comment-15249532
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212340127
  
Ah, ok :) I see it now, thanks.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249503#comment-15249503
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212333629
  
@tzulitai The producer wasn't merged yet.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249493#comment-15249493
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212330070
  
@rmetzger Hi Robert,
I'm rebasing my PR, but I could not find the merged Kinesis producer / 
maven module in the current apache/flink master. Please correct me if I'm doing 
anything wrong. Thanks :)




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249454#comment-15249454
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-212310743
  
Quick update on our plan: I've merged the Kinesis producer. If you want, 
you can rebase this pull request on the current master.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249452#comment-15249452
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60364261
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+   /** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+   private final AmazonKinesisClient kinesisClient;
+
+   /** The AWS region that this proxy will be making calls to */
+   private final String regionId;
+
+   /** Configuration properties of this Flink Kinesis Connector */
+   private final Properties configProps;
+
+   /**
+    * Create a new KinesisProxy based on the supplied configuration properties
+    *
+    * @param configProps configuration properties containing AWS credential and AWS region info
+    */
+   public KinesisProxy(Properties configProps) {
+           this.configProps = checkNotNull(configProps);
+
+           this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+           AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+           client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I thought so too, but it didn't work for me.
I'll investigate the issue further ...
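The region-vs-endpoint question above comes down to which endpoint the client ends up calling for a given region. As a hedged illustration only (the class and method names below are hypothetical, not part of the connector; the endpoint pattern is the standard `kinesis.<region>.amazonaws.com` AWS scheme), this is the kind of URL an explicit `setEndpoint(...)` call would receive:

```java
// Hypothetical helper illustrating the per-region Kinesis endpoint
// pattern discussed above; not part of the Flink connector code.
public class KinesisEndpoints {

    // Standard AWS regional endpoint scheme: kinesis.<region>.amazonaws.com
    static String kinesisEndpointFor(String regionId) {
        return "https://kinesis." + regionId + ".amazonaws.com";
    }

    public static void main(String[] args) {
        // The non-default region mentioned in the discussion above
        System.out.println(kinesisEndpointFor("ap-northeast-1"));
    }
}
```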



[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249208#comment-15249208
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60346774
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
--- End diff --

I'm using the "ap-northeast-1" region, which isn't the default.
Setting the region on the AmazonKinesisClient should set the endpoint too, 
no?



[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247784#comment-15247784
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60235061
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
--- End diff --

I had to set the endpoint here as well to make it work.
Which AWS region were you using?



[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247512#comment-15247512
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847420
  
Cool. You don't need to resubmit a new PR. By pushing new commits to your 
`FLINK-3229` branch, the pull request will update automatically.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247509#comment-15247509
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847140
  
@rmetzger Sure, that seems reasonable. I'll wait until the producer is 
merged and resubmit a new PR for the integrated consumer.




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247501#comment-15247501
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211846349
  
Thank you for opening a pull request for the consumer.
How about we proceed like this:
- I'm trying to get the producer code merged within the next 24 hours (feel 
free to test it a bit if you want)
- In the meantime, I'm testing and reviewing your code
- Once the producer has been merged, we integrate the consumer code into 
the maven module / code structure from my producer code.
- I'll review the consumer again and we merge it ;)




[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247492#comment-15247492
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/1911

[FLINK-3229] Flink streaming consumer for AWS Kinesis

I've been using this consumer for a while in off-production environments.
I understand we should have good test coverage for each PR, but since 
Kinesis is a hosted service, reliable integration tests are hard to pull off. 
To speed up merging Kinesis connector for the next release, I'm submitting the 
consumer now for some early reviews.
On the other hand, since @rmetzger is submitting a separate PR for the Kinesis 
producer, I'd like to postpone writing more tests for the consumer, as well as 
the corresponding documentation updates, until both the consumer and producer 
are in place.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-3229

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1911


commit 0e7c4eccca132e0fcf35262b36229e681c77530e
Author: Gordon Tai 
Date:   2016-04-16T03:33:17Z

[FLINK-3229] Initial working version for FlinkKinesisConsumer.

commit 6d46a6d09c346e490053a2f6319616a5178dab4e
Author: Gordon Tai 
Date:   2016-04-17T09:34:55Z

[FLINK-3229] Change access level of `assignShards` and 
`validatePropertiesConfig` to protected for testing purposes

commit c92b491fce8b3c35b409bc6f308d25ce52835027
Author: Gordon Tai 
Date:   2016-04-17T11:12:00Z

[FLINK-3229] Fix coding style violations regarding leading spaces in 
indentations

commit bc9f771c0f36618ce07772cfdefe7d87a35800fb
Author: Gordon Tai 
Date:   2016-04-19T09:56:57Z

[FLINK-3229] Change scope of flink-streaming-java module to provided

commit fc454efaac5dc7969ad4834b892f26799cfe5a33
Author: Gordon Tai 
Date:   2016-04-19T09:58:46Z

[FLINK-3229] Basic unit test for stable shard-to-consumer assignment
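The "stable shard-to-consumer assignment" that the last commit tests can be illustrated with a small standalone sketch. The modulo scheme, class name, and method signature below are assumptions for illustration, not the PR's actual `assignShards` implementation: any deterministic function of a sorted shard list lets every parallel subtask compute its own share without coordination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: deterministic shard-to-subtask assignment by shard index
// modulo parallelism. Every subtask derives the same global answer, so the
// assignment is stable across restarts with unchanged shards.
public class ShardAssignmentSketch {

    static List<String> assignShards(List<String> sortedShards, int numSubtasks, int subtaskIndex) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < sortedShards.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                assigned.add(sortedShards.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList(
                "shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4");
        System.out.println(assignShards(shards, 2, 0)); // [shardId-0, shardId-2, shardId-4]
        System.out.println(assignShards(shards, 2, 1)); // [shardId-1, shardId-3]
    }
}
```

Because the function is pure, a unit test only needs to check that the per-subtask results partition the shard list and stay identical call after call.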






[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243991#comment-15243991
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3229:


(Duplicate comment from FLINK-3211. Posting it here also to keep the issue 
updated.)

https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated corresponding to the recent 
Flink 1.0 changes.
I'm still refactoring the code a bit for easier unit testing. The PR will 
follow very soon.
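The checkpointing integration this consumer targets comes down to snapshotting the last-read sequence number per shard at each checkpoint and restoring those positions on recovery. The sketch below is a minimal, standalone illustration of that contract; the class and method names mimic the shape of a checkpointed source but are not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the state a checkpointed Kinesis source must carry is a
// map from shard id to the last sequence number emitted downstream.
public class SequenceNumberCheckpointSketch {

    static class ConsumerState {
        private final Map<String, String> lastSeqNumPerShard = new HashMap<>();

        void recordEmitted(String shardId, String seqNum) {
            lastSeqNumPerShard.put(shardId, seqNum);
        }

        // Called at a checkpoint barrier: return a copy of the current positions.
        Map<String, String> snapshotState() {
            return new HashMap<>(lastSeqNumPerShard);
        }

        // Called on recovery: resume reading each shard after the restored position.
        void restoreState(Map<String, String> snapshot) {
            lastSeqNumPerShard.putAll(snapshot);
        }

        String positionOf(String shardId) {
            return lastSeqNumPerShard.get(shardId);
        }
    }

    public static void main(String[] args) {
        ConsumerState running = new ConsumerState();
        running.recordEmitted("shardId-000", "496300");
        Map<String, String> checkpoint = running.snapshotState();

        ConsumerState recovered = new ConsumerState();
        recovered.restoreState(checkpoint);
        System.out.println(recovered.positionOf("shardId-000")); // 496300
    }
}
```

With exactly this state snapshotted, a restored consumer re-reads nothing before the checkpoint and drops nothing after it, which is what ties the source into Flink's exactly-once checkpointing mechanics.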

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties config = new Properties();
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, 
> "1000");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
> config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");
> AWSCredentialsProvider credentials = // credentials API in AWS SDK
> DataStream<String> kinesisRecords = env
> .addSource(new FlinkKinesisConsumer<>(
> listOfStreams, credentials, new SimpleStringSchema(), config
> ));
> {code}
> Currently still considering which read start positions to support 
> ("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussion of this 
> can be found in https://issues.apache.org/jira/browse/FLINK-3211.





[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-01-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101887#comment-15101887
 ] 

Stephan Ewen commented on FLINK-3229:
-

As per discussion in FLINK-3211 , I think "TRIM_HORIZON" and "LATEST" are 
sufficient for now.
