[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249528#comment-15249528 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1910

> Kinesis streaming producer
> --------------------------
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will
> be using AWS SDK implementation for code consistency with the
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the
> result with the tradeoff of higher put latency. Ref:
> https://brandur.org/kinesis-order

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
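As a rough illustration of feature 2 in the issue description above (bulk puts with a configurable bulk size), the following minimal sketch groups records into fixed-size batches before they would be handed to a put call. `BulkBatcher` and `partition` are hypothetical names used for illustration only; they are not part of the connector or of the AWS SDK, and the actual put call is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink or the AWS SDK. */
final class BulkBatcher {

    private BulkBatcher() {}

    /**
     * Splits records into consecutive batches of at most bulkSize elements,
     * preserving the original order within and across batches.
     */
    static <T> List<List<T>> partition(List<T> records, int bulkSize) {
        if (bulkSize <= 0) {
            throw new IllegalArgumentException("bulkSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += bulkSize) {
            int end = Math.min(start + bulkSize, records.size());
            // Copy the view so each batch is independent of the input list.
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }
}
```

With a bulk size of 1 this degenerates into one record per batch, which corresponds to the sequential single-record mode mentioned in the description.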
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249444#comment-15249444 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1910#issuecomment-212306378

    Merging ...
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247760#comment-15247760 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60232443

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +    /* AWS region of the stream */
    +    private final String region;
    +
    +    /* Access and secret key of the user */
    +    private final String accessKey;
    +    private final String secretKey;
    +
    +    /* Flag controlling the error behavior of the producer */
    +    private boolean failOnError = false;
    +
    +    /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +    private String defaultStream;
    +
    +    /* Default partition id. Can be overwritten by the serialization schema */
    +    private String defaultPartition;
    +
    +    /* Schema for turning the OUT type into a byte array. */
    +    private final KinesisSerializationSchema<OUT> schema;
    +
    +    /* Optional custom partitioner */
    +    private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +    // --- Runtime fields ---
    +
    +
    +    /* Our Kinesis instance for each parallel Flink sink */
    +    private transient KinesisProducer producer;
    +
    +    /* Callback handling failures */
    +    private transient FutureCallback<UserRecordResult> callback;
    +
    +    /* Field for async exception */
    +    private transient Throwable thrownException;
    +
    +
    +    // --- Initialization and configuration ---
    +
    +
    +    /**
    +     * Create a new FlinkKinesisProducer.
    +     * This is a constructor supporting Flink's {@see SerializationSchema}.
    +     *
    +     * @param region AWS region of the stream
    +     * @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch)
    +     * @param secretKey Secret key of the user
    +     * @param schema Serialization schema for the data type
    +     */
    +    public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
    +        // create a simple wrapper for the serialization schema
    +
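The `failOnError` flag and the `thrownException` field in the quoted diff suggest a common pattern for asynchronous sinks: a completion callback records a failure, and the next call on the sink thread either rethrows or tolerates it. The quote is cut off before the relevant methods, so the following is only a sketch of that general pattern under that assumption. `AsyncErrorTracker`, `onFailure` and `checkAndPropagate` are hypothetical names, and the real class may behave differently.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of the pattern; not the connector's actual code. */
final class AsyncErrorTracker {

    private final boolean failOnError;
    // Holds the first failure reported by an asynchronous callback, if any.
    private final AtomicReference<Throwable> thrownException = new AtomicReference<>();
    // Stands in for logging in this sketch.
    private int toleratedErrors = 0;

    AsyncErrorTracker(boolean failOnError) {
        this.failOnError = failOnError;
    }

    /** Called from the callback thread when a record could not be written. */
    void onFailure(Throwable t) {
        thrownException.compareAndSet(null, t);
    }

    /** Called on the sink thread before each record is sent. */
    void checkAndPropagate() {
        Throwable t = thrownException.getAndSet(null);
        if (t == null) {
            return;
        }
        if (failOnError) {
            // Strict mode: surface the asynchronous failure to the caller.
            throw new RuntimeException("An exception was thrown while sending records", t);
        }
        // Tolerant mode: a real sink would log here; the sketch only counts.
        toleratedErrors++;
    }

    int toleratedErrors() {
        return toleratedErrors;
    }
}
```

The design choice sketched here is that the callback thread never throws; it only hands the failure over, so the decision to fail the job is made on the thread that owns the sink.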
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247692#comment-15247692 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60224497

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247679#comment-15247679 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1910#issuecomment-211906744

    Thank you @zentol and @uce for the review. I hope I addressed all your concerns.
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247647#comment-15247647 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60220590

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247649#comment-15247649 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60220641

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis.examples;
    +
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import java.nio.ByteBuffer;
    +
    +/**
    + * This is an example on how to produce data into Kinesis
    + */
    +public class ProduceIntoKinesis {
    +
    +    public static void main(String[] args) throws Exception {
    +        ParameterTool pt = ParameterTool.fromArgs(args);
    +
    +        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +        see.setParallelism(1);
    +
    +        DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
    +
    +        FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
    +                pt.getRequired("accessKey"),
    +                pt.getRequired("secretKey"), new SimpleStringSchema());
    +
    +        kinesis.setFailOnError(true);
    +        kinesis.setDefaultStream("test-flink");
    +        kinesis.setDefaultPartition("0");
    +
    +        simpleStringStream.addSink(kinesis);
    +
    +        see.execute();
    +    }
    +
    +    public static class EventsGenerator implements SourceFunction<String> {
    +        private boolean running = true;
    +
    +        @Override
    +        public void run(SourceContext<String> ctx) throws Exception {
    +            long seq = 0;
    +            while(running) {
    +
    --- End diff --

    It's invisible code ;)
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247639#comment-15247639 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60219813

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247636#comment-15247636 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60219525

    --- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
    @@ -35,6 +35,11 @@ under the License.
     jar
    +
    + 2.2.0
    --- End diff --

    It's not related at all. I just came across it and fixed it.
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247514#comment-15247514 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1910#issuecomment-211847621

    Great addition to have a producer to Kinesis. The changes look good overall, I had some minor inline comments.

    The main concern is that we can only test this with a real Kinesis setup, which makes it fragile for future changes etc. Did you consider trying to mock the `KinesisProducer` to test that the expected calls are issued to it?

    PS: The title of the JIRA issue and this PR is confusing (the last `... into Flink` part)
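One way to read the mocking suggestion above: put the producer behind a small interface so that a test can substitute a recording fake and assert on the calls, without a real Kinesis setup. The sketch below uses a hand-written fake rather than a mocking library. `RecordSender`, `RecordingSender` and `StringSink` are hypothetical stand-ins and do not mirror the real `KinesisProducer` API or the connector's classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical seam in front of the real producer client. */
interface RecordSender {
    void send(String stream, String partitionKey, ByteBuffer data);
}

/** Test double that records every call instead of talking to Kinesis. */
final class RecordingSender implements RecordSender {
    final List<String> calls = new ArrayList<>();

    @Override
    public void send(String stream, String partitionKey, ByteBuffer data) {
        // duplicate() leaves the caller's buffer position untouched.
        String payload = StandardCharsets.UTF_8.decode(data.duplicate()).toString();
        calls.add(stream + "|" + partitionKey + "|" + payload);
    }
}

/** Hypothetical sink: serializes strings and forwards them to the sender. */
final class StringSink {
    private final RecordSender sender;
    private final String defaultStream;
    private final String defaultPartition;

    StringSink(RecordSender sender, String defaultStream, String defaultPartition) {
        this.sender = sender;
        this.defaultStream = defaultStream;
        this.defaultPartition = defaultPartition;
    }

    void invoke(String value) {
        ByteBuffer data = ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
        sender.send(defaultStream, defaultPartition, data);
    }
}
```

A test would then construct `StringSink` with a `RecordingSender`, invoke it with a few values, and compare `calls` against the expected stream, partition key and payload for each record.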
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247511#comment-15247511 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60207331

    --- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
    @@ -35,6 +35,11 @@ under the License.
     jar
    +
    + 2.2.0
    --- End diff --

    how is this change related to the Kinesis connector?
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247508#comment-15247508 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60207207

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis.examples;
    +
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import java.nio.ByteBuffer;
    +
    +/**
    + * This is an example on how to produce data into Kinesis
    + */
    +public class ProduceIntoKinesis {
    +
    +    public static void main(String[] args) throws Exception {
    +        ParameterTool pt = ParameterTool.fromArgs(args);
    +
    +        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +        see.setParallelism(1);
    +
    +        DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
    +
    +        FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
    +                pt.getRequired("accessKey"),
    +                pt.getRequired("secretKey"), new SimpleStringSchema());
    +
    +        kinesis.setFailOnError(true);
    +        kinesis.setDefaultStream("test-flink");
    +        kinesis.setDefaultPartition("0");
    +
    +        simpleStringStream.addSink(kinesis);
    +
    +        see.execute();
    +    }
    +
    +    public static class EventsGenerator implements SourceFunction<String> {
    +        private boolean running = true;
    +
    +        @Override
    +        public void run(SourceContext<String> ctx) throws Exception {
    +            long seq = 0;
    +            while(running) {
    +
    --- End diff --

    this empty line seems odd.
> Kinesis streaming producer
> --
>
>                 Key: FLINK-3230
>                 URL: https://issues.apache.org/jira/browse/FLINK-3230
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will
> be using AWS SDK implementation for code consistency with the
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the
> result with the tradeoff of higher put latency. Ref:
> https://brandur.org/kinesis-order

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247506#comment-15247506 ] ASF GitHub Bot commented on FLINK-3230: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1910#discussion_r60207039 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer extends RichSinkFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /* AWS region of the stream */ + private final String region; + + /* Access and secret key of the user */ + private final String accessKey; + private final String secretKey; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. 
*/ + private final KinesisSerializationSchema schema; + + /* Optional custom partitioner */ + private KinesisPartitioner customPartitioner = null; + + + // --- Runtime fields --- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback callback; + + /* Field for async exception */ + private transient Throwable thrownException; + + + // --- Initialization and configuration --- + + + /** +* Create a new FlinkKinesisProducer. +* This is a constructor supporting Flink's {@see SerializationSchema}. +* +* @param region AWS region of the stream +* @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch) +* @param secretKey Secret key of the user +* @param schema Serialization schema for the data type +*/ + public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema schema) { + // create a simple wrapper for the serialization schema +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247502#comment-15247502 ] ASF GitHub Bot commented on FLINK-3230: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1910#discussion_r60206857 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer extends RichSinkFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /* AWS region of the stream */ + private final String region; + + /* Access and secret key of the user */ + private final String accessKey; + private final String secretKey; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. 
*/ + private final KinesisSerializationSchema schema; + + /* Optional custom partitioner */ + private KinesisPartitioner customPartitioner = null; + + + // --- Runtime fields --- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback callback; + + /* Field for async exception */ + private transient Throwable thrownException; + + + // --- Initialization and configuration --- + + + /** +* Create a new FlinkKinesisProducer. +* This is a constructor supporting Flink's {@see SerializationSchema}. +* +* @param region AWS region of the stream +* @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch) +* @param secretKey Secret key of the user +* @param schema Serialization schema for the data type +*/ + public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema schema) { + // create a simple wrapper for the serialization schema +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247504#comment-15247504 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60206929

    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -0,0 +1,92 @@
    +---
    +title: "Amazon AWS Kinesis Streams Connector"
    +
    +# Sub-level navigation
    +sub-nav-group: streaming
    +sub-nav-parent: connectors
    +sub-nav-pos: 5
    +sub-nav-title: Amazon Kinesis Streams
    +---
    +
    +
    +The Kinesis connector allows to produce data into an [Amazon AWS Kinesis Stream](http://aws.amazon.com/kinesis/streams/).
    --- End diff --

    Should we add that there are no exactly-once guarantees?

[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247500#comment-15247500 ] ASF GitHub Bot commented on FLINK-3230: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1910#discussion_r60206533 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer extends RichSinkFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /* AWS region of the stream */ + private final String region; + + /* Access and secret key of the user */ + private final String accessKey; + private final String secretKey; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. 
*/ + private final KinesisSerializationSchema schema; + + /* Optional custom partitioner */ + private KinesisPartitioner customPartitioner = null; + + + // --- Runtime fields --- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback callback; + + /* Field for async exception */ + private transient Throwable thrownException; + + + // --- Initialization and configuration --- + + + /** +* Create a new FlinkKinesisProducer. +* This is a constructor supporting Flink's {@see SerializationSchema}. +* +* @param region AWS region of the stream +* @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch) +* @param secretKey Secret key of the user +* @param schema Serialization schema for the data type +*/ + public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema schema) { + // create a simple wrapper for the serialization schema +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247489#comment-15247489 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60205881

    --- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
    @@ -35,6 +35,11 @@ under the License.
     jar
    +
    + 2.2.0
    --- End diff --

    OK

[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247488#comment-15247488 ] ASF GitHub Bot commented on FLINK-3230: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1910#discussion_r60205778 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer extends RichSinkFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /* AWS region of the stream */ + private final String region; + + /* Access and secret key of the user */ + private final String accessKey; + private final String secretKey; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. 
*/ + private final KinesisSerializationSchema schema; + + /* Optional custom partitioner */ + private KinesisPartitioner customPartitioner = null; + + + // --- Runtime fields --- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback callback; + + /* Field for async exception */ + private transient Throwable thrownException; + + + // --- Initialization and configuration --- + + + /** +* Create a new FlinkKinesisProducer. +* This is a constructor supporting Flink's {@see SerializationSchema}. +* +* @param region AWS region of the stream +* @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch) +* @param secretKey Secret key of the user +* @param schema Serialization schema for the data type +*/ + public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema schema) { + // create a simple wrapper for the serialization schema +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247480#comment-15247480 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60205005

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +    /* AWS region of the stream */
    +    private final String region;
    +
    +    /* Access and secret key of the user */
    +    private final String accessKey;
    +    private final String secretKey;
    +
    +    /* Flag controlling the error behavior of the producer */
    +    private boolean failOnError = false;
    +
    +    /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +    private String defaultStream;
    +
    +    /* Default partition id. Can be overwritten by the serialization schema */
    +    private String defaultPartition;
    +
    +    /* Schema for turning the OUT type into a byte array. */
    +    private final KinesisSerializationSchema<OUT> schema;
    +
    +    /* Optional custom partitioner */
    +    private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +    // --- Runtime fields ---
    +
    +
    +    /* Our Kinesis instance for each parallel Flink sink */
    +    private transient KinesisProducer producer;
    +
    +    /* Callback handling failures */
    +    private transient FutureCallback<UserRecordResult> callback;
    +
    +    /* Field for async exception */
    +    private transient Throwable thrownException;
    --- End diff --

    this should be volatile, if the callback is executed by a different thread

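
The point of the review comment on `thrownException` can be illustrated with a small, self-contained sketch. This is not the code from the pull request: `VolatileErrorDemo`, `AsyncErrorSink`, `onFailure` and `invoke` are hypothetical stand-ins, and the sketch assumes only that a failure callback may run on a different thread than the one handing records to the sink. Under that assumption, declaring the field `volatile` guarantees that a failure stored by the callback thread becomes visible to the thread that later checks it.

```java
/** Hypothetical stand-in for a sink whose failure callback may run on another thread. */
class VolatileErrorDemo {

    static class AsyncErrorSink {
        // Written by the callback thread, read by the thread that calls invoke().
        // volatile makes the write visible to the reading thread.
        private volatile Throwable thrownException;

        /** Assumed to be called from the producer's callback thread when a send fails. */
        void onFailure(Throwable t) {
            thrownException = t;
        }

        /** Called for each record; surfaces a failure recorded by an earlier callback. */
        void invoke(String record) {
            Throwable t = thrownException; // single volatile read
            if (t != null) {
                throw new RuntimeException("An earlier asynchronous send failed", t);
            }
            // A real sink would hand the record to its producer here.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncErrorSink sink = new AsyncErrorSink();
        sink.invoke("first"); // no failure recorded yet

        // Simulate the callback thread reporting a failed put.
        Thread callback = new Thread(() -> sink.onFailure(new IllegalStateException("put failed")));
        callback.start();
        // join() only keeps this demo deterministic; a real sink has no such
        // synchronization point, which is why the field itself is volatile.
        callback.join();

        try {
            sink.invoke("second");
            System.out.println("no error observed");
        } catch (RuntimeException e) {
            System.out.println("observed: " + e.getCause().getMessage());
        }
    }
}
```

Without `volatile`, the Java memory model would not guarantee that the reading thread ever sees the value written by the callback thread, so a failure could go unnoticed even with `failOnError` enabled.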
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247476#comment-15247476 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60204861

    --- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
    @@ -35,6 +35,11 @@ under the License.
     jar
    +
    + 2.2.0
    --- End diff --

    I wanted to allow users to pass a custom property using `-Dhbc-core.version=a.b.c`. I didn't change the version, just introduced the prop.

[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247472#comment-15247472 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60204721

    --- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
    @@ -35,6 +35,11 @@ under the License.
     jar
    +
    + 2.2.0
    --- End diff --

    Why do we have this as a property now? Is it overwritten somewhere?

[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247463#comment-15247463 ]

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60204401

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis.examples;
    +
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
    +import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    +
    +import java.nio.ByteBuffer;
    +
    +/**
    + * This is an example on how to produce data into Kinesis
    + */
    +public class ProduceIntoKinesis {
    +
    +    public static void main(String[] args) throws Exception {
    +        ParameterTool pt = ParameterTool.fromArgs(args);
    +
    +        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +        see.setParallelism(1);
    +
    +        DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
    +
    +        FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
    --- End diff --

    I would have each argument on its own line

[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247461#comment-15247461 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60204277

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisSerializationSchema.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import java.io.Serializable;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * Kinesis-specific serialization schema, allowing users to specify a target stream based
    + * on a record's contents.
    + * @param <T>
    + */
    +public interface KinesisSerializationSchema<T> extends Serializable {
    +    /**
    +     * Serialize the given element into a ByteBuffer
    +     *
    +     * @param element The element to serialize
    +     * @return Serialized representation of the element
    +     */
    +    ByteBuffer serialize(T element);
    +
    +    /**
    +     * Optional method to determine the target stream based on the element.
    +     * Return "null" to use the default stream
    --- End diff --

    Do you mean to return the String `"null"` or `null`?
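The difference raised in the review comment on KinesisSerializationSchema can be illustrated with a minimal, self-contained sketch. It does not use Flink: the nested interface below is only a local stand-in for the one in the diff, the method name `getTargetStream` is an assumption (the quoted diff is cut off before the method signature), and `resolveStream` and `schemaReturning` are hypothetical helpers written for this illustration. A `null` reference can be detected and replaced by the default stream, whereas the String `"null"` would be treated as an ordinary stream name.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class NullTargetStreamSketch {

    /** Local stand-in for the interface in the diff; getTargetStream is an assumed name. */
    public interface KinesisSerializationSchema<T> extends Serializable {
        ByteBuffer serialize(T element);

        /** Returns the target stream, or the null reference to request the default stream. */
        String getTargetStream(T element);
    }

    /** Hypothetical helper: falls back to the default stream only for the null reference. */
    public static <T> String resolveStream(
            KinesisSerializationSchema<T> schema, T element, String defaultStream) {
        String target = schema.getTargetStream(element);
        return target != null ? target : defaultStream;
    }

    /** Hypothetical helper: builds a schema that always reports the given target stream. */
    public static KinesisSerializationSchema<String> schemaReturning(final String target) {
        return new KinesisSerializationSchema<String>() {
            @Override
            public ByteBuffer serialize(String element) {
                return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public String getTargetStream(String element) {
                return target;
            }
        };
    }

    public static void main(String[] args) {
        // The null reference selects the default stream.
        System.out.println(resolveStream(schemaReturning(null), "event", "default-stream"));
        // The String "null" is not special: it is used as a literal stream name.
        System.out.println(resolveStream(schemaReturning("null"), "event", "default-stream"));
    }
}
```

Under these assumptions the first call resolves to the default stream and the second to a stream literally named `null`, which is why the Javadoc wording matters.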
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247457#comment-15247457 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60203894

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +    /* AWS region of the stream */
    +    private final String region;
    +
    +    /* Access and secret key of the user */
    +    private final String accessKey;
    +    private final String secretKey;
    +
    +    /* Flag controlling the error behavior of the producer */
    +    private boolean failOnError = false;
    +
    +    /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +    private String defaultStream;
    +
    +    /* Default partition id. Can be overwritten by the serialization schema */
    +    private String defaultPartition;
    +
    +    /* Schema for turning the OUT type into a byte array. */
    +    private final KinesisSerializationSchema<OUT> schema;
    +
    +    /* Optional custom partitioner */
    +    private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +    // --- Runtime fields ---
    +
    +
    +    /* Our Kinesis instance for each parallel Flink sink */
    +    private transient KinesisProducer producer;
    +
    +    /* Callback handling failures */
    +    private transient FutureCallback<UserRecordResult> callback;
    +
    +    /* Field for async exception */
    +    private transient Throwable thrownException;
    +
    +
    +    // --- Initialization and configuration ---
    +
    +
    +    /**
    +     * Create a new FlinkKinesisProducer.
    +     * This is a constructor supporting Flink's {@see SerializationSchema}.
    +     *
    +     * @param region AWS region of the stream
    +     * @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch)
    +     * @param secretKey Secret key of the user
    +     * @param schema Serialization schema for the data type
    +     */
    +    public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
    +        // create a simple wrapper for the serialization schema
    +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247449#comment-15247449 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60202840

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +    /* AWS region of the stream */
    +    private final String region;
    +
    +    /* Access and secret key of the user */
    +    private final String accessKey;
    +    private final String secretKey;
    +
    +    /* Flag controlling the error behavior of the producer */
    +    private boolean failOnError = false;
    +
    +    /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +    private String defaultStream;
    +
    +    /* Default partition id. Can be overwritten by the serialization schema */
    +    private String defaultPartition;
    +
    +    /* Schema for turning the OUT type into a byte array. */
    +    private final KinesisSerializationSchema<OUT> schema;
    +
    +    /* Optional custom partitioner */
    +    private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +    // --- Runtime fields ---
    +
    +
    +    /* Our Kinesis instance for each parallel Flink sink */
    +    private transient KinesisProducer producer;
    +
    +    /* Callback handling failures */
    +    private transient FutureCallback<UserRecordResult> callback;
    +
    +    /* Field for async exception */
    +    private transient Throwable thrownException;
    +
    +
    +    // --- Initialization and configuration ---
    +
    +
    +    /**
    +     * Create a new FlinkKinesisProducer.
    +     * This is a constructor supporting Flink's {@see SerializationSchema}.
    +     *
    +     * @param region AWS region of the stream
    +     * @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch)
    +     * @param secretKey Secret key of the user
    +     * @param schema Serialization schema for the data type
    +     */
    +    public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
    +        // create a simple wrapper for the serialization schema
    +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247440#comment-15247440 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60202074

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +    /* AWS region of the stream */
    +    private final String region;
    +
    +    /* Access and secret key of the user */
    +    private final String accessKey;
    +    private final String secretKey;
    +
    +    /* Flag controlling the error behavior of the producer */
    +    private boolean failOnError = false;
    +
    +    /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +    private String defaultStream;
    +
    +    /* Default partition id. Can be overwritten by the serialization schema */
    +    private String defaultPartition;
    +
    +    /* Schema for turning the OUT type into a byte array. */
    +    private final KinesisSerializationSchema<OUT> schema;
    +
    +    /* Optional custom partitioner */
    +    private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +    // --- Runtime fields ---
    +
    +
    +    /* Our Kinesis instance for each parallel Flink sink */
    +    private transient KinesisProducer producer;
    +
    +    /* Callback handling failures */
    +    private transient FutureCallback<UserRecordResult> callback;
    +
    +    /* Field for async exception */
    +    private transient Throwable thrownException;
    +
    +
    +    // --- Initialization and configuration ---
    +
    +
    +    /**
    +     * Create a new FlinkKinesisProducer.
    +     * This is a constructor supporting Flink's {@see SerializationSchema}.
    +     *
    +     * @param region AWS region of the stream
    +     * @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch)
    +     * @param secretKey Secret key of the user
    +     * @param schema Serialization schema for the data type
    +     */
    +    public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
    +        // create a simple wrapper for the serialization schema
    +
[jira] [Commented] (FLINK-3230) Kinesis streaming producer
[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247437#comment-15247437 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1910

    [FLINK-3230] Add producer for Amazon Kinesis Streams into Flink

    I've tested this connector against two Kinesis streams.
    Since Kinesis is a hosted service, I cannot implement a full integration test that runs with each test. I'll run further tests once we've added a Kinesis consumer as well.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink3230

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1910

commit 0f6fc06b2dce5c5dbaae32ba0955417c5b029fbf
Author: Robert Metzger
Date:   2016-04-11T14:48:54Z

    [FLINK-3230] Add producer for Amazon Kinesis Streams into Flink