Repository: flink
Updated Branches:
  refs/heads/master fa42cdabf -> 256c9c4da
[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connector to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation

This closes #2181


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256c9c4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256c9c4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256c9c4d

Branch: refs/heads/master
Commit: 256c9c4daf11349128eaa2e71a434f609e57053c
Parents: fa42cda
Author: Gordon Tai <tzuli...@gmail.com>
Authored: Wed Jun 29 18:29:08 2016 +0800
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Jun 29 13:41:35 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md   |  2 +-
 docs/apis/streaming/connectors/kinesis.md | 63 ++++++++++++++++++++++----
 docs/apis/streaming/fault_tolerance.md    | 16 +++++--
 3 files changed, 68 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
index 9bd70be..e7cd05b 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -236,7 +236,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 
-val myConsumer = new FlinkKafkaConsumer08[Stirng]("topic", new SimpleStringSchema(), properties);
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
 stream = env
     .addSource(myConsumer)


http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index 66c078a..db3a9c4 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -52,12 +52,16 @@ mvn clean install -Pinclude-kinesis -DskipTests
 
 Note that the streaming connectors are not part of the binary distribution.
-See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Usage of Consumer
+### Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
+to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
+
+### Kinesis Consumer
 
 The `FlinkKinesisConsumer` can be used to pull data from multiple Kinesis streams within the same AWS region in parallel.
-It participates in Flink's distributed snapshot checkpointing and provides exactly-once processing guarantees. Note
+It participates in Flink's distributed snapshot checkpointing and provides exactly-once user-defined state update guarantees. Note
 that the current version can not handle resharding of Kinesis streams. When Kinesis streams are resharded, the consumer
 will fail and the Flink streaming job must be resubmitted.
@@ -78,10 +82,28 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYP
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
 
-DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
     "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig))
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisConsumerConfig = new Properties();
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
+
+val env = StreamExecutionEnvironment.getEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
@@ -92,13 +114,15 @@ the AWS access key ID and secret key are directly supplied in the configuration
 from the newest position in the Kinesis stream (the other option will be setting `KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 
-#### Usage of Producer
+Other optional configuration keys can be found in `KinesisConfigConstants`.
+
+### Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream onto a Kinesis stream. Note that the producer is not participating in
+The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again
 to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
-To put data onto a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the Cloud watch service.
 
@@ -113,16 +137,37 @@ kinesisProducerConfig.put(
 kinesisProducerConfig.put(
     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
     "aws_secret_key_here");
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
+FlinkKinesisProducer<String> kinesis =
+    new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 kinesis.setFailOnError(true);
-kinesis.setDefaultStream("test-flink");
+kinesis.setDefaultStream("kinesis_stream_name");
 kinesis.setDefaultPartition("0");
 
 DataStream<String> simpleStringStream = ...;
 simpleStringStream.addSink(kinesis);
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisProducerConfig = new Properties();
+kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+
+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, kinesisProducerConfig);
+kinesis.setFailOnError(true);
+kinesis.setDefaultStream("kinesis_stream_name");
+kinesis.setDefaultPartition("0");
+
+val simpleStringStream = ...;
+simpleStringStream.addSink(kinesis);
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values
 is supplied with a `java.util.Properties`


http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 8426f11..3edc65d 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -103,7 +103,7 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 ### Fault Tolerance Guarantees of Data Sources and Sinks
 
 Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+snapshotting mechanism. This is currently guaranteed for the Kafka source and AWS Kinesis Streams source (and internal number generators), but
 not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
 
 <table class="table table-bordered">
@@ -121,6 +121,11 @@ not for other sources. The following table lists the state update guarantees of
       <td>Use the appropriate Kafka connector for your version</td>
     </tr>
     <tr>
+      <td>AWS Kinesis Streams</td>
+      <td>exactly once</td>
+      <td>Current version does not handle stream resharding</td>
+    </tr>
+    <tr>
       <td>RabbitMQ</td>
       <td>at most once (v 0.10) / exactly once (v 1.0)</td>
       <td></td>
@@ -178,8 +183,13 @@ state updates) of Flink coupled with bundled sinks:
     </tr>
     <tr>
       <td>Cassandra sink</td>
-      <td>at-least-once / exactly-once</td>
-      <td>exactly-once only for idempotent updates</td>
+      <td>at least once / exactly once</td>
+      <td>exactly once only for idempotent updates</td>
+    </tr>
+    <tr>
+      <td>AWS Kinesis Streams</td>
+      <td>at least once</td>
+      <td></td>
     </tr>
     <tr>
       <td>File sinks</td>
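
For readers who want to try the documented connector end to end, the consumer and producer snippets in this commit can be combined into one job: read from one Kinesis stream and write the transformed records to another. The sketch below is assembled only from the APIs shown in the diffs above; the import paths, stream names, region, and credentials are placeholder assumptions, and it requires the `flink-connector-kinesis` module (built with `-Pinclude-kinesis`) on the classpath. This is an illustrative sketch, not part of the committed docs.

```scala
// Sketch: Kinesis -> Flink -> Kinesis pipeline, assembled from the snippets
// in this commit. Import paths reflect the connector as of this commit and
// are assumptions; verify against your Flink version.
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KinesisPipelineSketch {
  def main(args: Array[String]): Unit = {
    // Shared AWS configuration; region, keys, and stream names are placeholders.
    val config = new Properties()
    config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1")
    config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
      "aws_access_key_id_here")
    config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
      "aws_secret_key_here")
    config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: exactly-once (participates in Flink's checkpointing)...
    val input = env.addSource(new FlinkKinesisConsumer[String](
      "input_stream_name", new SimpleStringSchema, config))

    // ...feeding a sink that is only at-least-once (no checkpoint participation).
    val producer = new FlinkKinesisProducer[String](new SimpleStringSchema, config)
    producer.setFailOnError(true)
    producer.setDefaultStream("output_stream_name")
    producer.setDefaultPartition("0")

    input.map(_.toUpperCase).addSink(producer)
    env.execute("kinesis-pipeline-sketch")
  }
}
```

Note the asymmetry the fault-tolerance table records: the source side is exactly once, but the producer side is at least once, so after a failure the output stream may contain duplicates even though Flink's internal state is consistent.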