From 8200e949bd5066199ab6e17f0dd5efda6039de5d Mon Sep 17 00:00:00 2001
From: Benjamin Mahler <benjamin.mahler@gmail.com>
Date: Mon, 8 Dec 2014 22:18:25 -0800
Subject: [PATCH] *** Modified for 0.21.0-tw11 *** Fixed a long-standing
 performance issue in libprocess' SocketManager.

Review: https://reviews.apache.org/r/28838

Conflicts:
	3rdparty/libprocess/src/process.cpp
---
 3rdparty/libprocess/src/process.cpp | 95 ++++++++++++++++++++++++++-----------
 1 file changed, 67 insertions(+), 28 deletions(-)

diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 85fb995..cbe3c34 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -287,8 +287,18 @@ public:
   void exited(ProcessBase* process);
 
 private:
-  // Map from UPID (local/remote) to process.
-  map<UPID, set<ProcessBase*> > links;
+  // TODO(bmahler): Leverage a bidirectional multimap instead, or
+  // hide the complexity of manipulating 'links' through methods.
+  struct
+  {
+    // For links, we maintain a bidirectional mapping between the
+    // "linkers" (Processes) and the "linkees" (remote / local UPIDs).
+    // For remote nodes, we also need a mapping to the linkees on the
+    // node, because socket closure only notifies at the node level.
+    hashmap<UPID, hashset<ProcessBase*>> linkers;
+    hashmap<ProcessBase*, hashset<UPID>> linkees;
+    hashmap<Node, hashset<UPID>> remotes;
+  } links;
 
   // Collection of all actice sockets.
   map<int, Socket> sockets;
@@ -2032,7 +2042,11 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
       ev_async_send(loop, &async_watcher);
     }
 
-    links[to].insert(process);
+    links.linkers[to].insert(process);
+    links.linkees[process].insert(to);
+    if (node.ip != __ip__ || node.port != __port__) {
+      links.remotes[node].insert(to);
+    }
   }
 }
 
@@ -2377,20 +2391,30 @@ void SocketManager::exited(const Node& node)
   // ourselves that the accesses to each Process object will always be
   // valid.
   synchronized (this) {
-    list<UPID> removed;
-    // Look up all linked processes.
-    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
-      if (linkee.ip == node.ip && linkee.port == node.port) {
-        foreach (ProcessBase* linker, processes) {
-          linker->enqueue(new ExitedEvent(linkee));
+    if (!links.remotes.contains(node)) {
+      return; // No linkees for this node!
+    }
+
+    foreach (const UPID& linkee, links.remotes[node]) {
+      // Find and notify the linkers.
+      CHECK(links.linkers.contains(linkee));
+
+      foreach (ProcessBase* linker, links.linkers[linkee]) {
+        linker->enqueue(new ExitedEvent(linkee));
+
+        // Remove the linkee pid from the linker.
+        CHECK(links.linkees.contains(linker));
+
+        links.linkees[linker].erase(linkee);
+        if (links.linkees[linker].empty()) {
+          links.linkees.erase(linker);
         }
-        removed.push_back(linkee);
       }
-    }
 
-    foreach (const UPID& pid, removed) {
-      links.erase(pid);
+      links.linkers.erase(linkee);
     }
+
+    links.remotes.erase(node);
   }
 }
 
@@ -2408,25 +2432,40 @@ void SocketManager::exited(ProcessBase* process)
   const Time time = Clock::now(process);
 
   synchronized (this) {
-    // Iterate through the links, removing any links the process might
-    // have had and creating exited events for any linked processes.
-    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
-      processes.erase(process);
-
-      if (linkee == pid) {
-        foreach (ProcessBase* linker, processes) {
-          CHECK(linker != process) << "Process linked with itself";
-          synchronized (timeouts) {
-            if (Clock::paused()) {
-              Clock::update(linker, time);
-            }
-          }
-          linker->enqueue(new ExitedEvent(linkee));
+    // If this process had linked to anything, we need to clean
+    // up any pointers to it.
+    if (links.linkees.contains(process)) {
+      foreach (const UPID& linkee, links.linkees[process]) {
+        CHECK(links.linkers.contains(linkee));
+
+        links.linkers[linkee].erase(process);
+        if (links.linkers[linkee].empty()) {
+          links.linkers.erase(linkee);
         }
       }
+      links.linkees.erase(process);
+    }
+
+    // Find the linkers to notify.
+    if (!links.linkers.contains(pid)) {
+      return; // No linkers for this process!
+    }
+
+    foreach (ProcessBase* linker, links.linkers[pid]) {
+      CHECK(linker != process) << "Process linked with itself";
+      Clock::update(linker, time);
+      linker->enqueue(new ExitedEvent(pid));
+
+      // Remove the linkee pid from the linker.
+      CHECK(links.linkees.contains(linker));
+
+      links.linkees[linker].erase(pid);
+      if (links.linkees[linker].empty()) {
+        links.linkees.erase(linker);
+      }
     }
 
-    links.erase(pid);
+    links.linkers.erase(pid);
   }
 }
 
-- 
2.0.4.218.g7ec489f-twtrsrc

