# HG changeset patch
# User Jun Wu <qu...@fb.com>
# Date 1486601798 28800
#      Wed Feb 08 16:56:38 2017 -0800
# Node ID 8410c4a670ffff3bed4b459cf8d62bd32fdcb1e7
# Parent  138f7ba58a70de9610713b8bd55d1ba0ac468fa6
# Available At https://bitbucket.org/quark-zju/hg-draft
#              hg pull https://bitbucket.org/quark-zju/hg-draft -r 8410c4a670ff
chgcache: implement simple IPC mechanism

We need an inter-process communication mechanism so that forked chg workers
can tell the master chg server where the repos are. The IPC mechanism has the
following requirements:

  - Could be one-way - workers send messages to the master. Currently
    workers won't read from the master.
  - Should be non-blocking on write - workers should not wait for the master
    to read content they send.
  - Could lose messages - messages are just some "suggestions" about what to
    preload, which could be discarded safely.
  - Better to be blocking on read - if reading is blocking, the master
    server could learn what to preload immediately, without polling
    periodically.

This patch adds a class that uses datagram sockets for the IPC. Datagram
sockets were chosen mainly because SOCK_DGRAM prevents incomplete messages
from being sent, so we don't need to deal with message boundaries.

diff --git a/hgext/chgcache.py b/hgext/chgcache.py
--- a/hgext/chgcache.py
+++ b/hgext/chgcache.py
@@ -10,2 +10,56 @@ With this extension installed, Mercurial
 repo objects to further reduce start-up time.
 """
+from __future__ import absolute_import
+
+import socket
+
+class socketipc(object):
+    """A simple IPC mechanism that sets up an unreliable, one-way channel
+    from (multiple) forked worker processes to the master server, using a
+    SOCK_DGRAM socketpair. The forked workers do non-blocking writes, while
+    the master server does blocking reads.
+
+    To use the object, create it in the master server, read from a thread, and
+    write from forked processes:
+
+        # pid=1000, master, main thread
+        ipc = socketipc()
+
+        # pid=1000, master, a background thread
+        while True:
+            msg = ipc.recv() # blocking
+            ...
+
+        # pid=1001, worker
+        ipc.send(b'foo') # non-blocking, errors are silently ignored
+
+        # pid=1002, worker
+        ipc.send(b'bar') # non-blocking, errors are silently ignored
+    """
+
+    suffix = b'\0'  # terminator; recv() discards msgs lacking it (truncated)
+    maxsize = 1 << 16  # 64 KiB recv() buffer; estimated max packet size
+
+    def __init__(self):
+        self._in, self._out = socket.socketpair(socket.AF_UNIX,
+                                                socket.SOCK_DGRAM)
+        self._out.setblocking(False)  # only the write end is non-blocking
+
+    def send(self, msg):
+        """non-blocking send of msg + suffix. return the sent byte count, or
+        None when socket.error is swallowed (ex. msg is too long, or the
+        queue is full). msg should not contain a NUL byte."""
+        try:
+            return self._out.send(msg + self.suffix)
+        except socket.error:
+            pass  # best-effort: the msg is dropped
+
+    def recv(self):
+        """block until a complete msg arrives; return it minus the suffix."""
+        while True:
+            try:
+                msg = self._in.recv(self.maxsize)
+                if msg.endswith(self.suffix):
+                    return msg[:-1]  # strip the 1-byte suffix
+            except socket.error:
+                pass  # NOTE(review): a persistent error makes this loop spin
_______________________________________________
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel

Reply via email to