Set up recovery code paths of resource provider manager.

This patch adjusts the control flow of the resource provider manager
so that we can in the future make use of persisted resource provider
information. While this patch sets up the needed flow, it does not
implement recovery logic, yet.

Review: https://reviews.apache.org/r/66311/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/169efe6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/169efe6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/169efe6e

Branch: refs/heads/master
Commit: 169efe6e832165d441559a38cd7d31b80d1c84ec
Parents: 2d81d7c
Author: Benjamin Bannier <benjamin.bann...@mesosphere.io>
Authored: Tue May 1 13:09:14 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Tue May 1 13:09:14 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 323 ++++++++++++---------
 src/resource_provider/registrar.cpp           |  96 +++---
 src/resource_provider/registrar.hpp           |  18 +-
 src/tests/resource_provider_manager_tests.cpp |  15 +-
 4 files changed, 262 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp 
b/src/resource_provider/manager.cpp
index 67dbfbe..dfb8e73 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,6 +36,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
@@ -47,6 +48,7 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
+#include "resource_provider/registry.hpp"
 #include "resource_provider/validation.hpp"
 
 namespace http = process::http;
@@ -60,6 +62,8 @@ using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
 
+using mesos::resource_provider::registry::Registry;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -76,10 +80,10 @@ using process::wait;
 
 using process::http::Accepted;
 using process::http::BadRequest;
-using process::http::OK;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::NotImplemented;
+using process::http::OK;
 using process::http::Pipe;
 using process::http::UnsupportedMediaType;
 
@@ -192,6 +196,11 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdatePublishResourcesStatus& update);
 
+  Future<Nothing> recover(
+      const mesos::resource_provider::registry::Registry& registry);
+
+  void initialize() override;
+
   ResourceProviderID newResourceProviderId();
 
   double gaugeSubscribed();
@@ -209,6 +218,9 @@ private:
     PullGauge subscribed;
   };
 
+  Owned<Registrar> registrar;
+  Promise<Nothing> recovered;
+
   Metrics metrics;
 };
 
