I narrowed it down to a union operation. Here's my code snippet:

    val properties: RDD[((String, String, String), Externalizer[KeyValue])] =
      vertices.map { ve =>
        val (vertices, dsName) = ve
        val rval = GraphConfig.getRval(datasetConf, Constants.VERTICES, dsName)
        val (_, rvalAsc, rvalType) = rval

        println(s"Table name: $dsName, Rval: $rval")
        println(vertices.toDebugString)

        vertices.map { v =>
          val rk = appendHash(boxId(v.id)).getBytes
          val cf = PROP_BYTES
          val cq = boxRval(v.rval, rvalAsc, rvalType).getBytes
          val value = Serializer.serialize(v.properties)

          ((new String(rk), new String(cf), new String(cq)),
           Externalizer(put(rk, cf, cq, value)))
        }
      }.reduce(_.union(_)).sortByKey(numPartitions = 32)

Basically, I read data from multiple tables (Seq[RDD[(key, value)]]) and
transform each record into a KeyValue to be inserted into HBase, so I need
a .reduce(_.union(_)) to combine them into one RDD[(key, value)].
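
To make the shape of that combine step concrete, here is a minimal sketch that uses plain Scala collections as a stand-in for RDDs, so it runs without Spark. The object name, the sample keys, and the values are hypothetical and only for illustration; `++` plays the role of `union` and `sortBy(_._1)` the role of `sortByKey`.

```scala
// Plain Scala collections stand in for RDDs here; no Spark is needed to run this.
object UnionSketch {
  // Same key shape as in the snippet: (row key, column family, column qualifier).
  type Key = (String, String, String)

  // Hypothetical per-table data: each inner Seq plays the role of one RDD[(key, value)].
  val tables: Seq[Seq[(Key, String)]] = Seq(
    Seq((("r2", "p", "q1"), "b"), (("r1", "p", "q1"), "a")),
    Seq((("r3", "p", "q1"), "c"))
  )

  // Pairwise combine, then sort by key: the collection analogue of
  // .reduce(_.union(_)).sortByKey(...). Note that reduce throws on an empty Seq.
  val combined: Seq[(Key, String)] = tables.reduce(_ ++ _).sortBy(_._1)

  def main(args: Array[String]): Unit = combined.foreach(println)
}
```

With real RDDs, SparkContext.union accepts the whole Seq of RDDs at once and could replace the pairwise reduce, though I have not checked whether that changes anything for this error.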

I cannot see what's wrong in my code.
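
As far as I understand, on the JVM an ExceptionInInitializerError wraps whatever was thrown while a static initializer (in Scala, the body of an `object`) was running, so the real failure sits in its cause, which the WARN line in the log does not show. A minimal sketch of unwrapping it, where `BrokenConfig` and `rootCause` are hypothetical names for illustration and not part of my code:

```scala
object InitErrorSketch {
  // Hypothetical singleton whose initializer fails, standing in for any
  // object that is first touched inside a task.
  object BrokenConfig {
    val value: String = sys.error("config not available on this JVM")
  }

  // Follow the getCause chain down to the innermost exception.
  def rootCause(t: Throwable): Throwable =
    if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)

  def main(args: Array[String]): Unit = {
    try {
      // First access triggers the object's initializer, which throws.
      println(BrokenConfig.value)
    } catch {
      case e: ExceptionInInitializerError =>
        // The wrapped exception carries the actual reason for the failure.
        println(s"root cause: ${rootCause(e)}")
    }
  }
}
```

Wrapping the body of the inner map in a similar try/catch and logging the root cause might show which initializer fails on the executors.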

Jianshi



On Fri, Jul 25, 2014 at 12:24 PM, Jianshi Huang <jianshi.hu...@gmail.com>
wrote:

> I can successfully run my code in local mode using spark-submit (--master
> local[4]), but I got ExceptionInInitializerError errors in Yarn-client mode.
>
> Any hints as to what the problem is? Is it a closure serialization problem?
> How can I debug it? Your answers would be very helpful.
>
> 14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
> java.lang.ExceptionInInitializerError
>         at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
>         at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
>         at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>         at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



