I can run the code below in spark-shell using YARN client mode.

import org.apache.spark.sql.Row

val csv = spark.read.option("header", "true").csv("my.csv")

def queryYahoo(row: Row): Int = { return 10; }

csv.repartition(5).rdd.foreachPartition { p =>
  p.foreach(r => queryYahoo(r))
}
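For what it's worth, my guess is that the explicit return statement is what the closure cleaner is flagging, so a return-free variant like the sketch below should pass the check (this is an assumption on my part, I have not confirmed it is the cause):

// Hypothetical variant of the same helper: in Scala the last expression
// is the method's result, so no return instruction should end up in the
// bytecode for ReturnStatementFinder to reject.
def queryYahoo(row: Row): Int = 10

But even if that works, I don't understand why the original code runs fine in spark-shell and only fails under spark-submit.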

However, the same code fails when run with spark-submit in YARN client or
cluster mode, with this error:

18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception: org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
    at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
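
For completeness, this is roughly how I submit the job (the class and jar names below are placeholders, not my real ones):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.CsvJob \
  csv-job.jar

The same jar fails in both client and cluster deploy mode.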


Any idea? Thanks.
