Hi,

I'm trying to get hold of the SparkContext from a HiveContext (or from the
StreamingContext) inside Spark Streaming. I have two pieces of code: one in
core Spark and one in Spark Streaming. The plain Spark code with Hive gives
me the context, but the Spark Streaming code with Hive prints null. Please
help me figure out how to make this code work.

Thanks in advance.
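
Both snippets below refer to a Signal case class that isn't shown here. For
completeness, a minimal sketch of the shape the parsing code assumes (field
names are my guess; the real definition lives elsewhere in my project):

///// Signal case class (sketch, assumed shape)
// three String fields produced by the "id|key1=value1&key2=value2" parser below
case class Signal(id: String, key: String, value: String)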

///// core Spark code, which gives the context
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Trail extends App {
  val conf = new SparkConf(false)
    .setMaster("local[*]")
    .setAppName("Spark Streamer")
    .set("spark.logConf", "true")
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .set("spark.cleaner.ttl", "300")

  val context = new SparkContext(conf)

  val hiveContext = new HiveContext(context)

  // brings the implicit RDD-to-SchemaRDD conversion into scope,
  // which is what makes registerTempTable available on the RDD below
  import com.dgm.Trail.hiveContext._

  // each input line looks like "id|key1=value1&key2=value2&..."
  context textFile "logs/log1.txt" flatMap { data =>
    val Array(id, signals) = data split '|'
    signals split '&' map { signal =>
      val Array(key, value) = signal split '='
      Signal(id, key, value)
    }
  } registerTempTable "signals"

  hiveContext cacheTable "signals"

  // rts is a row-mapping function defined elsewhere (not shown here)
  val signalRows = hiveContext
    .sql("select id from signals where key='id' and value='123'")
    .map(rts)
    .cache()

  signalRows.foreach { x =>
    println(signalRows.context)
  }

}
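
For reference, this is what I mean by "gives the context": on the driver the
HiveContext hands back the same SparkContext it was built from, and the
println above does show it. A quick sanity check (just a sketch, assuming the
code above):

///// driver-side sanity check (sketch)
  // both handles should point at the same driver-side SparkContext
  assert(hiveContext.sparkContext eq context)
  println(signalRows.context) // prints the SparkContext here, as expected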


///// Spark Streaming code, which prints null
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }

object Trail extends App {
  val conf = new SparkConf(false)
    .setMaster("local[*]")
    .setAppName("Spark Streamer")
    .set("spark.logConf", "true")
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .set("spark.cleaner.ttl", "300")

  val streamingContext = new StreamingContext(conf, Seconds(10))

  val context = streamingContext.sparkContext

  val kafkaParams = Map(
    "zookeeper.connect" -> "localhost",
    "group.id" -> "spark_stream",
    "zookeeper.connection.timeout.ms" -> "10000",
    "auto.offset.reset" -> "smallest"
  )

  val stream = KafkaUtils.createStream[String, String,
    kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
    streamingContext, kafkaParams, Map("tracker" -> 2),
    StorageLevel.MEMORY_AND_DISK_SER_2
  ).map(_._2)

  // same "id|key1=value1&key2=value2" line format as the batch job above
  val signalsDStream = stream flatMap { data =>
    val Array(id, signals) = data split '|'
    signals split '&' map { signal =>
      val Array(key, value) = signal split '='
      Signal(id, key, value)
    }
  }

  signalsDStream foreachRDD { rdds =>
    val hiveContext = new HiveContext(streamingContext.sparkContext) // a new HiveContext per batch
    import hiveContext._
    rdds registerTempTable "signals"
    hiveContext cacheTable "signals"
    val signalRows = hiveContext
      .sql("select id from signals where key='id' and value='123'")
      .map(rts)
      .cache()
    signalRows.foreach { x =>
      //println(streamingContext.sparkContext) causes serialization error
      println(hiveContext.sparkContext)
    }
  }

  streamingContext.start()
  streamingContext.awaitTermination() // block so the streaming job keeps running

}
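
To narrow it down, here is a stripped-down sketch of the same foreachRDD
block, marking which part I believe runs where (my understanding, which may
be wrong: the body of foreachRDD runs on the driver, while the body of the
inner rdds.foreach runs on the executors; the null I reported comes from the
executor-side println):

///// stripped-down foreachRDD (sketch only, assuming the streaming setup above)
  signalsDStream foreachRDD { rdds =>
    val hiveContext = new HiveContext(streamingContext.sparkContext)
    println(hiveContext.sparkContext)   // runs on the driver (my assumption)
    rdds.foreach { x =>
      println(hiveContext.sparkContext) // runs on the executors; this is where I see null
    }
  }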