@@ -216,152 +228,191 @@ private:
 ResourceProviderManagerProcess::ResourceProviderManagerProcess(
     Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
+    registrar(std::move(_registrar)),
     metrics(*this)
 {
-  CHECK_NOTNULL(_registrar.get());
+  CHECK_NOTNULL(registrar.get());
 }
 
 
-Future<http::Response> ResourceProviderManagerProcess::api(
-    const http::Request& request,
-    const Option<Principal>& principal)
+void ResourceProviderManagerProcess::initialize()
 {
-  if (request.method != "POST") {
-    return MethodNotAllowed({"POST"}, request.method);
-  }
-
-  v1::resource_provider::Call v1Call;
-
-  // TODO(anand): Content type values are case-insensitive.
-  Option<string> contentType = request.headers.get("Content-Type");
-
-  if (contentType.isNone()) {
-    return BadRequest("Expecting 'Content-Type' to be present");
-  }
-
-  if (contentType.get() == APPLICATION_PROTOBUF) {
-    if (!v1Call.ParseFromString(request.body)) {
-      return BadRequest("Failed to parse body into Call protobuf");
-    }
-  } else if (contentType.get() == APPLICATION_JSON) {
-    Try<JSON::Value> value = JSON::parse(request.body);
-    if (value.isError()) {
-      return BadRequest("Failed to parse body into JSON: " + value.error());
-    }
-
-    Try<v1::resource_provider::Call> parse =
-      ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
-    if (parse.isError()) {
-      return BadRequest("Failed to convert JSON into Call protobuf: " +
-                        parse.error());
-    }
-
-    v1Call = parse.get();
-  } else {
-    return UnsupportedMediaType(
-        string("Expecting 'Content-Type' of ") +
-        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
-  }
-
-  Call call = devolve(v1Call);
-
-  Option<Error> error = validate(call);
-  if (error.isSome()) {
-    return BadRequest(
-        "Failed to validate resource_provider::Call: " + error->message);
-  }
-
-  if (call.type() == Call::SUBSCRIBE) {
-    // We default to JSON 'Content-Type' in the response since an empty
-    // 'Accept' header results in all media types considered acceptable.
-    ContentType acceptType = ContentType::JSON;
-
-    if (request.acceptsMediaType(APPLICATION_JSON)) {
-      acceptType = ContentType::JSON;
-    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-      acceptType = ContentType::PROTOBUF;
-    } else {
-      return NotAcceptable(
-          string("Expecting 'Accept' to allow ") +
-          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
-    }
-
-    if (request.headers.contains("Mesos-Stream-Id")) {
-      return BadRequest(
-          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
-    }
-
-    Pipe pipe;
-    OK ok;
-
-    ok.headers["Content-Type"] = stringify(acceptType);
-    ok.type = http::Response::PIPE;
-    ok.reader = pipe.reader();
-
-    // Generate a stream ID and return it in the response.
-    id::UUID streamId = id::UUID::random();
-    ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
-    HttpConnection http(pipe.writer(), acceptType, streamId);
-    subscribe(http, call.subscribe());
-
-    return ok;
-  }
-
-  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
-    return BadRequest("Resource provider is not subscribed");
-  }
-
-  ResourceProvider* resourceProvider =
-    resourceProviders.subscribed.at(call.resource_provider_id()).get();
-
-  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
-  if (!request.headers.contains("Mesos-Stream-Id")) {
-    return BadRequest(
-        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
-  }
-
-  const string& streamId = request.headers.at("Mesos-Stream-Id");
-  if (streamId != resourceProvider->http.streamId.toString()) {
-    return BadRequest(
-        "The stream ID '" + streamId + "' included in this request "
-        "didn't match the stream ID currently associated with "
-        " resource provider ID " + resourceProvider->info.id().value());
-  }
-
-  switch(call.type()) {
-    case Call::UNKNOWN: {
-      return NotImplemented();
-    }
-
-    case Call::SUBSCRIBE: {
-      // `SUBSCRIBE` call should have been handled above.
-      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
-    }
+  // Recover the registrar.
+  registrar->recover()
+    .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
+    .onAny([](const Future<Nothing>& recovered) {
+      if (!recovered.isReady()) {
+        LOG(FATAL)
+        << "Failed to recover resource provider manager registry: "
+        << recovered;
+      }
+    });
+}
 
-    case Call::UPDATE_OPERATION_STATUS: {
-      updateOperationStatus(
-          resourceProvider,
-          call.update_operation_status());
 
-      return Accepted();
-    }
+Future<Nothing> ResourceProviderManagerProcess::recover(
+    const mesos::resource_provider::registry::Registry& registry)
+{
+  recovered.set(Nothing());
 
-    case Call::UPDATE_STATE: {
-      updateState(resourceProvider, call.update_state());
-      return Accepted();
-    }
+  return Nothing();
+}
 
-    case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
-      updatePublishResourcesStatus(
-          resourceProvider,
-          call.update_publish_resources_status());
-      return Accepted();
-    }
-  }
 
-  UNREACHABLE();
+Future<http::Response> ResourceProviderManagerProcess::api(
+    const http::Request& request,
+    const Option<Principal>& principal)
+{
+  // TODO(bbannier): This implementation does not limit the number of messages
+  // in the actor's inbox which could become large should a big number of
+  // resource providers attempt to subscribe before recovery completed. 
Consider
+  // rejecting requests until the resource provider manager has recovered. This
+  // would likely require implementing retry logic in resource providers.
+  return recovered.future().then(defer(
+      self(), [this, request, principal](const Nothing&) -> http::Response {
+        if (request.method != "POST") {
+          return MethodNotAllowed({"POST"}, request.method);
+        }
+
+        v1::resource_provider::Call v1Call;
+
+        // TODO(anand): Content type values are case-insensitive.
+        Option<string> contentType = request.headers.get("Content-Type");
+
+        if (contentType.isNone()) {
+          return BadRequest("Expecting 'Content-Type' to be present");
+        }
+
+        if (contentType.get() == APPLICATION_PROTOBUF) {
+          if (!v1Call.ParseFromString(request.body)) {
+            return BadRequest("Failed to parse body into Call protobuf");
+          }
+        } else if (contentType.get() == APPLICATION_JSON) {
+          Try<JSON::Value> value = JSON::parse(request.body);
+          if (value.isError()) {
+            return BadRequest(
+                "Failed to parse body into JSON: " + value.error());
+          }
+
+          Try<v1::resource_provider::Call> parse =
+            ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+          if (parse.isError()) {
+            return BadRequest(
+                "Failed to convert JSON into Call protobuf: " + parse.error());
+          }
+
+          v1Call = parse.get();
+        } else {
+          return UnsupportedMediaType(
+              string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
+              " or " + APPLICATION_PROTOBUF);
+        }
+
+        Call call = devolve(v1Call);
+
+        Option<Error> error = validate(call);
+        if (error.isSome()) {
+          return BadRequest(
+              "Failed to validate resource_provider::Call: " + error->message);
+        }
+
+        if (call.type() == Call::SUBSCRIBE) {
+          // We default to JSON 'Content-Type' in the response since an empty
+          // 'Accept' header results in all media types considered acceptable.
+          ContentType acceptType = ContentType::JSON;
+
+          if (request.acceptsMediaType(APPLICATION_JSON)) {
+            acceptType = ContentType::JSON;
+          } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+            acceptType = ContentType::PROTOBUF;
+          } else {
+            return NotAcceptable(
+                string("Expecting 'Accept' to allow ") + "'" +
+                APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+          }
+
+          if (request.headers.contains("Mesos-Stream-Id")) {
+            return BadRequest(
+                "Subscribe calls should not include the 'Mesos-Stream-Id' "
+                "header");
+          }
+
+          Pipe pipe;
+          OK ok;
+
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = http::Response::PIPE;
+          ok.reader = pipe.reader();
+
+          // Generate a stream ID and return it in the response.
+          id::UUID streamId = id::UUID::random();
+          ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+          HttpConnection http(pipe.writer(), acceptType, streamId);
+          this->subscribe(http, call.subscribe());
+
+          return std::move(ok);
+        }
+
+        if (!this->resourceProviders.subscribed.contains(
+                call.resource_provider_id())) {
+          return BadRequest("Resource provider is not subscribed");
+        }
+
+        ResourceProvider* resourceProvider =
+          this->resourceProviders.subscribed.at(call.resource_provider_id())
+            .get();
+
+        // This isn't a `SUBSCRIBE` call, so the request should include a 
stream
+        // ID.
+        if (!request.headers.contains("Mesos-Stream-Id")) {
+          return BadRequest(
+              "All non-subscribe calls should include to 'Mesos-Stream-Id' "
+              "header");
+        }
+
+        const string& streamId = request.headers.at("Mesos-Stream-Id");
+        if (streamId != resourceProvider->http.streamId.toString()) {
+          return BadRequest(
+              "The stream ID '" + streamId +
+              "' included in this request "
+              "didn't match the stream ID currently associated with "
+              " resource provider ID " +
+              resourceProvider->info.id().value());
+        }
+
+        switch (call.type()) {
+          case Call::UNKNOWN: {
+            return NotImplemented();
+          }
+
+          case Call::SUBSCRIBE: {
+            // `SUBSCRIBE` call should have been handled above.
+            LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+          }
+
+          case Call::UPDATE_OPERATION_STATUS: {
+            this->updateOperationStatus(
+                resourceProvider, call.update_operation_status());
+
+            return Accepted();
+          }
+
+          case Call::UPDATE_STATE: {
+            this->updateState(resourceProvider, call.update_state());
+            return Accepted();
+          }
+
+          case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+            this->updatePublishResourcesStatus(
+                resourceProvider, call.update_publish_resources_status());
+            return Accepted();
+          }
+        }
+
+        UNREACHABLE();
+      }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp 
b/src/resource_provider/registrar.cpp
index b151e2b..6cc4625 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,9 +92,11 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> 
storage)
 }
 
 
-Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
+Try<Owned<Registrar>> Registrar::create(
+    master::Registrar* registrar,
+    Registry registry)
 {
-  return new MasterRegistrar(registrar);
+  return new MasterRegistrar(registrar, std::move(registry));
 }
 
 
@@ -150,34 +152,33 @@ class GenericRegistrarProcess : public 
Process<GenericRegistrarProcess>
 public:
   GenericRegistrarProcess(Owned<Storage> storage);
 
-  Future<Nothing> recover();
+  Future<Registry> recover();
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  Future<bool> _apply(Owned<Registrar::Operation> operation);
-
   void update();
 
+  void initialize() override;
+
+private:
+  Future<bool> _apply(Owned<Registrar::Operation> operation);
+
   void _update(
       const Future<Option<Variable<Registry>>>& store,
-      const Registry& updatedRegistry,
       deque<Owned<Registrar::Operation>> applied);
 
-private:
+  // We explicitly hold the storage to keep it alive over the
+  // registrar's lifetime.
   Owned<Storage> storage;
 
   // Use fully qualified type for `State` to disambiguate with `State`
   // enumeration in `ProcessBase`.
   mesos::state::protobuf::State state;
 
-  Option<Future<Nothing>> recovered;
-  Option<Registry> registry;
+  Promise<Nothing> recovered;
   Option<Variable<Registry>> variable;
-
   Option<Error> error;
-
   deque<Owned<Registrar::Operation>> operations;
-
   bool updating = false;
 };
 
@@ -191,32 +192,37 @@ 
GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
 }
 
 
-Future<Nothing> GenericRegistrarProcess::recover()
+void GenericRegistrarProcess::initialize()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
-  if (recovered.isNone()) {
-    recovered = state.fetch<Registry>(NAME).then(
-        defer(self(), [this](const Variable<Registry>& recovery) {
-          registry = recovery.get();
-          variable = recovery;
+  CHECK_NONE(variable);
+
+  recovered.associate(state.fetch<Registry>(NAME).then(
+      defer(self(), [this](const Variable<Registry>& recovery) {
+        variable = recovery;
+        return Nothing();
+      })));
+}
 
-          return Nothing();
-        }));
-  }
 
