Huw created SPARK-45176:
---------------------------

             Summary: AggregatingAccumulator with TypedImperativeAggregate 
throwing ClassCastException
                 Key: SPARK-45176
                 URL: https://issues.apache.org/jira/browse/SPARK-45176
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.1, 3.4.0
            Reporter: Huw


Probably related to SPARK-39044, but potentially also to this comment in 
Executor.scala:


// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)

The class cast exception I'm seeing is:
java.lang.ClassCastException: class [B cannot be cast to class 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir
but I've seen it with other aggregation buffers, such as QuantileSummaries, as well.

It's my belief that
withBufferSerialized()
on the AggregatingAccumulator is being called twice, leading to
serializeAggregateBufferInPlace(buffer)
also being called twice for a TypedImperativeAggregate. The second time round, 
the buffer is already a byte array, and the asInstanceOf[T] in getBufferObject 
throws.
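To illustrate the failure mode, here is a minimal self-contained sketch (hypothetical names, not Spark's actual classes) of what I believe is happening: the aggregation buffer slot is typed loosely, the serialize step reads the object out and overwrites the slot with its bytes, so a second pass finds a byte array where it expects the buffer object:

```scala
// Stand-in for a typed aggregation buffer such as Reservoir or QuantileSummaries.
class Reservoir {
  def toBytes: Array[Byte] = Array[Byte](1, 2, 3)
}

object Demo {
  // Buffer slot typed as Any, as in an InternalRow-backed aggregation buffer.
  var buffer: Any = new Reservoir

  // Mirrors (loosely) TypedImperativeAggregate.serializeAggregateBufferInPlace:
  // read the buffer object, then overwrite the slot with its serialized bytes.
  def serializeInPlace(): Unit = {
    val obj = buffer.asInstanceOf[Reservoir] // the getBufferObject-style cast
    buffer = obj.toBytes
  }

  def main(args: Array[String]): Unit = {
    serializeInPlace() // first call: fine, slot now holds Array[Byte]
    try {
      serializeInPlace() // second call: class [B cannot be cast to Reservoir
    } catch {
      case e: ClassCastException => println(s"caught: ${e.getMessage}")
    }
  }
}
```

Run twice, the cast fails exactly as in the stack trace below; run once, everything is fine — which is why the bug only surfaces when withBufferSerialized() is invoked a second time.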

This doesn't appear to happen on all runs, and it may be that it only occurs 
when there's a transitive exception. I have a further suspicion that the cause 
might originate with
SerializationDebugger.improveException
which traverses the task and forces writeExternal to be called.

Setting
spark.serializer.extraDebugInfo=false
seems to make things a bit more reliable (I haven't seen the error with this 
setting in place), and points strongly in that direction.
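For anyone wanting to try the same workaround, the config can be passed at submit time; this is just a sketch, and the application class and jar names below are placeholders:

```shell
# Disable SerializationDebugger.improveException on serialization failures.
spark-submit \
  --conf spark.serializer.extraDebugInfo=false \
  --class com.example.MyApp \
  my-app.jar
```

The same key can equally be set in spark-defaults.conf or on the SparkConf/SparkSession builder.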

Stack trace:
Job aborted due to stage failure: Authorized committer (attemptNumber=0, 
stage=15, partition=10) failed; but task commit success, data duplication may 
happen. 
reason=ExceptionFailure(java.io.IOException,java.lang.ClassCastException: class 
[B cannot be cast to class 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module 
java.base of loader 'bootstrap'; 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed 
module of loader 
'app'),[Ljava.lang.StackTraceElement;@7fe2f462,java.io.IOException: 
java.lang.ClassCastException: class [B cannot be cast to class 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module 
java.base of loader 'bootstrap'; 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed 
module of loader 'app')
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1502)
        at 
org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:59)
        at java.base/java.io.ObjectOutputStream.writeExternalData(Unknown 
Source)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown 
Source)
        at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at 
org.apache.spark.serializer.SerializerHelper$.serializeToChunkedBuffer(SerializerHelper.scala:42)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class [B cannot be cast to class 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir ([B is in module 
java.base of loader 'bootstrap'; 
org.apache.spark.sql.catalyst.expressions.aggregate.Reservoir is in unnamed 
module of loader 'app')
        at 
org.apache.spark.sql.catalyst.expressions.aggregate.ReservoirSample.serialize(ReservoirSample.scala:33)
        at 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
        at 
org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:206)
        at 
org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
        at 
org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
        at jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeWriteReplace(Unknown 
Source)
        at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
        at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
        at 
org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:62)
        at 
org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:62)
        at scala.collection.immutable.Vector.foreach(Vector.scala:1856)
        at 
org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:62)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1495)
        ... 11 more
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
