Repository: mesos
Updated Branches:
  refs/heads/master de2a7f414 -> c33ba209d


Added `process::loop` abstraction.

Review: https://reviews.apache.org/r/54110


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9b34363e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9b34363e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9b34363e

Branch: refs/heads/master
Commit: 9b34363e6a471339c328b300b286d321708c0940
Parents: de2a7f4
Author: Benjamin Hindman <benjamin.hind...@gmail.com>
Authored: Sun Nov 27 09:46:49 2016 -0800
Committer: Benjamin Hindman <benjamin.hind...@gmail.com>
Committed: Thu Dec 1 00:49:05 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am              |   1 +
 3rdparty/libprocess/include/Makefile.am      |   1 +
 3rdparty/libprocess/include/process/loop.hpp | 269 ++++++++++++++++++++++
 3rdparty/libprocess/src/tests/loop_tests.cpp | 121 ++++++++++
 4 files changed, 392 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9b34363e/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 9d496b8..10f4163 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -224,6 +224,7 @@ libprocess_tests_SOURCES =                                  
\
   src/tests/http_tests.cpp                                     \
   src/tests/io_tests.cpp                                       \
   src/tests/limiter_tests.cpp                                  \
+  src/tests/loop_tests.cpp                                     \
   src/tests/main.cpp                                           \
   src/tests/metrics_tests.cpp                                  \
   src/tests/mutex_tests.cpp                                    \

http://git-wip-us.apache.org/repos/asf/mesos/blob/9b34363e/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am 
b/3rdparty/libprocess/include/Makefile.am
index 17c5d11..1d17edd 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -37,6 +37,7 @@ nobase_include_HEADERS =              \
   process/latch.hpp                    \
   process/limiter.hpp                  \
   process/logging.hpp                  \
+  process/loop.hpp                     \
   process/message.hpp                  \
   process/mime.hpp                     \
   process/mutex.hpp                    \