-  return recovered.get();
+Future<Registry> GenericRegistrarProcess::recover()
+{
+  // Prevent discards on the returned `Future` by marking the result as
+  // `undiscardable` so that we control the lifetime of the recovering 
registry.
+  return undiscardable(recovered.future())
+    .then(defer(self(), [this](const Nothing&) {
+      CHECK_SOME(this->variable);
+      return this->variable->get();
+    }));
 }
 
 
 Future<bool> GenericRegistrarProcess::apply(
     Owned<Registrar::Operation> operation)
 {
-  if (recovered.isNone()) {
-    return Failure("Attempted to apply the operation before recovering");
-  }
-
-  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
+  return undiscardable(recovered.future()).then(
+      defer(self(), &Self::_apply, std::move(operation)));
 }
 
 
@@ -249,8 +255,9 @@ void GenericRegistrarProcess::update()
 
   updating = true;
 
-  CHECK_SOME(registry);
-  Registry updatedRegistry = registry.get();
+  CHECK_SOME(variable);
+
+  Registry updatedRegistry = variable->get();
 
   foreach (Owned<Registrar::Operation>& operation, operations) {
     Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -272,7 +279,6 @@ void GenericRegistrarProcess::update()
       self(),
       &Self::_update,
       lambda::_1,
-      updatedRegistry,
       std::move(operations)));
 
   operations.clear();
