GitHub user Tagar opened a pull request:

    https://github.com/apache/spark/pull/23113

    [SPARK-26019][PYTHON] Fix race condition in accumulators.py: _start_update_server()

    ## What changes were proposed in this pull request?
    
    Fixes a race condition that happens in accumulators.py: `_start_update_server()`.
    
    1) `SocketServer.TCPServer` defaults `bind_and_activate` to `True`, so the server socket is bound and listening as soon as the constructor returns:
    https://github.com/python/cpython/blob/2.7/Lib/SocketServer.py#L413

    2) `handle()` is defined in the derived class `_UpdateRequestHandler` here:
    https://github.com/apache/spark/blob/master/python/pyspark/accumulators.py#L232

    Together, these allow a request to be handled before the server is fully initialized, causing the race condition explained in
    https://issues.apache.org/jira/browse/SPARK-26019 (a simplified sketch of the pattern follows; the quoted traceback shows the resulting failure).
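
    For illustration, a minimal, hypothetical sketch of the problematic pattern (simplified names, not the actual pyspark code): because `bind_and_activate` defaults to `True`, the socket is already listening when the constructor returns, so a fast client can reach `handle()` while `auth_token` is still `None`.

    ```python
    import threading

    try:
        import SocketServer                   # Python 2.7, as in the traceback below
    except ImportError:
        import socketserver as SocketServer   # renamed in Python 3


    class _Handler(SocketServer.StreamRequestHandler):
        def handle(self):
            # Runs on the server thread as soon as a client connects. If the
            # connection arrives before the main thread assigns a real token
            # below, auth_token is still None, and len(None) raises the
            # "TypeError: object of type 'NoneType' has no len()" shown
            # in the quoted traceback.
            auth_token = self.server.auth_token
            self.rfile.read(len(auth_token))


    # bind_and_activate defaults to True: the socket is bound and
    # listening the moment the constructor returns.
    server = SocketServer.TCPServer(("localhost", 0), _Handler)
    server.auth_token = None
    t = threading.Thread(target=server.serve_forever)
    t.daemon = True
    t.start()

    # Race window: any client that connects before this assignment is
    # handled with auth_token still None.
    server.auth_token = "secret"
    ```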
    
    > Exception happened during processing of request from ('127.0.0.1', 43418)
    >
    > Traceback (most recent call last):
    >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    >     self.process_request(request, client_address)
    >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 318, in process_request
    >     self.finish_request(request, client_address)
    >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 331, in finish_request
    >     self.RequestHandlerClass(request, client_address, self)
    >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 652, in __init__
    >     self.handle()
    >   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 263, in handle
    >     poll(authenticate_and_accum_updates)
    >   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 238, in poll
    >     if func():
    >   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 251, in authenticate_and_accum_updates
    >     received_token = self.rfile.read(len(auth_token))
    > TypeError: object of type 'NoneType' has no len()
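
    One general way to close this window (a sketch of the pattern, not necessarily the exact diff in this PR) is to construct the server with `bind_and_activate=False`, finish initializing its state, and only then bind and activate the socket:

    ```python
    # Reusing the hypothetical _Handler from the sketch above: create the
    # server without binding, so no client can connect yet...
    server = SocketServer.TCPServer(("localhost", 0), _Handler,
                                    bind_and_activate=False)
    # ...fully initialize shared state first...
    server.auth_token = "secret"
    # ...and only then open the listening socket.
    server.server_bind()
    server.server_activate()

    t = threading.Thread(target=server.serve_forever)
    t.daemon = True
    t.start()
    ```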
    
    ## How was this patch tested?
    
    Tested manually in the same environment where the issue reproduces consistently; with this change the exception no longer occurs.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Tagar/spark SPARK-26019

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23113
    
----
commit 113828dd93ca5cd392a53e3aa20bd16bd3792747
Author: Ruslan Dautkhanov <tagar@...>
Date:   2018-11-22T00:08:59Z

    Fix for SPARK-26019

commit ad97861c51fcd6951d888e5fa2db12b9f1befb07
Author: Ruslan Dautkhanov <tagar@...>
Date:   2018-11-22T00:22:01Z

    Bring up to master

commit 00c326dfdb994b113f1b59c5622f6fc1a708c06a
Author: Ruslan Dautkhanov <tagar@...>
Date:   2018-11-22T00:23:53Z

    minor fix

commit 38b3d2ef8dbbf7de6aa5fb1a2258192313624643
Author: Ruslan Dautkhanov <tagar@...>
Date:   2018-11-22T00:25:39Z

    merge back changes in master

----



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
