[ 
https://issues.apache.org/jira/browse/SPARK-41999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688212#comment-17688212
 ] 

Apache Spark commented on SPARK-41999:
--------------------------------------

User 'ueshin' has created a pull request for this issue:
https://github.com/apache/spark/pull/40002

> NPE for bucketed write (ReadwriterTests.test_bucketed_write)
> ------------------------------------------------------------
>
>                 Key: SPARK-41999
>                 URL: https://issues.apache.org/jira/browse/SPARK-41999
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> {code}
> java.util.NoSuchElementException
>       at java.util.AbstractList$Itr.next(AbstractList.java:364)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
>       at scala.collection.IterableLike.head(IterableLike.scala:109)
>       at scala.collection.IterableLike.head$(IterableLike.scala:108)
>       at scala.collection.AbstractIterable.head(Iterable.scala:56)
>       at 
> org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
>       at 
> org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
>       at 
> org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
>       at 
> org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>       at 
> org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
>       at 
> org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>       at 
> org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing 
> runnable 
> org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
> java.lang.NullPointerException
>       at 
> org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
>       at 
> org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
>       at 
> org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
>       at 
> org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>       at 
> org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
>       at 
> org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>       at 
> org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> pyspark/sql/tests/test_readwriter.py:102 
> (ReadwriterParityTests.test_bucketed_write)
> self = 
> <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests 
> testMethod=test_bucketed_write>
>     def test_bucketed_write(self):
>         data = [
>             (1, "foo", 3.0),
>             (2, "foo", 5.0),
>             (3, "bar", -1.0),
>             (4, "bar", 6.0),
>         ]
>         df = self.spark.createDataFrame(data, ["x", "y", "z"])
>     
>         def count_bucketed_cols(names, table="pyspark_bucket"):
>             """Given a sequence of column names and a table name
>             query the catalog and return number o columns which are
>             used for bucketing
>             """
>             cols = self.spark.catalog.listColumns(table)
>             num = len([c for c in cols if c.name in names and c.isBucket])
>             return num
>     
>         with self.table("pyspark_bucket"):
>             # Test write with one bucketing column
> >           df.write.bucketBy(3, 
> > "x").mode("overwrite").saveAsTable("pyspark_bucket")
> ../test_readwriter.py:123: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> ../../connect/readwriter.py:381: in saveAsTable
>     
> self._spark.client.execute_command(self._write.command(self._spark.client))
> ../../connect/client.py:478: in execute_command
>     self._execute(req)
> ../../connect/client.py:562: in _execute
>     self._handle_error(rpc_error)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <pyspark.sql.connect.client.SparkConnectClient object at 
> 0x7fe0d069b5b0>
> rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
>       status = StatusCode.UNKNOWN
>       details = ""
>       debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 
> {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, 
> grpc_message:""}"
> >
>     def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
>         """
>         Error handling helper for dealing with GRPC Errors. On the server 
> side, certain
>         exceptions are enriched with additional RPC Status information. These 
> are
>         unpacked in this function and put into the exception.
>     
>         To avoid overloading the user with GRPC errors, this message 
> explicitly
>         swallows the error context from the call. This GRPC Error is logged 
> however,
>         and can be enabled.
>     
>         Parameters
>         ----------
>         rpc_error : grpc.RpcError
>            RPC Error containing the details of the exception.
>     
>         Returns
>         -------
>         Throws the appropriate internal Python exception.
>         """
>         logger.exception("GRPC Error received")
>         # We have to cast the value here because, a RpcError is a Call as 
> well.
>         # 
> https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
>         status = rpc_status.from_call(cast(grpc.Call, rpc_error))
>         if status:
>             for d in status.details:
>                 if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
>                     info = error_details_pb2.ErrorInfo()
>                     d.Unpack(info)
>                     if info.reason == 
> "org.apache.spark.sql.AnalysisException":
>                         raise SparkConnectAnalysisException(
>                             info.reason, info.metadata["message"], 
> info.metadata["plan"]
>                         ) from None
>                     else:
>                         raise SparkConnectException(status.message, 
> info.reason) from None
>     
>             raise SparkConnectException(status.message) from None
>         else:
> >           raise SparkConnectException(str(rpc_error)) from None
> E           pyspark.sql.connect.client.SparkConnectException: 
> <_MultiThreadedRendezvous of RPC that terminated with:
> E             status = StatusCode.UNKNOWN
> E             details = ""
> E             debug_error_string = "UNKNOWN:Error received from peer 
> ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", 
> grpc_status:2, grpc_message:""}"
> E           >
> ../../connect/client.py:640: SparkConnectException
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to