GitHub user gatorsmile opened a pull request:

    https://github.com/apache/spark/pull/21086

    [SPARK-24002] [SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes

    ## What changes were proposed in this pull request?
    ```
    Py4JJavaError: An error occurred while calling o153.sql.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
        at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:293)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:226)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
        at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
        at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
        at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
        ...
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
        ... 23 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:206)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
        ... 276 more
    Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
        at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
        at java.nio.ByteBuffer.get(ByteBuffer.java:715)
        at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
        at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
        at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    ```
    
    The Parquet filters are serializable but not thread-safe. SparkPlan.prepare() can be called from different threads (BroadcastExchange invokes it in a thread pool), so the same Parquet filter instance may be serialized by several threads at once; that concurrent serialization is what produces the java.nio.BufferUnderflowException at the bottom of the trace. The race is hard to reproduce. This PR fixes it by moving Parquet filter generation from the driver to the executors, so the filters are never serialized on the driver at all. A runnable sketch of the race follows.
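    To make the race concrete, here is a minimal, self-contained sketch of the same failure mode. `ByteBufferBacked` and `SerializationRace` are hypothetical names made up for this illustration (they are not Spark or Parquet code); the class mimics `Binary$ByteBufferBackedBinary`, whose custom `writeObject` also drains a shared `ByteBuffer`:
    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Hypothetical stand-in for Binary$ByteBufferBackedBinary: serializable, but
    // writeObject mutates the shared buffer's position, so it is not thread-safe.
    // (readObject is omitted; only the write-side race matters here.)
    class ByteBufferBacked(@transient private val buf: ByteBuffer) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        val bytes = new Array[Byte](buf.remaining())
        buf.get(bytes) // advances position; a racing thread then sees fewer bytes left
        buf.rewind()
        out.writeInt(bytes.length)
        out.write(bytes)
      }
    }

    // Scala 2.12+: serialize the shared instance from several threads at once.
    object SerializationRace extends App {
      val shared = new ByteBufferBacked(ByteBuffer.wrap(new Array[Byte](1 << 20)))
      val threads = (1 to 8).map { _ =>
        new Thread(() => {
          for (_ <- 1 to 1000) {
            val oos = new ObjectOutputStream(new ByteArrayOutputStream())
            oos.writeObject(shared) // intermittently throws java.nio.BufferUnderflowException
            oos.close()
          }
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
    ```
    Because the patch rebuilds the Parquet filter on the executor side instead of capturing a driver-side instance in the task closure, the filter object is never serialized by the driver, and the race above can no longer occur.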
    
    ## How was this patch tested?
    Tested with two queries, a 1000-line SQL query and a 3000-line SQL query, under a heavy write workload; it takes at least an hour of running to reproduce the failure once.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark taskNotSerializable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21086
    
----
commit 3bb4824225b53f0ee7900835bfc99b9bd01f7d4f
Author: gatorsmile <gatorsmile@...>
Date:   2018-04-17T15:46:12Z

    fix.

----


