Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211683369
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,126 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +    """
    +    sock = None
    +    # Support for both IPv4 and IPv6.
    +    # On most of IPv6-ready systems, IPv6 will take precedence.
    +    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    +        af, socktype, proto, canonname, sa = res
    +        sock = socket.socket(af, socktype, proto)
    +        try:
    +            # Do not allow timeout for socket reading operation.
    +            sock.settimeout(None)
    +            sock.connect(sa)
    +        except socket.error:
    +            sock.close()
    +            sock = None
    +            continue
    +        break
    +    if not sock:
    +        raise Exception("could not open socket")
    +
    +    sockfile = sock.makefile("rwb", 65536)
    +    write_with_length("run".encode("utf-8"), sockfile)
    +    sockfile.flush()
    +    do_server_auth(sockfile, auth_secret)
    +
    +    # The socket will be automatically closed when garbage-collected.
    +    return UTF8Deserializer().loads(sockfile)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _port = None
    +    _secret = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    --- End diff --
    
    IIUC, reusing a Python worker just means we start the Python worker from a daemon thread, so it should not affect the input/output files related to worker.py.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to