Hi, I think you will need to convert the RDD to a DataFrame using .toDF(), e.g. accessLogs.toDF().registerTempTable("RAS")
Thanks, moon On Mon, Sep 7, 2015 at 3:34 AM Sajeevan Achuthan < achuthan.sajee...@gmail.com> wrote: > Zeppelin, an excellent tool. I am trying to implement a streaming > application. I get an error while deploying my application. See my code > below > > > import org.apache.spark.SparkContext > import org.apache.spark.SparkContext._ > import org.apache.spark.SparkConf > import org.apache.spark.streaming.StreamingContext > import org.apache.spark.streaming.Seconds > import org.apache.spark.sql.SQLContext > val sparkConf = new > SparkConf().setAppName("PEPA").setMaster("local[*]").set("spark.driver.allowMultipleContexts", > "true") > > import org.apache.spark.streaming.kafka._ > val ssc = new StreamingContext(sparkConf, Seconds(2)) > > ssc.checkpoint("checkpoint") > val topicMap = Map("incoming"->1) > > val record = KafkaUtils.createStream(ssc, "localhost", "1", > topicMap).map(_._2) > record.print() > case class > CELL_KPIS(ECELL_Name:String,CGI:String,Number_of_Times_Interf:Double,TAOF:Double,PHL:Double,NPCCHL:Double,LRSRP:Double,NC:Double) > val data = > record.map(s=>s.split(",")).filter(s=>s(0)!="\"ECELL_Name\"").map( > s=>CELL_KPIS(s(0), s(1), s(2).toDouble, s(3).toDouble, > s(5).toDouble,s(6).toDouble, s(7).toDouble, s(8).toDouble) > ) > data.foreachRDD {accessLogs => > import sqlContext.implicits._ > accessLogs.registerTempTable("RAS") > > } > ssc.start() > ssc.awaitTermination() > > And I get error > import org.apache.spark.SparkContext import > org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import > org.apache.spark.streaming.StreamingContext import > org.apache.spark.streaming.Seconds import org.apache.spark.sql.SQLContext > sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e5779a > import org.apache.spark.streaming.kafka._ ssc: > org.apache.spark.streaming.StreamingContext = > org.apache.spark.streaming.StreamingContext@48621ee1 topicMap: > scala.collection.immutable.Map[String,Int] = Map(incoming -> 1) record: > 
org.apache.spark.streaming.dstream.DStream[String] = > org.apache.spark.streaming.dstream.MappedDStream@6290e75e defined class > CELL_KPIS data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = > org.apache.spark.streaming.dstream.MappedDStream@4bda38c3 > > <console>:55: error: value registerTempTable is not a member of > org.apache.spark.rdd.RDD[CELL_KPIS] accessLogs.registerTempTable("RAS") > > *My configuration for Zeppelin* > > > export MASTER=spark://localhost:7077 > export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_05 > export ZEPPELIN_PORT=9090 > export ZEPPELIN_SPARK_CONCURRENTSQL=false > export ZEPPELIN_SPARK_USEHIVECONTEXT=false > #'export MASTER=local[*] > export SPARK_HOME=/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4 > > *Interpreter configuration for spark * > > "2AW247KM7": { "id": "2AW247KM7", "name": "spark", "group": "spark", > "properties": { "spark.cores.max": "", "spark.yarn.jar": "", "master": > "local[*]", "zeppelin.spark.maxResult": "1000", "zeppelin.dep.localrepo": > "local-repo", "spark.app.name": "APP3", "spark.executor.memory": "5G", > "zeppelin.spark.useHiveContext": "false", > "spark.driver.allowMultipleContexts": "true", "args": "", "spark.home": > "/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4", > "zeppelin.spark.concurrentSQL": "true", "zeppelin.pyspark.python": "python" > }, "interpreterGroup": [ { "class": > "org.apache.zeppelin.spark.SparkInterpreter", "name": "spark" }, { "class": > "org.apache.zeppelin.spark.PySparkInterpreter", "name": "pyspark" }, { > "class": "org.apache.zeppelin.spark.SparkSqlInterpreter", "name": "sql" }, > { "class": "org.apache.zeppelin.spark.DepInterpreter", "name": "dep" } ], > "option": { "remote": true } } > Is there any problem in my code or setup ? > Any help very much appreciated. >