# HG changeset patch # User Jun Wu <qu...@fb.com> # Date 1486601798 28800 # Wed Feb 08 16:56:38 2017 -0800 # Node ID 8410c4a670ffff3bed4b459cf8d62bd32fdcb1e7 # Parent 138f7ba58a70de9610713b8bd55d1ba0ac468fa6 # Available At https://bitbucket.org/quark-zju/hg-draft # hg pull https://bitbucket.org/quark-zju/hg-draft -r 8410c4a670ff chgcache: implement simple IPC mechanism
# We need an inter-process communication mechanism so forked chg workers
# could tell the master chg server where the repos are. The IPC mechanism:
#
# - Could be one-way - workers send messages to the master. Currently
#   workers won't read from the master.
# - Should be non-blocking on write - workers should not wait for the
#   master to read content they send.
# - Could lose messages - messages are just some "suggestions" about what
#   to preload, which could be discarded safely.
# - Better to be blocking on read - if reading is blocking, the master
#   server could learn what to preload immediately, without polling
#   periodically.
#
# This patch adds a class using datagram sockets to do the IPC. The choice
# is mainly because SOCK_DGRAM prevents incomplete messages from being
# sent, and we don't need to deal with message boundaries.
#
# Applies to hgext/chgcache.py, directly below the module docstring
# (diff hunk "@@ -10,2 +10,56 @@").

from __future__ import absolute_import

import errno
import socket


class socketipc(object):
    """A simple IPC mechanism that sets up an unreliable communication channel
    between the master server and (multiple) forked worker processes. The
    forked workers do non-blocking writes, while the master server does
    blocking reads.

    Every message travels as one datagram terminated by ``suffix``; a
    datagram that arrives without the terminator was truncated by recv()
    and is dropped.

    To use the object, create it in the master server, read from a thread,
    and write from forked processes:

        # pid=1000, master, main thread
        ipc = socketipc()

        # pid=1000, master, a background thread
        while True:
            msg = ipc.recv() # blocking
            ....

        # pid=1001, worker
        ipc.send(b'foo') # non-blocking, silently ignore errors

        # pid=1002, worker
        ipc.send(b'bar') # non-blocking, silently ignore errors
    """

    suffix = b'\0'  # to detect truncated recv()s
    maxsize = 1 << 16  # estimated max packet size for recv()

    def __init__(self):
        """create the datagram socket pair; only the write end is made
        non-blocking so that recv() keeps blocking.
        """
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def send(self, msg):
        """send msg without blocking. fail silently on errors, e.g. msg is
        too long, or the queue is full. msg should be a byte string that
        does not contain '\\0'.

        return the number of bytes handed to the socket (len(msg) + 1 for
        the terminator), or None if the message was dropped.
        """
        try:
            return self._out.send(msg + self.suffix)
        except socket.error:
            # deliberate best-effort: messages are only preload hints, so a
            # lost one is harmless and must never disturb the worker
            return None

    def recv(self):
        """receive a complete msg (terminator stripped). blocking.

        truncated datagrams are skipped. an interrupted system call is
        retried; any other socket error (for example the socket has been
        closed) is raised, because retrying it could never succeed and
        would only spin the CPU.
        """
        while True:
            try:
                msg = self._in.recv(self.maxsize)
            except socket.error as err:
                if err.errno == errno.EINTR:
                    continue
                raise
            if msg.endswith(self.suffix):
                return msg[:-len(self.suffix)]
            # no terminator: the datagram was larger than maxsize and got
            # truncated (or was empty) - drop it and wait for the next one

    def close(self):
        """release both sockets. safe to call more than once."""
        for sock in (self._in, self._out):
            try:
                sock.close()
            except socket.error:
                pass

# _______________________________________________
# Mercurial-devel mailing list
# Mercurial-devel@mercurial-scm.org
# https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel