beliefer opened a new pull request, #37977:
URL: https://github.com/apache/spark/pull/37977

   ### What changes were proposed in this pull request?
   When `AggregatingAccumulator` serializes its aggregate buffer, it may throw a `NullPointerException` (NPE).
   The following test case reproduces the error:
   ```
   val namedObservation = Observation("named")
   val df = spark.range(1, 10, 1, 10)
   val observed_df = df.observe(
     namedObservation,
     percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
   observed_df.collect()
   ```
   This throws the following exception:
   ```
   13:45:10.976 ERROR org.apache.spark.util.Utils: Exception encountered
   java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:641)
        at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:602)
        at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
        at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:205)
        at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1245)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
        at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1456)
        at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
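
   A likely reason this test hits the problem: `spark.range(1, 10, 1, 10)` yields 9 rows spread over 10 partitions, so at least one partition receives no rows. The sketch below illustrates that counting argument in plain Scala. `SliceSketch` and `sliceSizes` are hypothetical names, and the even-split rule is an assumption for illustration, not necessarily how Spark lays out the range.

   ```scala
   // Hypothetical helper: sizes of `numSlices` contiguous slices over `numElements` items.
   // The split rule is an assumed even split, used only to illustrate the counting argument.
   object SliceSketch {
     def sliceSizes(numElements: Int, numSlices: Int): Seq[Int] =
       (0 until numSlices).map { i =>
         val start = i.toLong * numElements / numSlices
         val end = (i + 1).toLong * numElements / numSlices
         (end - start).toInt
       }
   }
   ```

   With 9 elements and 10 slices, `SliceSketch.sliceSizes(9, 10)` contains at least one `0`, whichever split rule is used, because there are fewer rows than partitions.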
   
   ### Why are the changes needed?
   Fix a bug.
   After investigation, the root cause is that the buffer of `AggregatingAccumulator` is not created when there are no input rows.
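
   The failure mode can be sketched with a small stand-in class. This is not Spark's `AggregatingAccumulator` and not the patch in this PR; `LazyBufferAccumulator` and its methods are hypothetical names that only model a buffer that stays `null` until the first row arrives.

   ```scala
   import java.nio.charset.StandardCharsets
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical stand-in for an accumulator whose buffer is created lazily.
   final class LazyBufferAccumulator {
     // Remains null until the first input row is added (assumed to mirror the root cause).
     private var buffer: ArrayBuffer[Long] = null

     def add(value: Long): Unit = {
       if (buffer == null) buffer = ArrayBuffer.empty[Long]
       buffer += value
     }

     // Unguarded: dereferences the buffer, so it fails when no row was ever added.
     def serializeUnsafe(): Array[Byte] =
       buffer.mkString(",").getBytes(StandardCharsets.UTF_8)

     // Guarded: treats a missing buffer as "nothing to serialize".
     def serializeSafe(): Array[Byte] =
       if (buffer == null) Array.emptyByteArray
       else buffer.mkString(",").getBytes(StandardCharsets.UTF_8)
   }
   ```

   On an accumulator that never received a row, `serializeUnsafe()` throws a `NullPointerException`, while `serializeSafe()` returns an empty array. Whether the real fix guards the serialization path or initializes the buffer eagerly is not shown here.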
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes.
   Users will see the correct results.
   
   
   ### How was this patch tested?
   New test case.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]