i am very new to *Spark streaming* and i am implementing small exercise
like sending *XML* data from *kafka* and need to receive that *streaming* data
through *spark streaming.* I tried in all possible ways.. but every time i
am getting *empty values.*

*There is no problem in Kafka side, only problem is receiving the Streaming
data from Spark side.Here is the code how i am implementing:package
com.package; import org.apache.spark.SparkConf; import
org.apache.spark.api.java.JavaSparkContext; import
org.apache.spark.streaming.Duration; import
org.apache.spark.streaming.api.java.JavaStreamingContext; public class
SparkStringConsumer { public static void main(String[] args) { SparkConf
conf = new SparkConf() .setAppName("kafka-sandbox") .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf); JavaStreamingContext ssc
= new JavaStreamingContext(sc, new Duration(2000)); Map<String, String>
kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list",
"localhost:9092"); Set<String> topics = Collections.singleton("mytopic");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> { System.out.println("--- New RDD with
" + rdd.partitions().size() + " partitions and " + rdd.count() + "
records"); rdd.foreach(record -> System.out.println(record._2)); });
ssc.start(); ssc.awaitTermination(); } } And i am using following
versions:**Zookeeper 3.4.6Scala 2.11Spark 2.0Kafka 0.8.2***

Reply via email to