Hi, I'm trying to get hold of the SparkContext from a HiveContext or StreamingContext. I have two pieces of code: one in core Spark and one in Spark Streaming. The plain Spark-with-Hive version prints the context; the Spark Streaming version with Hive prints null. Please help me figure out how to make the streaming code work. Thanks in advance.
///// core Spark code, which prints the context

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Trail extends App {
  val conf = new SparkConf(false)
    .setMaster("local[*]")
    .setAppName("Spark Streamer")
    .set("spark.logConf", "true")
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .set("spark.cleaner.ttl", "300")
  val context = new SparkContext(conf)
  val hiveContext = new HiveContext(context)
  import com.dgm.Trail.hiveContext._

  // Parse "id|k1=v1&k2=v2&..." log lines into Signal rows
  // (Signal and rts are defined elsewhere in our project).
  context textFile "logs/log1.txt" flatMap { data =>
    val Array(id, signals) = data split '|'
    signals split '&' map { signal =>
      val Array(key, value) = signal split '='
      Signal(id, key, value)
    }
  } registerTempTable "signals"

  hiveContext cacheTable "signals"

  val signalRows = hiveContext sql "select id from signals where key='id' and value='123'" map rts cache()

  signalRows.foreach { x =>
    println(signalRows.context) // prints the SparkContext, as expected
  }
}

///// Spark Streaming code, which prints null

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Trail extends App {
  val conf = new SparkConf(false)
    .setMaster("local[*]")
    .setAppName("Spark Streamer")
    .set("spark.logConf", "true")
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .set("spark.cleaner.ttl", "300")
  val streamingContext = new StreamingContext(conf, Seconds(10))
  val context = streamingContext.sparkContext

  val kafkaParams = Map(
    "zookeeper.connect" -> "localhost",
    "group.id" -> "spark_stream",
    "zookeeper.connection.timeout.ms" -> "10000",
    "auto.offset.reset" -> "smallest")

  val stream = KafkaUtils.createStream[String, String,
    kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    streamingContext, kafkaParams, Map("tracker" -> 2),
    StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

  // Same parsing as above, but over the Kafka stream.
  val signalsDStream = stream flatMap { data =>
    val Array(id, signals) = data split '|'
    signals split '&' map { signal =>
      val Array(key, value) = signal split '='
      Signal(id, key, value)
    }
  }

  signalsDStream foreachRDD { rdds =>
    val hiveContext = new HiveContext(streamingContext.sparkContext)
    import hiveContext._
    rdds registerTempTable "signals"
    hiveContext cacheTable "signals"
    val signalRows = hiveContext sql "select id from signals where key='id' and value='123'" map rts cache()
    signalRows.foreach { x =>
      // println(streamingContext.sparkContext) causes a serialization error
      println(hiveContext.sparkContext) // prints null
    }
  }

  streamingContext.start()
  streamingContext.awaitTermination() // keep the streaming app running
}
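From what I can tell, the difference is where the closure runs: `signalRows.foreach { ... }` executes on the executors, and the closure captures `hiveContext`, whose `sparkContext` field is marked @transient in SQLContext, so it deserializes as null on the workers (capturing `streamingContext` fails outright because StreamingContext isn't serializable, which matches the commented-out line). The context is only meaningful on the driver. Below is a minimal sketch of the restructuring I'm considering, which collects the matching rows back to the driver before touching the context; it assumes Spark 1.1-era APIs and reuses the same Signal case class and rts mapper from my code above:

signalsDStream foreachRDD { rdd =>
  // foreachRDD itself runs on the driver, so the contexts are valid here.
  val hiveContext = new HiveContext(rdd.sparkContext)
  import hiveContext._
  rdd registerTempTable "signals"
  val ids = hiveContext
    .sql("select id from signals where key='id' and value='123'")
    .map(rts)
    .collect() // runs the job and brings the rows back to the driver

  // This loop runs on the driver, not on the executors.
  ids foreach { x =>
    println(hiveContext.sparkContext) // prints the live context, not null
  }
}

(Constructing a HiveContext on every batch is also expensive; since foreachRDD runs on the driver, I'd presumably hoist it out and create it once, but I've kept it inline here to stay close to my original code.)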