HeartSaVioR commented on code in PR #50466:
URL: https://github.com/apache/spark/pull/50466#discussion_r2027946678
##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -49,22 +49,26 @@ class StatefulProcessorHandleState(Enum):
class StatefulProcessorApiClient:
def __init__(
- self, state_server_port: int, key_schema: StructType, is_driver: bool
= False
+ self, state_server_port: Union[int, str], key_schema: StructType,
is_driver: bool = False
) -> None:
self.key_schema = key_schema
- self._client_socket = socket.socket()
- self._client_socket.connect(("localhost", state_server_port))
-
- # SPARK-51667: We have a pattern of sending messages continuously from
one side
- # (Python -> JVM, and vice versa) before getting response from other
side. Since most
- # messages we are sending are small, this triggers the bad combination
of Nagle's algorithm
- # and delayed ACKs, which can cause a significant delay on the latency.
- # See SPARK-51667 for more details on how this can be a problem.
- #
- # Disabling either would work, but it's more common to disable Nagle's
algorithm; there is
- # lot less reference to disabling delayed ACKs, while there are lots
of resources to
- # disable Nagle's algorithm.
- self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
1)
+ if isinstance(state_server_port, str):
+ self._client_socket = socket.socket(socket.AF_UNIX,
socket.SOCK_STREAM)
+ self._client_socket.connect(state_server_port)
+ else:
+ self._client_socket = socket.socket()
+ self._client_socket.connect(("localhost", state_server_port))
+
+ # SPARK-51667: We have a pattern of sending messages continuously
from one side
+ # (Python -> JVM, and vice versa) before getting response from
other side. Since most
+ # messages we are sending are small, this triggers the bad
combination of Nagle's
+ # algorithm and delayed ACKs, which can cause a significant delay
on the latency.
+ # See SPARK-51667 for more details on how this can be a problem.
+ #
+ # Disabling either would work, but it's more common to disable
Nagle's algorithm; there
+ # is lot less reference to disabling delayed ACKs, while there are
lots of resources to
+ # disable Nagle's algorithm.
+ self._client_socket.setsockopt(socket.IPPROTO_TCP,
socket.TCP_NODELAY, 1)
Review Comment:
This option (TCP_NODELAY) only applies to TCP sockets. As long as we keep this
setsockopt call in the TCP socket path, this should be fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]