Repository: flink
Updated Branches:
  refs/heads/master 1dee62b4b -> b6bfcf008


[FLINK-3270] Add Kafka example

This closes #1533


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bfcf00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bfcf00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bfcf00

Branch: refs/heads/master
Commit: b6bfcf008e20eb4d4a3e81bedf7eaf871f121d4c
Parents: 1dee62b
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Jan 21 11:42:12 2016 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Fri Feb 12 15:02:28 2016 +0100

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml | 51 ++++++++++++++
 .../streaming/examples/kafka/ReadFromKafka.java | 63 +++++++++++++++++
 .../examples/kafka/WriteIntoKafka.java          | 72 ++++++++++++++++++++
 .../kafka/examples/ReadFromKafka.java           | 56 ---------------
 .../kafka/examples/WriteIntoKafka.java          | 70 -------------------
 5 files changed, 186 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index ba49dc5..3ea3276 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -61,6 +61,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
@@ -522,6 +528,51 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+
+                       <!-- Use the shade plugin to build a fat jar for the kafka example -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>fat-jar-kafka-example</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <shadeTestJar>false</shadeTestJar>
+                                                       <shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       <createDependencyReducedPom>false</createDependencyReducedPom>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.streaming.examples.kafka.ReadFromKafka</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <finalName>Kafka</finalName>
+                                                       <!--- <outputFile>Kafka.jar</outputFile> -->
+                                                       <filters>
+                                                               <filter>
+                                                                       <artifact>*</artifact>
+                                                                       <includes>
+                                                                               <include>org/apache/flink/streaming/examples/kafka/**</include>
+                                                                               <include>org/apache/flink/streaming/**</include>
+                                                                               <include>org/apache/kafka/**</include>
+                                                                               <include>org/apache/curator/**</include>
+                                                                               <include>org/apache/zookeeper/**</include>
+                                                                               <include>org/apache/jute/**</include>
+                                                                               <include>org/I0Itec/**</include>
+                                                                               <include>jline/**</include>
+                                                                               <include>com/yammer/**</include>
+                                                                               <include>kafka/**</include>
+                                                                       </includes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
                </plugins>
                
                <pluginManagement>

http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
new file mode 100644
index 0000000..2179eca
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Read Strings from Kafka and print them to standard out.
+ * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
+ *
+ * Please pass the following arguments to run the example:
+ *     --topic test --bootstrap.servers localhost:9092 --group.id myconsumer
+ *
+ */
+public class ReadFromKafka {
+
+       public static void main(String[] args) throws Exception {
+               // parse input arguments
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               if(parameterTool.getNumberOfParameters() < 3) {
+                       System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> --bootstrap.servers <kafka brokers> --group.id <some id>");
+                       System.exit(1);
+               }
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.setNumberOfExecutionRetries(3); // retry if job fails
+               env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+               env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
+
+               DataStream<String> messageStream = env
+                               .addSource(new FlinkKafkaConsumer08<>(
+                                               parameterTool.getRequired("topic"),
+                                               new SimpleStringSchema(),
+                                               parameterTool.getProperties()));
+
+               // write kafka stream to standard out.
+               messageStream.print();
+
+               env.execute("Read from Kafka example");
+       }
+}

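For reference, the consumer above takes all of its Kafka settings from the command line via ParameterTool.getProperties(). Below is a minimal sketch (not part of this commit) of the same consumer wired with an explicitly built Properties object; the broker address, ZooKeeper address, group id, and topic name are placeholder assumptions for a local setup.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: explicit configuration instead of ParameterTool; all values are placeholders.
public class ReadFromKafkaSketch {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed local Kafka broker
        properties.setProperty("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper, used by the 0.8 consumer
        properties.setProperty("group.id", "myconsumer");              // consumer group from the usage string above

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer08<>("test", new SimpleStringSchema(), properties));

        // As noted above, on a cluster this prints to the TaskManager's .out file.
        messageStream.print();

        env.execute("Read from Kafka sketch");
    }
}
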
http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
new file mode 100644
index 0000000..0a33265
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Generate a String every 500 ms and write it into a Kafka topic
+ *
+ * Please pass the following arguments to run the example:
+ *     --topic test --bootstrap.servers localhost:9092
+ *
+ */
+public class WriteIntoKafka {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool parameterTool = ParameterTool.fromArgs(args);
+               if(parameterTool.getNumberOfParameters() < 2) {
+                       System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
+                       System.exit(1);
+               }
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.setNumberOfExecutionRetries(3);
+
+               // very simple data generator
+               DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
+                       public boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<String> ctx) throws Exception {
+                               long i = 0;
+                               while(this.running) {
+                                       ctx.collect("Element - " + i++);
+                                       Thread.sleep(500);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               // write data into Kafka
+               messageStream.addSink(new FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
+
+               env.execute("Write into Kafka example");
+       }
+}

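Similarly, a minimal sketch (not part of this commit) of the producer side with an explicit Properties object and a bounded demo source; the broker address and topic name are placeholder assumptions.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch only: a finite demo source instead of the infinite generator above; all values are placeholders.
public class WriteIntoKafkaSketch {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed local Kafka broker

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Three sample records instead of the 500 ms generator used in the example above.
        DataStream<String> messageStream = env.fromElements("Element - 0", "Element - 1", "Element - 2");

        // write data into Kafka
        messageStream.addSink(
                new FlinkKafkaProducer08<>("test", new SimpleStringSchema(), properties));

        env.execute("Write into Kafka sketch");
    }
}
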
http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
deleted file mode 100644
index 643da66..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.examples;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-
-/**
- * Read Strings from Kafka and print them to standard out.
- * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
- *
- * Please pass the following arguments to run the example:
- *     --topic test --bootstrap.servers localhost:9092 --group.id myconsumer
- *
- */
-public class ReadFromKafka {
-
-       public static void main(String[] args) throws Exception {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(4);
-               env.enableCheckpointing(5000);
-               env.setParallelism(2);
-
-               ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-               DataStream<String> messageStream = env
-                               .addSource(new FlinkKafkaConsumer09<>(
-                                               parameterTool.getRequired("topic"),
-                                               new SimpleStringSchema(),
-                                               parameterTool.getProperties()));
-
-               messageStream.print();
-
-               env.execute("Read from Kafka example");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
deleted file mode 100644
index fbe53fa..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.examples;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-
-/**
- * Generate a String every 500 ms and write it into a Kafka topic
- *
- * Please pass the following arguments to run the example:
- *     --topic test --bootstrap.servers localhost:9092
- *
- */
-public class WriteIntoKafka {
-
-       public static void main(String[] args) throws Exception {
-               StreamExecutionEnvironment env =
-                               StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(4);
-               env.setParallelism(2);
-
-               ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-               // very simple data generator
-               DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
-                       public boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<String> ctx) throws Exception {
-                               long i = 0;
-                               while(this.running) {
-                                       ctx.collect("Element - " + i++);
-                                       Thread.sleep(500);
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               });
-
-               // write data into Kafka
-               messageStream.addSink(new FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
-
-               env.execute("Write into Kafka example");
-       }
-}
