Hi,
I'm new to Spark Streaming and want to run some end-to-end tests with Spark and Kafka. My program runs, but nothing arrives at the Kafka topic. Can someone please help me? Where is my mistake? Does anyone have a working example of writing a DStream to Kafka 0.10.1.0? The program looks like this: import kafka.Kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.*; import org.apache.spark.rdd.RDD; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Int; import scala.Tuple2; import java.util.*; import java.util.regex.Pattern; /** * Consumes messages from one or more topics in Kafka and does wordcount. 
* * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> * <zkQuorum> is a list of one or more zookeeper servers that make quorum * <group> is the name of kafka consumer group * <topics> is a list of one or more kafka topics to consume from * <numThreads> is the number of threads the kafka consumer should use * * To run this example: * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ * zoo03 my-consumer-group topic1,topic2 1` */ public final class JavaKafkaWordCountTest { private static final Pattern SPACE = Pattern.compile(" "); private JavaKafkaWordCountTest() { } public static void main(String[] args) throws Exception { if (args.length < 4) { System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("GutweinKafkaWordCount"); // Create the context with 2 seconds batch size JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = Integer.parseInt(args[3]); Map<String, Integer> topicMap = new HashMap<>(); String[] topics = args[2].split(","); for (String topic: topics) { topicMap.put(topic, numThreads); } final JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String x) { return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer 
i1, Integer i2) { return i1 + i2; } }); final KafkaWriter writer = new KafkaWriter("localhost:9081"); wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { @Override public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception { writer.writeToTopic("output", stringIntegerJavaPairRDD.toString()); } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); } public static class KafkaWriter { Properties props = new Properties(); KafkaProducer<String, String> producer; // Constructor KafkaWriter(String bootstrap_server){ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); } private void writeToTopic(String topicName, String message){ ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message); producer.send(record); } } }