Jeff,

Thanks for your response. I see the below error in the logs. Do you think it has anything to do with hiveContext? Do I have to serialize it before using it inside foreach?
16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
java.lang.NullPointerException
	at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
	at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
	at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
	at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
	at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
	at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
	at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
	at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
	at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

Thanks,
Ajay

On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
> In your sample code, you can use hiveContext in the foreach as it is scala
> List foreach operation which runs in driver side.
> But you cannot use
> hiveContext in RDD.foreach
>
>
> Ajay Chander <itsche...@gmail.com> wrote on Wed, Oct 26, 2016 at 11:28 AM:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>>     val conf = new SparkConf()
>>     val sc = new SparkContext(conf)
>>     val hiveContext = new HiveContext(sc)
>>
>>     val dataElementsFile = args(0)
>>     val deDF = hiveContext.read.text(dataElementsFile)
>>       .toDF("DataElement").coalesce(1).distinct().cache()
>>
>>     def calculate(de: Row) {
>>       val dataElement = de.getAs[String]("DataElement").trim
>>       val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + dataElement +
>>         "' as data_elm, " + dataElement + " as data_elm_val FROM TEST_DB.TEST_TABLE1 ")
>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>     }
>>
>>     deDF.collect().foreach(calculate)
>>   }
>> }
>>
>> I looked at
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>> and I see it is extending SQLContext which extends Logging with
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
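The distinction Jeff describes can be sketched without a Spark cluster. `collect().foreach` is a plain local Scala `foreach` on the driver, so the original `hiveContext` object is used directly. `rdd.foreach`, by contrast, Java-serializes its closure and ships it to executors; the context class may well be `Serializable` (as the scaladoc says), but driver-only state such as its `SparkContext` is marked `@transient` and is dropped in transit, so using the context on an executor fails. The sketch below uses a hypothetical `FakeSqlContext` stand-in (not the real Spark class) to show the effect of that serialization round trip:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for HiveContext/SQLContext: Serializable, but its driver-side
// handle is @transient and therefore dropped when the object is serialized.
class FakeSqlContext(@transient val sc: AnyRef) extends Serializable {
  def sql(q: String): String = {
    require(sc != null, "SparkContext is null: context used off the driver")
    s"ok: $q"
  }
}

object Demo {
  // Serialize and deserialize, as Spark does when shipping a closure
  // (and everything it captures) to executors.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(obj)
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeSqlContext(new Object)

    // Driver-side foreach (like deDF.collect().foreach): no serialization,
    // the original ctx is used in place, so this works fine.
    Array("q1", "q2").foreach(q => println(ctx.sql(q)))

    // What rdd.foreach would imply: ctx travels through serialization,
    // and its @transient field comes out null on the other side.
    val shipped = roundTrip(ctx)
    println(shipped.sc == null)
  }
}

Demo.main(Array())
```

Running it prints `ok: q1`, `ok: q2`, then `true`: the round-tripped context has lost its `SparkContext`, which is why a context that looks `Serializable` on paper still cannot be used inside `RDD.foreach`.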