@@ -281,7 +287,6 @@ void GenericRegistrarProcess::update()
 
 void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
-    const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
 {
   updating = false;
@@ -310,7 +315,6 @@ void GenericRegistrarProcess::_update(
   }
 
   variable = store->get();
-  registry = updatedRegistry;
 
   // Remove the operations.
   while (!applied.empty()) {
@@ -340,7 +344,7 @@ GenericRegistrar::~GenericRegistrar()
 }
 
 
-Future<Nothing> GenericRegistrar::recover()
+Future<Registry> GenericRegistrar::recover()
 {
   return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
@@ -364,6 +368,8 @@ class MasterRegistrarProcess : public 
Process<MasterRegistrarProcess>
   public:
     AdaptedOperation(Owned<Registrar::Operation> operation);
 
+    Future<registry::Registry> recover();
+
   private:
     Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) 
override;
 
@@ -376,12 +382,17 @@ class MasterRegistrarProcess : public 
Process<MasterRegistrarProcess>
   };
 
 public:
-  explicit MasterRegistrarProcess(master::Registrar* registrar);
+  explicit MasterRegistrarProcess(
+      master::Registrar* registrar,
+      Registry registry);
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
+  Future<registry::Registry> recover() { return registry; }
+
 private:
   master::Registrar* registrar = nullptr;
