zhengruifeng opened a new pull request, #48135:
URL: https://github.com/apache/spark/pull/48135

   ### What changes were proposed in this pull request?
   Function `substring` should accept column names
   
   ### Why are the changes needed?
   Bug fix:
   
   ```
   In [1]:     >>> import pyspark.sql.functions as sf
      ...:     >>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p', 
'l'])
      ...:     >>> df.select('*', sf.substring('s', 'p', 'l')).show()
   ```
   
   works in PySpark Classic, but fail in Connect with:
   ```
   NumberFormatException                     Traceback (most recent call last)
   Cell In[2], line 1
   ----> 1 df.select('*', sf.substring('s', 'p', 'l')).show()
   
   File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1170, in 
DataFrame.show(self, n, truncate, vertical)
      1169 def show(self, n: int = 20, truncate: Union[bool, int] = True, 
vertical: bool = False) -> None:
   -> 1170     print(self._show_string(n, truncate, vertical))
   
   File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:927, in 
DataFrame._show_string(self, n, truncate, vertical)
       910     except ValueError:
       911         raise PySparkTypeError(
       912             errorClass="NOT_BOOL",
       913             messageParameters={
      (...)
       916             },
       917         )
       919 table, _ = DataFrame(
       920     plan.ShowString(
       921         child=self._plan,
       922         num_rows=n,
       923         truncate=_truncate,
       924         vertical=vertical,
       925     ),
       926     session=self._session,
   --> 927 )._to_table()
       928 return table[0][0].as_py()
   
   File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1844, in 
DataFrame._to_table(self)
      1842 def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]:
      1843     query = self._plan.to_proto(self._session.client)
   -> 1844     table, schema, self._execution_info = 
self._session.client.to_table(
      1845         query, self._plan.observations
      1846     )
      1847     assert table is not None
      1848     return (table, schema)
   
   File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:892, in 
SparkConnectClient.to_table(self, plan, observations)
       890 req = self._execute_plan_request_with_metadata()
       891 req.plan.CopyFrom(plan)
   --> 892 table, schema, metrics, observed_metrics, _ = 
self._execute_and_fetch(req, observations)
       894 # Create a query execution object.
       895 ei = ExecutionInfo(metrics, observed_metrics)
   
   File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1517, in 
SparkConnectClient._execute_and_fetch(self, req, observations, self_destruct)
      1514 properties: Dict[str, Any] = {}
      1516 with Progress(handlers=self._progress_handlers, 
operation_id=req.operation_id) as progress:
   -> 1517     for response in self._execute_and_fetch_as_iterator(
      1518         req, observations, progress=progress
      1519     ):
      1520         if isinstance(response, StructType):
      1521             schema = response
   
   File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1494, in 
SparkConnectClient._execute_and_fetch_as_iterator(self, req, observations, 
progress)
      1492     raise kb
      1493 except Exception as error:
   -> 1494     self._handle_error(error)
   
   File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1764, in 
SparkConnectClient._handle_error(self, error)
      1762 self.thread_local.inside_error_handling = True
      1763 if isinstance(error, grpc.RpcError):
   -> 1764     self._handle_rpc_error(error)
      1765 elif isinstance(error, ValueError):
      1766     if "Cannot invoke RPC" in str(error) and "closed" in str(error):
   
   File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1840, in 
SparkConnectClient._handle_rpc_error(self, rpc_error)
      1837             if info.metadata["errorClass"] == 
"INVALID_HANDLE.SESSION_CHANGED":
      1838                 self._closed = True
   -> 1840             raise convert_exception(
      1841                 info,
      1842                 status.message,
      1843                 self._fetch_enriched_error(info),
      1844                 self._display_server_stack_trace(),
      1845             ) from None
      1847     raise SparkConnectGrpcException(status.message) from None
      1848 else:
   
   NumberFormatException: [CAST_INVALID_INPUT] The value 'p' of the type 
"STRING" cannot be cast to "INT" because it is malformed. Correct the value as 
per the syntax, or change its target type. Use `try_cast` to tolerate malformed 
input and return NULL instead. SQLSTATE: 22018
   ...
   ```
   
   
   ### Does this PR introduce _any_ user-facing change?
   yes, Function `substring` in Connect can properly handle column names
   
   
   ### How was this patch tested?
   updated CI
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to