[issue44155] Race condition when using multiprocessing BaseManager and Pool in Python3

2021-05-24 Thread Jun


Change by Jun :


--
keywords: +patch
nosy: +junnplus
nosy_count: 1.0 -> 2.0
pull_requests: +24924
stage:  -> patch review
pull_request: https://github.com/python/cpython/pull/26332

___
Python tracker 

___
___
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com



[issue44155] Race condition when using multiprocessing BaseManager and Pool in Python3

2021-05-24 Thread David Chen


David Chen  added the comment:

After some investigation, I finally identified the root cause - it is caused by 
the following line in the `Server` class in the `managers.py` file:
```
self.listener = Listener(address=address, backlog=16)
```
I'm not sure where the magic number `16` came from, but it turned out not to be 
big enough for some powerful CPUs (48 cores in my case): the socket server 
starts refusing new connections once more than `16` connections are waiting 
unaccepted in the listen queue. I think this is why the code gets stuck in 
`Client -> answer_challenge` while attempting to create a new connection to the 
server. After I changed the number to `32`, the issue was gone.
IMO a magic number is not very suitable here; maybe we could make it 
configurable, or use `cpu_count` to derive a better value based on the CPU 
count.
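To illustrate the suggestion, here is a minimal sketch of what a CPU-count-derived backlog might look like. This is not the actual CPython patch; the `make_listener` helper, the `max(16, ...)` floor, and the use of `os.cpu_count()` are all assumptions for illustration. `Listener` from `multiprocessing.connection` does accept a `backlog` keyword.

```python
import os
from multiprocessing.connection import Listener

def make_listener(address, authkey=None):
    # Hypothetical helper: derive the listen backlog from the CPU count
    # instead of hard-coding 16, keeping 16 as a minimum so small
    # machines behave as before.
    backlog = max(16, os.cpu_count() or 1)
    return Listener(address=address, authkey=authkey, backlog=backlog)
```

On a 48-core machine this would listen with a backlog of 48, so a pool of 48 workers connecting at once would no longer overflow the accept queue.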

--


[issue44155] Race condition when using multiprocessing BaseManager and Pool in Python3

2021-05-17 Thread David Chen


New submission from David Chen :

Could someone help me out? I spent a lot of time debugging a race condition I 
encountered when using BaseManager and Pool from the multiprocessing library. 
Here is the simplified code:
```
import sys, time
from multiprocessing.managers import BaseManager, SyncManager, BaseProxy
from multiprocessing import Process, cpu_count, Pool, Lock, get_context
from multiprocessing.queues import Queue, JoinableQueue
import queue

class QueueManager(BaseManager):
    pass

class Singleton:
    '''
    Decorator class for singleton pattern.
    '''
    def __init__(self, cls):
        self._cls = cls
        self._lock = Lock()
        self._instance = {}

    def __call__(self, *args, **kwargs):
        if self._cls not in self._instance:
            with self._lock:
                self._instance[self._cls] = self._cls(*args, **kwargs)
        return self._instance[self._cls]

    def getInstance(self):
        return self._instance[self._cls]


class LoggingServer(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.msgQueue = queue.Queue()
        try:
            QueueManager.register('getQueue', callable=lambda: self.msgQueue)
            self.queueManager = QueueManager(address=self.logServerAddr,
                                             authkey=self.logServerPwd)
            self.logServer = self.queueManager.get_server()
            self.logServer.serve_forever()
        except:
            raise RuntimeError("Couldn't start the logging server!")

class LoggingProcess(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address=self.logServerAddr,
                                             authkey=self.logServerPwd)
            self.queueManager.connect()
        except:
            raise RuntimeError("Couldn't connect logging process to the logging server!")

        self.msgQueue = self.queueManager.getQueue()
        self.process = Process(target=self.loggingProcess,
                               name="Logging Process", args=(), daemon=True)
        self.process.start()

    def terminate(self):
        self.msgQueue.join()
        self.process.terminate()

    def loggingProcess(self):
        while True:
            logObj = self.msgQueue.get()
            print(logObj)

@Singleton
class Logger(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.queueManager = None
        self.msgQueue = None

    def connectToLogServer(self):
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address=self.logServerAddr,
                                             authkey=self.logServerPwd)
            self.queueManager.connect()
            self.msgQueue = self.queueManager.getQueue()
            self.ready = True
        except:
            raise RuntimeError("Couldn't connect logger to Log Server!")

    def ReadyCheck(func):
        def makeDecorator(self, *args, **kwargs):
            if not self.msgQueue:
                self.connectToLogServer()
            func(self, *args, **kwargs)

        return makeDecorator

    # Overridden function to log info
    @ReadyCheck
    def info(self, info, logfile=sys.stdout):
        self.msgQueue.put(info)

address = ('', 5)
password = b'PASSWORD'

log = Logger(address, password)

def callback(*args):
    #print("Finished!!!")
    pass

def job(index):
    time.sleep(0.1)
    log.info(str(log.msgQueue) + ":{}".format(index))
    log.info("here {}".format(index))


if __name__ == "__main__":
    # import multiprocessing
    # logger = multiprocessing.log_to_stderr()
    # logger.setLevel(multiprocessing.SUBDEBUG)
    serverProcess = Process(target=LoggingServer, name="LoggingServerDaemon",
                            args=(address, password), daemon=True)
    serverProcess.start()
    time.sleep(1)
    loggingProcess = LoggingProcess(address, password)
    log.info("Starting...")
    #pool = Pool(cpu_count())
    # With a small number of workers (like 10) there is no problem, but if we
    # increase to a bigger number, say 48 in my case, this program hangs every time...
    pool = Pool()

    results = [pool.apply_async(job, (i,), callback=callback) for i in range(1)]

    pool.close()
    pool.join()

    log.info("Done")

    #loggingProcess.terminate()
    #serverProcess.terminate()
```
The LoggingServer class works as a logging server (like a proxy), which manages 
a shared queue. The LoggingProcess class is a log consumer, which fetches the 
logs from the shared queue managed by LoggingServer. The Logger class is a 
producer, which puts the logs into the shared queue. Since I want to share the 
global logger across multiple modules in order to unify the log format/output 
destinations/... (something like the logging standard library), the Logger class 
is not fully initialized and will be fully initialized