BryanCutler commented on a change in pull request #24070: [SPARK-23961][PYTHON]
Fix error when toLocalIterator goes out of scope
URL: https://github.com/apache/spark/pull/24070#discussion_r264832981
##########
File path: python/pyspark/rdd.py
##########
@@ -138,15 +138,54 @@ def _parse_memory(s):
return int(float(s[:-1]) * units[s[-1].lower()])
-def _load_from_socket(sock_info, serializer):
+def _create_local_socket(sock_info):
(sockfile, sock) = local_connect_and_auth(*sock_info)
- # The RDD materialization time is unpredicable, if we set a timeout for socket reading
+ # The RDD materialization time is unpredictable, if we set a timeout for socket reading
# operation, it will very possibly fail. See SPARK-18281.
sock.settimeout(None)
+ return sockfile, sock
+
+
+def _load_from_socket(sock_info, serializer):
+ (sockfile, _) = _create_local_socket(sock_info)
# The socket will be automatically closed when garbage-collected.
return serializer.load_stream(sockfile)
+class _PyLocalIterable(object):
+ """ Create a synchronous local iterable over a socket """
+
+ def __init__(self, sock_info, serializer):
+ (self.sockfile, self.sock) = _create_local_socket(sock_info)
+ self.serializer = serializer
+ self.read_iter = iter([]) # Initialize as empty iterator
+
+ def __iter__(self):
+ while True:
+ try:
+ # Request next partition data from Java, if no more then connection is closed
+ write_int(1, self.sockfile)
+ self.sockfile.flush()
+
+ # Load the partition data as a stream and read each item
+ self.read_iter = self.serializer.load_stream(self.sockfile)
+ for item in self.read_iter:
+ yield item
+ except Exception: # TODO: more specific error, ConnectionError / socket.error
Review comment:
TODO: need to address this
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]