LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202132810
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {
Review Comment:
@zhenlineo @hvanhovell @amaliujia I found something interesting about these two new cases: `SimpleSparkConnectService` is submitted as `local[*]`, so when the machine running the tests has more than 10 cores, both cases fail as follows:
```
Warning: Unable to serialize throwable of type io.grpc.StatusRuntimeException for TestFailed(Ordinal(0, 271),INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.sca...,UserDefinedFunctionE2ETestSuite,org.apache.spark.sql.UserDefinedFunctionE2ETestSuite,Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),Dataset reduce,Dataset reduce,Vector(),Vector(),Some(io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.sca...),Some(290),Some(IndentedText(- Dataset reduce,Dataset reduce,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),None,pool-1-thread-1-ScalaTest-running-UserDefinedFunctionE2ETestSuite,1684472246665), setting it as NotSerializableWrapperException.
[info] - Dataset reduce *** FAILED *** (290 milliseconds)
[info]   io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
[info]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
[info]   at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
[info]   at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
[info]   at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
[info]   at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
[info]   at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
[info]   at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
[info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
[info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
[info]   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
[info]   at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
[info]   at org.apache.spark.scheduler.Task.run(Task.sca...
[info]   at io.grpc.Status.asRuntimeException(Status.java:535)
[info]   at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
[info]   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:62)
[info]   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:114)
[info]   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:131)
[info]   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2744)
[info]   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3184)
[info]   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2743)
[info]   at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1292)
[info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$34(UserDefinedFunctionE2ETestSuite.scala:212)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:35)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:35)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
```
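For context, my read of the trace (note the `ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput` frame): under `local[*]` the default parallelism equals the core count, so `spark.range(10)` on a machine with more than 10 cores necessarily leaves some partitions empty, and the typed aggregate behind `Dataset.reduce` appears to hit the NPE on exactly those empty partitions. A minimal sketch of the partitioning behavior (illustrative only, classic Spark API, not the suite's code; `local[16]` stands in for a >10-core machine running `local[*]`):
```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: why the core count matters.
val spark = SparkSession.builder()
  .master("local[16]") // stand-in for a machine with more than 10 cores
  .appName("range-partitioning-sketch")
  .getOrCreate()

// range(10) is split over defaultParallelism partitions by default...
val ds = spark.range(10)
println(ds.rdd.getNumPartitions) // 16 here, so at least 6 partitions hold no rows

spark.stop()
```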
I refactored these two cases in https://github.com/apache/spark/pull/41227/files so that the problem can be reproduced by GA (GitHub Actions).
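If we wanted a trigger that does not depend on the runner's core count, one option (a sketch under the empty-partition assumption above, not necessarily what the linked PR does) is to force more partitions than rows explicitly:
```scala
// Hypothetical deterministic trigger: guarantee empty partitions on any machine
// by repartitioning 10 rows into 16 partitions before the typed reduce.
val session = spark // stable identifier for the implicits import
import session.implicits._

val result = spark.range(10).repartition(16).map(_ + 1).reduce(_ + _)
assert(result == 55) // 1 + 2 + ... + 10
```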