+  Registry registry;
 };
 
 
@@ -398,9 +409,12 @@ Try<bool> 
MasterRegistrarProcess::AdaptedOperation::perform(
 }
 
 
-MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
+MasterRegistrarProcess::MasterRegistrarProcess(
+    master::Registrar* _registrar,
+    registry::Registry _registry)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    registrar(_registrar) {}
+    registrar(_registrar),
+    registry(std::move(_registry)) {}
 
 
 Future<bool> MasterRegistrarProcess::apply(
@@ -413,8 +427,10 @@ Future<bool> MasterRegistrarProcess::apply(
 }
 
 
-MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
-  : process(new MasterRegistrarProcess(registrar))
+MasterRegistrar::MasterRegistrar(
+    master::Registrar* registrar,
+    registry::Registry registry)
+  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
 {
   spawn(process.get(), false);
 }
@@ -427,9 +443,9 @@ MasterRegistrar::~MasterRegistrar()
 }
 
 
-Future<Nothing> MasterRegistrar::recover()
+Future<Registry> MasterRegistrar::recover()
 {
-  return Nothing();
+  return dispatch(process.get(), &MasterRegistrarProcess::recover);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp 
b/src/resource_provider/registrar.hpp
index 3c10785..ded56e1 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,12 +71,16 @@ public:
       process::Owned<state::Storage> storage);
 
   // Create a registry on top of a master's persistent state.
+  //
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar);
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   virtual ~Registrar() = default;
 
-  virtual process::Future<Nothing> recover() = 0;
+  virtual process::Future<registry::Registry> recover() = 0;
   virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
 };
 
@@ -115,7 +119,7 @@ public:
 
   ~GenericRegistrar() override;
 
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
@@ -130,13 +134,17 @@ class MasterRegistrarProcess;
 class MasterRegistrar : public Registrar
 {
 public:
-  explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
+  explicit MasterRegistrar(
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   ~MasterRegistrar() override;
 
   // This registrar performs no recovery; instead to recover
   // the underlying master registrar needs to be recovered.
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp 
b/src/tests/resource_provider_manager_tests.cpp
index 1664073..eb8e4fc 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,10 +841,6 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(registrar.get()->recover());
 
   Future<bool> admitResourceProvider =
@@ -873,15 +869,16 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 
   const MasterInfo masterInfo = protobuf::createMasterInfo({});
 
-  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
+  Future<Registry> registry = masterRegistrar.recover(masterInfo);
+  AWAIT_READY(registry);
+
+  Try<Owned<Registrar>> registrar = Registrar::create(
+      &masterRegistrar,
+      registry->resource_provider_registry());
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(masterRegistrar.recover(masterInfo));
 
   Future<bool> admitResourceProvider =

Reply via email to