Thanks Cody. Can I use the receiver-based approach here?
I have created the topic newtopic as below:

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181 --replication-factor 1 --partitions 1 --topic newtopic

This is basically what I am doing in Spark:

val lines = ssc.socketTextStream("rhes564", 2181)

which is obviously not working.

This is what is suggested in the doc:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

 * <zkQuorum> is a list of one or more zookeeper servers that make quorum
 * <group> is the name of kafka consumer group
 * <topics> is a list of one or more kafka topics to consume from
 * <numThreads> is the number of threads the kafka consumer should use

Now this comes back with an error. Obviously I am not passing the parameters correctly!

scala> val kafkaStream = KafkaUtils.createStream(streamingContext, rhes564:2181, rhes564:9092, newtopic 1)
<console>:1: error: identifier expected but integer literal found.
       val kafkaStream = KafkaUtils.createStream(streamingContext, rhes564:2181, rhes564:9092, newtopic 1)

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 1 April 2016 at 21:13, Cody Koeninger <c...@koeninger.org> wrote:

> It looks like you're using a plain socket stream to connect to a
> zookeeper port, which won't work.
>
> Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am just testing Spark streaming with Kafka.
> >
> > Basically I am broadcasting topic every minute to Host:port -> rhes564:2181.
> > This is sending few lines through a shell script as follows:
> >
> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list rhes564:9092 --topic newtopic
> >
> > That works fine and I can see the messages in
> >
> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --topic newtopic
> >
> > Fri Apr 1 21:00:01 BST 2016 ======= Sending messages from rhes5
> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >
> > Now I try to see the topic in spark streaming as follows:
> >
> > val conf = new SparkConf().
> >              setAppName("StreamTest").
> >              setMaster("local[12]").
> >              set("spark.driver.allowMultipleContexts", "true").
> >              set("spark.hadoop.validateOutputSpecs", "false")
> > val sc = new SparkContext(conf)
> > // Create sqlContext based on HiveContext
> > val sqlContext = new HiveContext(sc)
> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> > //
> > // Create a local StreamingContext with two working thread and batch interval of 1 second.
> > // The master requires 2 cores to prevent from a starvation scenario.
> > val ssc = new StreamingContext(conf, Minutes(1))
> > // Create a DStream that will connect to hostname:port, like localhost:9999
> > //val lines = ssc.socketTextStream("rhes564", 9092)
> > val lines = ssc.socketTextStream("rhes564", 2181)
> > // Split each line into words
> > val words = lines.flatMap(_.split(" "))
> > val pairs = words.map(word => (word, 1))
> > val wordCounts = pairs.reduceByKey(_ + _)
> > // Print the first ten elements of each RDD generated in this DStream to the console
> > wordCounts.print()
> > ssc.start()
> >
> > This is what I am getting:
> >
> > scala> -------------------------------------------
> > Time: 1459541760000 ms
> > -------------------------------------------
> >
> > But no values
> >
> > Have I got the port wrong in this case or the set up is incorrect?
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
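
For reference, the failing createStream call in this thread passes bare tokens where the receiver-based API expects a String ZooKeeper quorum, a String consumer group id, and a Map from topic name to thread count. A minimal sketch of what the call might look like is below. It assumes Spark 1.x with the spark-streaming-kafka (Kafka 0.8) artifact on the classpath. The group name "StreamTestGroup" and the object name are hypothetical, and local[2] is chosen only so that the receiver does not take the single available core.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverSketch {
  // ZooKeeper quorum as one comma-separated String (not the broker port 9092)
  val zkQuorum = "rhes564:2181"
  // Consumer group id: any name; "StreamTestGroup" is a hypothetical choice
  val groupId = "StreamTestGroup"
  // Map of topic name -> number of consumer threads for that topic
  val topics = Map("newtopic" -> 1)

  def main(args: Array[String]): Unit = {
    // At least 2 local threads: one is occupied by the Kafka receiver
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Minutes(1))

    // createStream yields (key, message) pairs; keep only the message text
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics).map(_._2)

    // Same word count as in the original socket-based attempt
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

In the spark-shell the same three values can be passed directly to KafkaUtils.createStream, provided the Kafka integration jar was supplied when the shell was started.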