Jeff, thanks for your response. I see the error below in the logs. Do you
think it has anything to do with hiveContext? Do I have to serialize it
before using it inside foreach?

16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
java.lang.NullPointerException
    at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

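
For reference, here is a minimal sketch of the driver-side versus
executor-side distinction as I understand it. The object name, the app name,
the column names, and the query are placeholders made up for illustration,
not the real job, and it assumes Spark 1.6 with Hive support on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical object name, for illustration only.
object ForeachSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ForeachSketch"))
    val hiveContext = new HiveContext(sc)

    // Placeholder column names standing in for the rows of deDF.
    val dataElements = sc.parallelize(Seq("col_a", "col_b"))

    // Driver side: collect() returns a local Array, so this foreach is an
    // ordinary Scala loop and hiveContext is only ever used on the driver.
    dataElements.collect().foreach { de =>
      hiveContext.sql(s"SELECT '$de' AS data_elm").show()
    }

    // Executor side: RDD.foreach ships the closure to the executors, where
    // hiveContext is not usable, so this form should be avoided.
    // dataElements.foreach { de =>
    //   hiveContext.sql(s"SELECT '$de' AS data_elm").show()
    // }

    sc.stop()
  }
}
```

If that reading is right, my sample code stays on the driver, because it
calls collect() before foreach.
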
Thanks,
Ajay
On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <[email protected]> wrote:
>
> In your sample code, you can use hiveContext in the foreach because it is
> a plain Scala collection foreach, which runs on the driver side. But you
> cannot use hiveContext in RDD.foreach, which runs on the executors.
>
>
>
> On Wed, Oct 26, 2016 at 11:28 AM, Ajay Chander <[email protected]> wrote:
>
>> Hi Everyone,
>>
>> I was wondering whether I can use hiveContext inside foreach, as below:
>>
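>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.hive.HiveContext
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>>     val conf = new SparkConf()
>>     val sc = new SparkContext(conf)
>>     val hiveContext = new HiveContext(sc)
>>
>>     // File listing one data element (column name) per line
>>     val dataElementsFile = args(0)
>>     val deDF = hiveContext.read.text(dataElementsFile)
>>       .toDF("DataElement").coalesce(1).distinct().cache()
>>
>>     // Runs one query per data element and inserts the result into the table
>>     def calculate(de: Row): Unit = {
>>       val dataElement = de.getAs[String]("DataElement").trim
>>       val df1 = hiveContext.sql(
>>         "SELECT cyc_dt, supplier_proc_i, '" + dataElement + "' as data_elm, " +
>>           dataElement + " as data_elm_val FROM TEST_DB.TEST_TABLE1")
>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>     }
>>
>>     // collect() brings the rows to the driver, so this is a local foreach
>>     deDF.collect().foreach(calculate)
>>   }
>> }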
>>
>>
>> I looked at
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>> and I see that it extends SQLContext, which extends Logging with
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>