allisonwang-db commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384277531
##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
return self._artifact_manager.cache_artifact(blob)
raise SparkConnectException("Invalid state during retry exception handling.")
+ def _verify_response_integrity(
+ self,
+ response: Union[
+ pb2.ConfigResponse,
+ pb2.ExecutePlanResponse,
+ pb2.InterruptResponse,
+ pb2.ReleaseExecuteResponse,
+ pb2.AddArtifactsResponse,
+ pb2.AnalyzePlanResponse,
+ pb2.FetchErrorDetailsResponse,
+ pb2.ReleaseSessionResponse,
+ ],
+ ) -> None:
+ """
+ Verifies the integrity of the response. This method checks if the session ID and the
+ server-side session ID match. If not, it throws an exception.
+ Parameters
+ ----------
+ response - One of the different response types handled by the Spark Connect service
+ """
+ if self._session_id != response.session_id:
+ raise SparkConnectException(
+ "Received incorrect session identifier for request:"
+ f"{response.session_id} != {self._session_id}"
+ )
Review Comment:
Is this an invalid state that should not occur? If so, could we raise
`PySparkAssertionError` instead?
##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
return self._artifact_manager.cache_artifact(blob)
raise SparkConnectException("Invalid state during retry exception handling.")
+ def _verify_response_integrity(
+ self,
+ response: Union[
+ pb2.ConfigResponse,
+ pb2.ExecutePlanResponse,
+ pb2.InterruptResponse,
+ pb2.ReleaseExecuteResponse,
+ pb2.AddArtifactsResponse,
+ pb2.AnalyzePlanResponse,
+ pb2.FetchErrorDetailsResponse,
+ pb2.ReleaseSessionResponse,
+ ],
+ ) -> None:
+ """
+ Verifies the integrity of the response. This method checks if the session ID and the
+ server-side session ID match. If not, it throws an exception.
+ Parameters
+ ----------
+ response - One of the different response types handled by the Spark Connect service
+ """
+ if self._session_id != response.session_id:
+ raise SparkConnectException(
+ "Received incorrect session identifier for request:"
+ f"{response.session_id} != {self._session_id}"
+ )
+ if self._server_session_id is not None:
+ if response.server_side_session_id != self._server_session_id:
+ raise SparkConnectException(
+ "Received incorrect server side session identifier for request. "
+ "Please restart Spark Session. ("
Review Comment:
Can we be more explicit in this error message on how to restart a Spark
session?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]