Repository: flink
Updated Branches:
  refs/heads/master fa42cdabf -> 256c9c4da


[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connector to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation

This closes #2181


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256c9c4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256c9c4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256c9c4d

Branch: refs/heads/master
Commit: 256c9c4daf11349128eaa2e71a434f609e57053c
Parents: fa42cda
Author: Gordon Tai <tzuli...@gmail.com>
Authored: Wed Jun 29 18:29:08 2016 +0800
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Jun 29 13:41:35 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md   |  2 +-
 docs/apis/streaming/connectors/kinesis.md | 63 ++++++++++++++++++++++----
 docs/apis/streaming/fault_tolerance.md    | 16 +++++--
 3 files changed, 68 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
index 9bd70be..e7cd05b 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -236,7 +236,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 
-val myConsumer = new FlinkKafkaConsumer08[Stirng]("topic", new SimpleStringSchema(), properties);
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
 stream = env
     .addSource(myConsumer)
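
The snippet above references a `CustomWatermarkEmitter` that the hunk does not define. A minimal sketch of what such an emitter could look like, assuming the Flink 1.x `AssignerWithPunctuatedWatermarks` interface and a hypothetical "timestamp,payload" record layout:

{% highlight scala %}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical emitter, not part of this commit: extracts the event-time
// timestamp from the record itself and emits a watermark for every element.
class CustomWatermarkEmitter extends AssignerWithPunctuatedWatermarks[String] {

  // Assumes records of the form "timestamp,payload" with a millisecond epoch prefix.
  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long =
    element.split(",")(0).toLong

  // Punctuated watermarks: one is emitted after each element, using its timestamp.
  override def checkAndGetNextWatermark(lastElement: String, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)
}
{% endhighlight %}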

http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index 66c078a..db3a9c4 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -52,12 +52,16 @@ mvn clean install -Pinclude-kinesis -DskipTests
 
 
 Note that the streaming connectors are not part of the binary distribution. 
-See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Usage of Consumer
+### Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
+to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
+
+### Kinesis Consumer
 
 The `FlinkKinesisConsumer` can be used to pull data from multiple Kinesis streams within the same AWS region in parallel.
-It participates in Flink's distributed snapshot checkpointing and provides exactly-once processing guarantees. Note
+It participates in Flink's distributed snapshot checkpointing and provides exactly-once user-defined state update guarantees. Note
 that the current version can not handle resharding of Kinesis streams. When Kinesis streams are resharded, the consumer
 will fail and the Flink streaming job must be resubmitted.
 
@@ -78,10 +82,28 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYP
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
     "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig))
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisConsumerConfig = new Properties();
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
@@ -92,13 +114,15 @@ the AWS access key ID and secret key are directly supplied in the configuration
 from the newest position in the Kinesis stream (the other option will be setting `KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 
-#### Usage of Producer
+Other optional configuration keys can be found in `KinesisConfigConstants`.
+
+### Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream onto a Kinesis stream. Note that the producer is not participating in 
+The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again
 to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
-To put data onto a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
 
@@ -113,16 +137,37 @@ kinesisProducerConfig.put(
 kinesisProducerConfig.put(
     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
     "aws_secret_key_here");
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 
+FlinkKinesisProducer<String> kinesis =
+    new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 kinesis.setFailOnError(true);
-kinesis.setDefaultStream("test-flink");
+kinesis.setDefaultStream("kinesis_stream_name");
 kinesis.setDefaultPartition("0");
 
 DataStream<String> simpleStringStream = ...;
 simpleStringStream.addSink(kinesis);
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisProducerConfig = new Properties();
+kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+
+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, kinesisProducerConfig);
+kinesis.setFailOnError(true);
+kinesis.setDefaultStream("kinesis_stream_name");
+kinesis.setDefaultPartition("0");
+
+val simpleStringStream = ...;
+simpleStringStream.addSink(kinesis);
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
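
Taken together, the consumer and producer snippets added above form a complete read-and-write pipeline. A minimal end-to-end sketch, assuming the Flink 1.x Scala API and connector package paths; the stream names and credentials are placeholders:

{% highlight scala %}
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KinesisPassThrough {
  def main(args: Array[String]): Unit = {
    // Shared consumer/producer configuration; region, keys, and streams are placeholders.
    val config = new Properties()
    config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1")
    config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "aws_access_key_id_here")
    config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "aws_secret_key_here")
    config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Exactly-once consumer: reads the input stream starting from LATEST.
    val kinesis = env.addSource(new FlinkKinesisConsumer[String](
      "input_stream_name", new SimpleStringSchema, config))

    // At-least-once producer: writes each record to the output stream.
    val producer = new FlinkKinesisProducer[String](new SimpleStringSchema, config)
    producer.setFailOnError(true)
    producer.setDefaultStream("output_stream_name")
    producer.setDefaultPartition("0")

    kinesis.addSink(producer)
    env.execute("Kinesis pass-through")
  }
}
{% endhighlight %}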

http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 8426f11..3edc65d 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -103,7 +103,7 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 ### Fault Tolerance Guarantees of Data Sources and Sinks
 
 Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+snapshotting mechanism. This is currently guaranteed for the Kafka source and AWS Kinesis Streams source (and internal number generators), but
 not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
 
 <table class="table table-bordered">
@@ -121,6 +121,11 @@ not for other sources. The following table lists the state update guarantees of
             <td>Use the appropriate Kafka connector for your version</td>
         </tr>
         <tr>
+            <td>AWS Kinesis Streams</td>
+            <td>exactly once</td>
+            <td>Current version does not handle stream resharding</td>
+        </tr>
+        <tr>
             <td>RabbitMQ</td>
             <td>at most once (v 0.10) / exactly once (v 1.0) </td>
             <td></td>
@@ -178,8 +183,13 @@ state updates) of Flink coupled with bundled sinks:
     </tr>
     <tr>
         <td>Cassandra sink</td>
-        <td>at-least-once / exactly-once</td>
-        <td>exactly-once only for idempotent updates</td>
+        <td>at least once / exactly once</td>
+        <td>exactly once only for idempotent updates</td>
+    </tr>
+    <tr>
+        <td>AWS Kinesis Streams</td>
+        <td>at least once</td>
+        <td></td>
     </tr>
     <tr>
         <td>File sinks</td>
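
The exactly-once guarantees in the tables above apply only when checkpointing is enabled, as in the `setMaxConcurrentCheckpoints` line shown in the hunk context. A minimal sketch of enabling it in the Scala API; the 5000 ms interval is an arbitrary example value:

{% highlight scala %}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Take a checkpoint every 5 seconds; without checkpointing, the
// exactly-once state update guarantees above do not hold on recovery.
env.enableCheckpointing(5000)

// Allow only one in-flight checkpoint at a time, matching the hunk context above.
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
{% endhighlight %}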