http://git-wip-us.apache.org/repos/asf/mesos/blob/9b34363e/3rdparty/libprocess/include/process/loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/loop.hpp 
b/3rdparty/libprocess/include/process/loop.hpp
new file mode 100644
index 0000000..a78ea7d
--- /dev/null
+++ b/3rdparty/libprocess/include/process/loop.hpp
@@ -0,0 +1,269 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_LOOP_HPP__
+#define __PROCESS_LOOP_HPP__
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+namespace process {
+
+// Provides an asynchronous "loop" abstraction. This is helpful for code
+// that would synchronously be written as a loop but asynchronously ends
+// up as a recursive set of functions which, depending on the compiler,
+// may overflow the stack (i.e., a compiler that can't do sufficient tail
+// call optimization may add a stack frame for each recursive call).
+//
+// The loop takes a PID `pid` and uses it as the execution context to
+// run the loop. The implementation does a `defer` on this `pid` to "pop"
+// the stack when it needs to asynchronously recurse. This also lets
+// callers synchronize with other code dispatching/deferring on `pid`.
+//
+// The two functions passed to the loop are the loop "iterate" step and
+// the loop "body" step respectively. Each invocation of "iterate"
+// returns the next value and "body" consumes that value and returns
+// whether or not to continue looping. The returned future becomes
+// ready once "body" yields `false`, and fails or is discarded if a
+// future from "iterate" or "body" fails or is discarded. You can
+// think of this synchronously as:
+//
+//     bool condition = true;
+//     do {
+//       condition = body(iterate());
+//     } while (condition);
+//
+// Asynchronously using recursion this might look like:
+//
+//     Future<Nothing> loop()
+//     {
+//       return iterate()
+//         .then([](T t) {
+//            return body(t)
+//             .then([](bool condition) {
+//               if (condition) {
+//                 return loop();
+//               } else {
+//                 return Nothing();
+//               }
+//             });
+//         });
+//     }
+//
+// And asynchronously using `pid` as the execution context:
+//
+//     Future<Nothing> loop()
+//     {
+//       return iterate()
+//         .then(defer(pid, [](T t) {
+//            return body(t)
+//             .then(defer(pid, [](bool condition) {
+//               if (condition) {
+//                 return loop();
+//               } else {
+//                 return Nothing();
+//               }
+//             }));
+//         }));
+//     }
+//
+// And now what this looks like using `loop`:
+//
+//     loop(pid,
+//          []() { return iterate(); },
+//          [](T t) {
+//            return body(t);
+//          });
+//
+// TODO(benh): Provide an implementation that doesn't require a `pid`
+// for situations like `io::read` and `io::write` where for
+// performance reasons it could make more sense to NOT defer but
+// rather just let the I/O thread handle the execution.
+template <typename Iterate, typename Body>
+Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body);
+
+
+// Convenience overload of `loop` that supplies its own execution
+// context by spawning a fresh Process dedicated to running the loop.
+template <typename Iterate, typename Body>
+Future<Nothing> loop(Iterate&& iterate, Body&& body)
+{
+  ProcessBase* process = new ProcessBase();
+  const UPID pid = spawn(process, true); // libprocess frees `process`.
+
+  // NOTE: no `wait` or `delete` is needed once the loop completes
+  // since the `spawn` above put libprocess in charge of cleanup.
+  return loop(
+      pid,
+      std::forward<Iterate>(iterate),
+      std::forward<Body>(body))
+    .onAny([=]() { terminate(process); });
+}
+
+
+namespace internal {
+// Shared state of a single running loop; created and started by `loop`.
+template <typename Iterate, typename Body, typename T>
+class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T>>
+{
+public:
+  Loop(const UPID& pid, const Iterate& iterate, const Body& body)
+    : pid(pid), iterate(iterate), body(body) {}
+
+  Loop(const UPID& pid, Iterate&& iterate, Body&& body)
+    : pid(pid), iterate(std::move(iterate)), body(std::move(body)) {}
+
+  std::shared_ptr<Loop> shared()
+  {
+    // Must fully specify `shared_from_this` because we're templated.
+    return std::enable_shared_from_this<Loop>::shared_from_this();
+  }
+
+  std::weak_ptr<Loop> weak()
+  {
+    return std::weak_ptr<Loop>(shared());
+  }
+
+  Future<Nothing> start() // Begins looping; returns the loop's future.
+  {
+    auto self = shared(); // Keeps `this` alive until the dispatch below runs.
+    auto weak_self = weak(); // Non-owning handle for the discard callback.
+
+    // Make sure we propagate discarding. Note that to avoid an
+    // infinite memory bloat we explicitly don't add a new `onDiscard`
+    // callback for every new future that gets created from invoking
+    // `iterate()` or `body()` but instead discard those futures
+    // explicitly with our single callback here.
+    promise.future()
+      .onDiscard(defer(pid, [weak_self, this]() {
+        auto self = weak_self.lock();
+        if (self) {
+          // NOTE: There's no race here between setting `next` or
+          // `condition` and calling `discard()` on those futures
+          // because we're serializing execution via `defer` on
+          // `pid`. An alternative would require something like
+          // `atomic_shared_ptr` or a mutex.
+          next.discard();
+          condition.discard();
+        }
+      }));
+
+    // Start the loop using `pid` as the execution context.
+    dispatch(pid, [self, this]() {
+      next = discard_if_necessary<T>(iterate());
+      run();
+    });
+
+    return promise.future();
+  }
+
+  // Helper for discarding a future if our promise already has a
+  // discard. We need to check this for every future that gets
+  // returned from `iterate` and `body` because there is a race
+  // between our discard callback (that was set up in `start`)
+  // being executed and us replacing that future on the next call to
+  // `iterate` and `body`. Note that we explicitly don't stop the loop
+  // if our promise has a discard but rather we just propagate the
+  // discard on to any futures returned from `iterate` and `body`. In
+  // the event of synchronous `iterate` or `body` functions this could
+  // result in an infinite loop.
+  template <typename U>
+  Future<U> discard_if_necessary(Future<U> future) const
+  {
+    if (promise.future().hasDiscard()) {
+      future.discard();
+    }
+    return future;
+  }
+
+  void run() // Every call site here runs in the context of `pid`.
+  {
+    auto self = shared(); // Captured below to keep `this` alive.
+
+    while (next.isReady()) { // Consume ready values without deferring.
+      condition = discard_if_necessary<bool>(body(next.get()));
+      if (condition.isReady()) {
+        if (condition.get()) {
+          next = discard_if_necessary<T>(iterate());
+          continue;
+        } else {
+          promise.set(Nothing());
+          return;
+        }
+      } else {
+        condition
+          .onAny(defer(pid, [self, this](const Future<bool>&) {
+            if (condition.isReady()) {
+              if (condition.get()) {
+                next = discard_if_necessary<T>(iterate());
+                run();
+              } else {
+                promise.set(Nothing());
+              }
+            } else if (condition.isFailed()) {
+              promise.fail(condition.failure());
+            } else if (condition.isDiscarded()) {
+              promise.discard();
+            }
+          }));
+        return; // The callback above resumes or finishes the loop.
+      }
+    }
+
+    next
+      .onAny(defer(pid, [self, this](const Future<T>&) {
+        if (next.isReady()) {
+          run();
+        } else if (next.isFailed()) {
+          promise.fail(next.failure());
+        } else if (next.isDiscarded()) {
+          promise.discard();
+        }
+      }));
+  }
+
+private:
+  const UPID pid; // Execution context for the loop.
+  Iterate iterate; // Produces the next value.
+  Body body; // Consumes a value and decides whether to continue.
+  Promise<Nothing> promise; // Backs the future returned from `start`.
+  Future<T> next; // Most recent result of `iterate`.
+  Future<bool> condition; // Most recent result of `body`.
+};
+
+} // namespace internal {
+
+
+// Deduces the value type from `Iterate` and starts a new shared loop.
+template <typename Iterate, typename Body>
+Future<Nothing> loop(const UPID& pid, Iterate&& iterate, Body&& body)
+{
+  using Value =
+    typename internal::unwrap<typename result_of<Iterate()>::type>::type;
+  using I = typename std::decay<Iterate>::type;
+  using B = typename std::decay<Body>::type;
+  using Impl = internal::Loop<I, B, Value>;
+
+  // NOTE: `start` requires the state to be owned by a `shared_ptr`.
+  std::shared_ptr<Impl> impl(
+      new Impl(pid, std::forward<Iterate>(iterate), std::forward<Body>(body)));
+
+  return impl->start();
+}
+
+} // namespace process {
+
+#endif // __PROCESS_LOOP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9b34363e/3rdparty/libprocess/src/tests/loop_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/loop_tests.cpp 
b/3rdparty/libprocess/src/tests/loop_tests.cpp
new file mode 100644
index 0000000..8435ba8
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/loop_tests.cpp
@@ -0,0 +1,121 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <gmock/gmock.h>
+
+#include <atomic>
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/loop.hpp>
+#include <process/queue.hpp>
+
+using process::Future;
+using process::loop;
+using process::Promise;
+using process::Queue;
+
+
+// Both steps complete synchronously: the loop keeps spinning until
+// "body" observes that the shared counter has dropped to zero.
+TEST(LoopTest, Sync)
+{
+  std::atomic_int counter = ATOMIC_VAR_INIT(1);
+
+  Future<Nothing> looping = loop(
+      [&counter]() { return counter.load(); },
+      [](int current) { return current != 0; });
+
+  // A non-zero counter keeps the loop going.
+  EXPECT_TRUE(looping.isPending());
+
+  // Let the next iteration see zero so that "body" returns false.
+  counter.store(0);
+
+  AWAIT_READY(looping);
+}
+
+
+// Both steps complete asynchronously: "iterate" waits on a queue and
+// "body" waits on a promise that the test satisfies by hand.
+TEST(LoopTest, Async)
+{
+  Queue<int> values;
+  Promise<int> observed;
+  Promise<bool> proceed;
+
+  Future<Nothing> looping = loop(
+      [&values]() { return values.get(); },
+      [&observed, &proceed](int value) {
+        observed.set(value);
+        return proceed.future();
+      });
+
+  // Nothing has been put on the queue yet.
+  EXPECT_TRUE(looping.isPending());
+
+  values.put(1);
+
+  AWAIT_EQ(1, observed.future());
+
+  // "body" ran, but its decision to continue is still outstanding.
+  EXPECT_TRUE(looping.isPending());
+
+  proceed.set(false);
+  AWAIT_READY(looping);
+}
+
+
+// Discarding the loop's future must propagate to the future that
+// "iterate" returned while that future is still pending.
+TEST(LoopTest, DiscardIterate)
+{
+  Promise<int> pending;
+
+  // Honor discard requests on the promise's future.
+  pending.future().onDiscard([&pending]() { pending.discard(); });
+
+  Future<Nothing> looping = loop(
+      [&pending]() { return pending.future(); },
+      [](int) { return false; });
+
+  EXPECT_TRUE(looping.isPending());
+
+  // Request the discard through the loop's own future.
+  looping.discard();
+
+  AWAIT_DISCARDED(looping);
+  EXPECT_TRUE(pending.future().hasDiscard());
+}
+
+
+// Discarding the loop's future must propagate to the future that
+// "body" returned while that future is still pending.
+TEST(LoopTest, DiscardBody)
+{
+  Promise<bool> pending;
+
+  // Honor discard requests on the promise's future.
+  pending.future().onDiscard([&pending]() { pending.discard(); });
+
+  Future<Nothing> looping = loop(
+      []() { return 42; },
+      [&pending](int) { return pending.future(); });
+
+  EXPECT_TRUE(looping.isPending());
+
+  // Request the discard through the loop's own future.
+  looping.discard();
+
+  AWAIT_DISCARDED(looping);
+  EXPECT_TRUE(pending.future().hasDiscard());
+}

Reply via email to