[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249528#comment-15249528
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1910


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order
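
For reference, a minimal usage sketch of the producer described above, condensed from the ProduceIntoKinesis example reviewed later in this thread; the region, credentials and stream name are placeholders, not values from this issue:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KinesisProducerUsage {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> input = env.fromElements("a", "b", "c");

            // Region, access key, secret key and serialization schema, as in the producer's constructor.
            FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
                    "eu-central-1", "<accessKey>", "<secretKey>", new SimpleStringSchema());
            kinesis.setFailOnError(true);           // fail the job on asynchronous put errors
            kinesis.setDefaultStream("test-flink"); // default stream, overridable by the serialization schema
            kinesis.setDefaultPartition("0");       // default partition key (feature 1 above)

            input.addSink(kinesis);
            env.execute("Kinesis producer example");
        }
    }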



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249444#comment-15249444
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-212306378
  
Merging ...


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247760#comment-15247760
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60232443
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247692#comment-15247692
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60224497
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247679#comment-15247679
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-211906744
  
Thank you @zentol and @uce for the review.
I hope I addressed all your concerns.


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247647#comment-15247647
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220590
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247649#comment-15247649
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220641
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream<String> simpleStringStream = see.addSource(new 
EventsGenerator());
+
+   FlinkKinesisProducer<String> kinesis = new 
FlinkKinesisProducer<>(pt.getRequired("region"),
+   pt.getRequired("accessKey"),
+   pt.getRequired("secretKey"), new 
SimpleStringSchema());
+
+   kinesis.setFailOnError(true);
+   kinesis.setDefaultStream("test-flink");
+   kinesis.setDefaultPartition("0");
+
+   simpleStringStream.addSink(kinesis);
+
+   see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction<String> {
+   private boolean running = true;
+
+   @Override
+   public void run(SourceContext<String> ctx) throws Exception {
+   long seq = 0;
+   while(running) {
+
--- End diff --

It's invisible code ;)
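
For readers without the full file: the loop in question just emits random string events. A sketch of how such a generator typically ends up is below; the loop body and sleep interval are assumptions, since the excerpt above is cut off at the blank line:

    // Illustrative only: the real loop body is truncated in the diff excerpt above.
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long seq = 0;
        while (running) {
            Thread.sleep(10);
            ctx.collect(seq++ + "-" + RandomStringUtils.randomAlphanumeric(12));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }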


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247639#comment-15247639
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219813
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247636#comment-15247636
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219525
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

It's not related at all. I just came across it and fixed it.



> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247514#comment-15247514
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-211847621
  
Great addition to have a producer for Kinesis. The changes look good 
overall; I had some minor inline comments.

The main concern is that we can only test this with a real Kinesis setup, 
which makes it fragile for future changes etc. Did you consider trying to mock 
the `KinesisProducer` to test that the expected calls are issued to it?

PS: The titles of the JIRA issue and this PR are confusing (the last `... 
into Flink` part).
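
Regarding the mocking idea: a rough sketch of such a test is below. It assumes the sink exposed a test-only seam (setKinesisProducerForTest, which this PR does not provide) that injects the mocked client and performs the rest of the open() setup, so nothing here should be read as existing API of the sink:

    import java.nio.ByteBuffer;

    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.google.common.util.concurrent.Futures;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.junit.Test;
    import org.mockito.Mockito;

    public class FlinkKinesisProducerTest {

        @Test
        public void recordIsForwardedToKinesisProducer() throws Exception {
            // Mocked KPL client: every put "succeeds" immediately, no AWS access needed.
            KinesisProducer kpl = Mockito.mock(KinesisProducer.class);
            Mockito.when(kpl.addUserRecord(Mockito.anyString(), Mockito.anyString(), Mockito.any(ByteBuffer.class)))
                    .thenReturn(Futures.immediateFuture(Mockito.mock(UserRecordResult.class)));

            FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(
                    "eu-central-1", "accessKey", "secretKey", new SimpleStringSchema());
            sink.setDefaultStream("test-stream");
            sink.setDefaultPartition("0");

            // Hypothetical test hook, not part of the PR under review.
            sink.setKinesisProducerForTest(kpl);

            sink.invoke("hello");

            // Verify the expected call was issued to the (mocked) KPL client.
            Mockito.verify(kpl).addUserRecord(
                    Mockito.eq("test-stream"), Mockito.eq("0"), Mockito.any(ByteBuffer.class));
        }
    }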


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247511#comment-15247511
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207331
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

How is this change related to the Kinesis connector?


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247508#comment-15247508
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207207
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream<String> simpleStringStream = see.addSource(new 
EventsGenerator());
+
+   FlinkKinesisProducer<String> kinesis = new 
FlinkKinesisProducer<>(pt.getRequired("region"),
+   pt.getRequired("accessKey"),
+   pt.getRequired("secretKey"), new 
SimpleStringSchema());
+
+   kinesis.setFailOnError(true);
+   kinesis.setDefaultStream("test-flink");
+   kinesis.setDefaultPartition("0");
+
+   simpleStringStream.addSink(kinesis);
+
+   see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction<String> {
+   private boolean running = true;
+
+   @Override
+   public void run(SourceContext<String> ctx) throws Exception {
+   long seq = 0;
+   while(running) {
+
--- End diff --

This empty line seems odd.


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247506#comment-15247506
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207039
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+ 

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247502#comment-15247502
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60206857
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247504#comment-15247504
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60206929
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -0,0 +1,92 @@
+---
+title: "Amazon AWS Kinesis Streams Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 5
+sub-nav-title: Amazon Kinesis Streams
+---
+
+
+The Kinesis connector allows to produce data into an [Amazon AWS Kinesis 
Stream](http://aws.amazon.com/kinesis/streams/).
--- End diff --

Should we add that there are no exactly-once guarantees?


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247500#comment-15247500
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60206533
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247489#comment-15247489
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60205881
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

OK


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247488#comment-15247488
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60205778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247480#comment-15247480
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60205005
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
--- End diff --

this should be volatile, if the callback is executed by a different thread
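
A minimal sketch of the handoff pattern being requested here, assuming the KPL callback
runs on a producer-internal thread and the Flink task thread later rethrows the error.
The `thrownException` and `callback` names mirror the reviewed diff; the wrapper class
and the `checkAndRethrow` method are illustrative only, not the reviewed implementation.

```java
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;

// Sketch only, not the reviewed implementation.
public class AsyncErrorHandoffSketch {

    // volatile because onFailure() may be invoked by a KPL worker thread
    private volatile Throwable thrownException;

    final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // nothing to hand off on success in this sketch
        }

        @Override
        public void onFailure(Throwable t) {
            thrownException = t; // written by the callback thread
        }
    };

    // called from the Flink task thread, e.g. at the beginning of invoke()
    void checkAndRethrow() {
        Throwable t = thrownException;
        if (t != null) {
            thrownException = null;
            throw new RuntimeException("A record could not be sent to Kinesis", t);
        }
    }
}
```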


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the
> result with the tradeoff of higher put latency. Ref:
> https://brandur.org/kinesis-order

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247476#comment-15247476
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60204861
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   <hbc-core.version>2.2.0</hbc-core.version>
--- End diff --

I wanted to allow users passing a custom property using 
`-Dhbc-core.version=a.b.c`. I didn't change the version, just introduced the 
prop.


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247472#comment-15247472
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60204721
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   <hbc-core.version>2.2.0</hbc-core.version>
--- End diff --

Why do we have this as a property now? Is it overwritten somewhere? 


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247463#comment-15247463
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60204401
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+   FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
--- End diff --

I would have each argument on its own line
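
As a sketch of the formatting suggested above, reusing `pt` and the imports from the
quoted example. The "accessKey" and "secretKey" parameter names are assumptions, since
the quoted call is truncated after the "region" argument:

```java
// formatting sketch only: one constructor argument per line
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
        pt.getRequired("region"),
        pt.getRequired("accessKey"),   // assumed parameter name
        pt.getRequired("secretKey"),   // assumed parameter name
        new SimpleStringSchema());
```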


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order
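
Item 1 of the description above (partitioning put records by key) corresponds to the
optional KinesisPartitioner in the reviewed code. A hedged sketch follows; only the type
name appears in the quoted diffs, so the `getPartitionId` signature and the extends
relationship are assumptions made for illustration:

```java
// Sketch of a custom partitioner; signature assumed, not taken from the reviewed diff.
public class EvenOddPartitioner extends KinesisPartitioner<String> {

    @Override
    public String getPartitionId(String element) {
        // derive the Kinesis partition key from the record contents
        return (element.length() % 2 == 0) ? "even" : "odd";
    }
}
```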



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247461#comment-15247461
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60204277
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisSerializationSchema.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Kinesis-specific serialization schema, allowing users to specify a target stream based
+ * on a record's contents.
+ * @param <T>
+ */
+public interface KinesisSerializationSchema<T> extends Serializable {
+   /**
+* Serialize the given element into a ByteBuffer
+*
+* @param element The element to serialize
+* @return Serialized representation of the element
+*/
+   ByteBuffer serialize(T element);
+
+   /**
+* Optional method to determine the target stream based on the element.
+* Return "null" to use the default stream
--- End diff --

Do you mean to return the String `"null"` or `null`?
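
For illustration, a hedged implementation that uses the null reference (not the text
"null") to fall back to the default stream. The quoted diff is cut off before the
target-stream method, so its name and signature below are assumptions:

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch only: serializes Strings and always uses the producer's default stream.
public class DefaultStreamStringSchema implements KinesisSerializationSchema<String> {

    @Override
    public ByteBuffer serialize(String element) {
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    // assumed optional method; returning null means "use the default stream"
    public String getTargetStream(String element) {
        return null;
    }
}
```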


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247457#comment-15247457
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60203894
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247449#comment-15247449
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60202840
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247440#comment-15247440
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60202074
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247437#comment-15247437
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1910

[FLINK-3230] Add producer for Amazon Kinesis Streams into Flink

I've tested this connector against two kinesis streams. Since Kinesis is a 
hosted service, I can not implement a full integration test that runs with each 
test.
I'll run further tests once we've added a Kinesis consumer as well.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3230

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1910


commit 0f6fc06b2dce5c5dbaae32ba0955417c5b029fbf
Author: Robert Metzger 
Date:   2016-04-11T14:48:54Z

[FLINK-3230] Add producer for Amazon Kinesis Streams into Flink




> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order
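
To make the configuration surface concrete, a hedged usage sketch based on the fields
visible in the reviewed diff (region/access key/secret key constructor, failOnError,
defaultStream, defaultPartition). The setter names are inferred from those field names,
and the credential strings are placeholders, not working values:

```java
// usage sketch only; setter names inferred from the reviewed fields
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
        "eu-central-1",              // AWS region of the target stream
        "ACCESS_KEY_PLACEHOLDER",
        "SECRET_KEY_PLACEHOLDER",
        new SimpleStringSchema());

kinesis.setFailOnError(true);         // surface async put failures instead of only logging them
kinesis.setDefaultStream("flink-test");
kinesis.setDefaultPartition("0");

// attach to any DataStream<String>, e.g. the generator stream from ProduceIntoKinesis
simpleStringStream.addSink(kinesis);
```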



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)