LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202132810


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   @zhenlineo @hvanhovell @amaliujia I found an interesting thing about these two new cases: `SimpleSparkConnectService` is submitted with `local[*]`, so when the machine running the cases has more than 10 cores, both cases fail as follows:
   
   ```
   Warning: Unable to serialize throwable of type io.grpc.StatusRuntimeException for TestFailed(Ordinal(0, 271),INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
        at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
        at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.sca...,UserDefinedFunctionE2ETestSuite,org.apache.spark.sql.UserDefinedFunctionE2ETestSuite,Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),Dataset reduce,Dataset reduce,Vector(),Vector(),Some(io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
        at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
        at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.sca...),Some(290),Some(IndentedText(- Dataset reduce,Dataset reduce,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),None,pool-1-thread-1-ScalaTest-running-UserDefinedFunctionE2ETestSuite,1684472246665), setting it as NotSerializableWrapperException.
   [info] - Dataset reduce *** FAILED *** (290 milliseconds)
   [info]   io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   [info]       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   [info]       at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   [info]       at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   [info]       at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   [info]       at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   [info]       at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   [info]       at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   [info]       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   [info]       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   [info]       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info]       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info]       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info]       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info]       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info]       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info]       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   [info]       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   [info]       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   [info]       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   [info]       at org.apache.spark.scheduler.Task.run(Task.sca...
   [info]   at io.grpc.Status.asRuntimeException(Status.java:535)
   [info]   at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:62)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:114)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:131)
   [info]   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2744)
   [info]   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3184)
   [info]   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2743)
   [info]   at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1292)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$34(UserDefinedFunctionE2ETestSuite.scala:212)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```
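   
   From the trace, the failure goes through `ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput`, i.e. the empty-partition path: with `local[*]`, `spark.range(10)` is sliced by the default parallelism, so on a machine with more than 10 cores some partitions carry no rows at all. A minimal sketch of that slicing behavior (using a plain local session rather than `SimpleSparkConnectService`, with `local[16]` standing in for `local[*]` on an assumed 16-core machine):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Plain local session standing in for the environment the service runs in;
   // local[16] plays the role of local[*] on a 16-core machine (assumed).
   val spark = SparkSession.builder().master("local[16]").getOrCreate()
   
   // range(10) is split into defaultParallelism slices (16 here), so at least
   // 6 of the 16 partitions are empty -- the condition that sends the typed
   // reduce in these tests down the NPE path above.
   assert(spark.range(10).rdd.getNumPartitions > 10)
   ```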
   
   I refactored these two cases in https://github.com/apache/spark/pull/41227/files to make the problem reproducible in GitHub Actions (GA).
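   
   If it helps review, the condition can also be forced deterministically by requesting more slices than rows, so the empty-partition case shows up even on a small GA runner; a hypothetical sketch against the suite's connect session (illustration only, not necessarily the change in the PR above):
   
   ```scala
   // Explicitly ask for more partitions than rows: 6 of the 16 slices are
   // empty no matter how many cores the machine has, so the connect-side
   // typed reduce hits the same empty-partition path (assumed trigger).
   val ds = spark.range(0L, 10L, 1L, numPartitions = 16)
   assert(ds.reduce((a, b) => a + b) == 45L) // 0 + 1 + ... + 9
   ```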
   
   
   
    
   


