Added the registrar. From: Benjamin Hindman <b...@berkeley.edu> Review: https://reviews.apache.org/r/14383
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7edb0004 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7edb0004 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7edb0004 Branch: refs/heads/master Commit: 7edb0004b974db8ac2c5e9be84e57e7349136de6 Parents: a8310ad Author: Benjamin Mahler <bmah...@twitter.com> Authored: Tue Oct 29 12:08:23 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Oct 29 12:27:01 2013 -0700 ---------------------------------------------------------------------- src/Makefile.am | 5 +- src/master/registrar.cpp | 415 +++++++++++++++++++++++++++++++++++++ src/master/registrar.hpp | 53 +++++ src/master/registry.proto | 2 +- src/tests/registrar_tests.cpp | 154 ++++++++++++++ src/tests/state_tests.cpp | 148 ++++++------- 6 files changed, 701 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 0b32d74..a11c76b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -167,6 +167,7 @@ libmesos_no_3rdparty_la_SOURCES = \ master/master.cpp \ master/registry.hpp \ master/registry.proto \ + master/registrar.cpp \ slave/constants.cpp \ slave/gc.cpp \ slave/monitor.cpp \ @@ -223,6 +224,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \ master/allocator.hpp \ master/constants.hpp master/drf_sorter.hpp master/flags.hpp \ master/hierarchical_allocator_process.hpp \ + master/registrar.hpp \ master/master.hpp master/sorter.hpp \ messages/messages.hpp slave/constants.hpp \ slave/flags.hpp slave/gc.hpp slave/monitor.hpp \ @@ -298,7 +300,7 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la # The final result! lib_LTLIBRARIES += libmesos.la -libmesos_la_SOURCES = $(MESOS_PROTO) # Part of the distribution. +libmesos_la_SOURCES = $(MESOS_PROTO) # Include as part of the distribution. libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION) -shared @@ -781,6 +783,7 @@ mesos_tests_SOURCES = \ tests/paths_tests.cpp \ tests/protobuf_io_tests.cpp \ tests/reaper_tests.cpp \ + tests/registrar_tests.cpp \ tests/resource_offers_tests.cpp \ tests/resources_tests.cpp \ tests/sasl_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp new file mode 100644 index 0000000..42fe30e --- /dev/null +++ b/src/master/registrar.cpp @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <deque> +#include <string> + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/future.hpp> +#include <process/process.hpp> + +#include <stout/lambda.hpp> +#include <stout/none.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> + +#include "common/type_utils.hpp" + +#include "master/registrar.hpp" +#include "master/registry.hpp" + +#include "state/protobuf.hpp" + +using mesos::internal::state::protobuf::State; +using mesos::internal::state::protobuf::Variable; + +using process::dispatch; +using process::Future; +using process::Process; +using process::Promise; +using process::spawn; +using process::terminate; +using process::wait; // Necessary on some OS's to disambiguate. + +namespace mesos { +namespace internal { +namespace master { + +class RegistrarProcess : public Process<RegistrarProcess> +{ +public: + RegistrarProcess(State* _state) + : ProcessBase("registrar"), + state(_state) + { + slaves.variable = None(); + slaves.updating = false; + } + + virtual ~RegistrarProcess() {} + + // Registrar implementation. + Future<bool> admit(const SlaveID& id, const SlaveInfo& info); + Future<bool> readmit(const SlaveInfo& info); + Future<bool> remove(const SlaveInfo& info); + +private: + template <typename T> + struct Mutation : process::Promise<bool> + { + virtual Try<T> apply(T t) = 0; + }; + + struct Admit : Mutation<registry::Slaves> + { + Admit(const SlaveID& _id, const SlaveInfo& _info) + : id(_id), info(_info) {} + + virtual Try<registry::Slaves> apply(registry::Slaves slaves) + { + // Check and see if this slave already exists. + foreach (const registry::Slave& slave, slaves.slaves()) { + if (slave.info().id() == id) { + set(false); + return slaves; // No mutation. + } + } + + // Okay, add the slave! + registry::Slave* slave = slaves.add_slaves(); + slave->mutable_info()->CopyFrom(info); + slave->mutable_info()->mutable_id()->MergeFrom(id); + return slaves; + } + + const SlaveID id; + const SlaveInfo info; + }; + + // NOTE: even thought readmission does not mutate the state we model + // it as a mutation so that it is performed in sequence with other + // mutations. + struct Readmit : Mutation<registry::Slaves> + { + Readmit(const SlaveInfo& _info) : info(_info) { CHECK(info.has_id()); } + + virtual Try<registry::Slaves> apply(registry::Slaves slaves) + { + bool found = false; + foreach (const registry::Slave& slave, slaves.slaves()) { + if (slave.info().id() == info.id()) { + set(true); + found = true; + } + } + if (!found) { + set(false); + } + return slaves; + } + + const SlaveInfo info; + }; + + struct Remove : Mutation<registry::Slaves> + { + Remove(const SlaveInfo& _info) : info(_info) { CHECK(info.has_id()); } + + virtual Try<registry::Slaves> apply(registry::Slaves slaves) + { + bool removed = false; + for (int i = 0; i < slaves.slaves().size(); i++) { + const registry::Slave& slave = slaves.slaves(i); + if (slave.info().id() == info.id()) { + for (int j = i + 1; j < slaves.slaves().size(); j++) { + slaves.mutable_slaves()->SwapElements(i, j); + } + slaves.mutable_slaves()->RemoveLast(); + removed = true; + break; + } + } + if (!removed) { + set(false); + } + return slaves; // May or may not have been mutated. + } + + const SlaveInfo info; + }; + + struct { + Option<Variable<registry::Slaves> > variable; + std::deque<Mutation<registry::Slaves>*> mutations; + bool updating; // Used to signify fetching (recovering) or storing. + } slaves; + + // Continuations. + Future<bool> _admit(const SlaveID& id, const SlaveInfo& info); + Future<bool> _readmit(const SlaveInfo& info); + Future<bool> _remove(const SlaveInfo& info); + + // Helper for recovering state (performing fetch). + Future<Nothing> recover(); + void _recover(const Future<Variable<registry::Slaves> >& recovery); + + // Helper for updating state (performing store). + void update(); + Future<bool> _update(const Option<Variable<registry::Slaves> >& variable); + void __update(); + + State* state; + + // Used to compose our operations with recovery. + Promise<Nothing> recovered; +}; + + +Future<Nothing> RegistrarProcess::recover() +{ + LOG(INFO) << "Recovering registrar"; + + // "Recover" the 'slaves' variable by fetching it from the state. + if (slaves.variable.isNone() && !slaves.updating) { + state->fetch<registry::Slaves>("slaves") + .onAny(defer(self(), &Self::_recover, lambda::_1)); + + // TODO(benh): Don't wait forever to recover? + } + + // TODO(benh): Recover other variables too. + + return recovered.future(); +} + + +void RegistrarProcess::_recover( + const Future<Variable<registry::Slaves> >& recovery) +{ + slaves.updating = false; + + CHECK(!recovery.isPending()); + + if (recovery.isFailed() || recovery.isDiscarded()) { + LOG(WARNING) << "Failed to recover registrar: " << recovery.isFailed() + ? recovery.failure() : "future discarded"; + recover(); // Retry! TODO(benh): Don't retry forever? + } else { + LOG(INFO) << "Successfully recovered registrar"; + + // Save the slaves variable. + slaves.variable = recovery.get(); + + // Signal the recovery is complete. + recovered.set(Nothing()); + } +} + + +Future<bool> RegistrarProcess::admit( + const SlaveID& id, + const SlaveInfo& info) +{ + return recover() + .then(defer(self(), &Self::_admit, id, info)); +} + + +Future<bool> RegistrarProcess::_admit( + const SlaveID& id, + const SlaveInfo& info) +{ + CHECK_SOME(slaves.variable); + Mutation<registry::Slaves>* mutation = new Admit(id, info); + slaves.mutations.push_back(mutation); + Future<bool> future = mutation->future(); + if (!slaves.updating) { + update(); + } + return future; +} + + +Future<bool> RegistrarProcess::readmit(const SlaveInfo& info) +{ + return recover() + .then(defer(self(), &Self::_readmit, info)); +} + + +Future<bool> RegistrarProcess::_readmit( + const SlaveInfo& info) +{ + CHECK_SOME(slaves.variable); + + if (!info.has_id()) { + return Future<bool>::failed("Expecting SlaveInfo to have a SlaveID"); + } + + Mutation<registry::Slaves>* mutation = new Readmit(info); + slaves.mutations.push_back(mutation); + Future<bool> future = mutation->future(); + if (!slaves.updating) { + update(); + } + return future; +} + + +Future<bool> RegistrarProcess::remove(const SlaveInfo& info) +{ + return recover() + .then(defer(self(), &Self::_remove, info)); +} + + +Future<bool> RegistrarProcess::_remove( + const SlaveInfo& info) +{ + CHECK_SOME(slaves.variable); + + if (!info.has_id()) { + return Future<bool>::failed("Expecting SlaveInfo to have a SlaveID"); + } + + Mutation<registry::Slaves>* mutation = new Remove(info); + slaves.mutations.push_back(mutation); + Future<bool> future = mutation->future(); + if (!slaves.updating) { + update(); + } + return future; +} + + +void RegistrarProcess::update() +{ + if (!slaves.mutations.empty()) { + CHECK(!slaves.updating); + + slaves.updating = true; + + LOG(INFO) << "Attempting to update 'slaves'"; + + CHECK_SOME(slaves.variable); + + Variable<registry::Slaves> variable = slaves.variable.get(); + + foreach (Mutation<registry::Slaves>* mutation, slaves.mutations) { + Try<registry::Slaves> slaves = mutation->apply(variable.get()); + if (slaves.isError()) { + mutation->fail("Failed to mutate 'slaves': " + slaves.error()); + } else { + Try<Variable<registry::Slaves> > v = variable.mutate(slaves.get()); + if (v.isError()) { + mutation->fail("Failed to mutate 'slaves': " + v.error()); + } else { + variable = v.get(); + } + } + } + + // Perform the store! Save the future so we can associate it with + // the mutations that are part of this update. + Future<bool> future = + state->store(variable).then(defer(self(), &Self::_update, lambda::_1)); + + // TODO(benh): Add a timeout so we don't wait forever. + + // Toggle 'updating' if the store fails or is discarded. + future + .onDiscarded(defer(self(), &Self::__update)) + .onFailed(defer(self(), &Self::__update)); + + // Now associate the store with all the mutations. + while (!slaves.mutations.empty()) { + Mutation<registry::Slaves>* mutation = slaves.mutations.front(); + slaves.mutations.pop_front(); + mutation->associate(future); // No-op if already failed above. + delete mutation; + } + } +} + + +Future<bool> RegistrarProcess::_update( + const Option<Variable<registry::Slaves> >& variable) +{ + slaves.updating = false; + + if (variable.isNone()) { + LOG(WARNING) << "Failed to update 'slaves': version mismatch"; + return Future<bool>::failed("Failed to update 'slaves': version mismatch"); + } + + LOG(INFO) << "Successfully updated 'slaves'"; + + slaves.variable = variable.get(); + + if (!slaves.mutations.empty()) { + update(); + } + + return true; +} + + +void RegistrarProcess::__update() +{ + LOG(WARNING) << "Failed to update 'slaves'"; + slaves.updating = false; +} + + +Registrar::Registrar(State* state) +{ + process = new RegistrarProcess(state); + spawn(process); +} + + +Registrar::~Registrar() +{ + terminate(process); + wait(process); +} + + +Future<bool> Registrar::admit( + const SlaveID& id, + const SlaveInfo& info) +{ + return dispatch(process, &RegistrarProcess::admit, id, info); +} + + +Future<bool> Registrar::readmit(const SlaveInfo& info) +{ + return dispatch(process, &RegistrarProcess::readmit, info); +} + + +Future<bool> Registrar::remove(const SlaveInfo& info) +{ + return dispatch(process, &RegistrarProcess::remove, info); +} + +} // namespace master { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp new file mode 100644 index 0000000..5742c36 --- /dev/null +++ b/src/master/registrar.hpp @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MASTER_REGISTRAR_HPP__ +#define __MASTER_REGISTRAR_HPP__ + +#include <mesos/mesos.hpp> + +#include <process/future.hpp> + +#include "state/protobuf.hpp" + +namespace mesos { +namespace internal { +namespace master { + +// Forward declaration. +class RegistrarProcess; + +class Registrar +{ +public: + Registrar(state::protobuf::State* state); + ~Registrar(); + + process::Future<bool> admit(const SlaveID& id, const SlaveInfo& info); + process::Future<bool> readmit(const SlaveInfo& info); + process::Future<bool> remove(const SlaveInfo& info); + +private: + RegistrarProcess* process; +}; + +} // namespace master { +} // namespace internal { +} // namespace mesos { + +#endif // __MASTER_REGISTRAR_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registry.proto ---------------------------------------------------------------------- diff --git a/src/master/registry.proto b/src/master/registry.proto index 877bfa1..bd85099 100644 --- a/src/master/registry.proto +++ b/src/master/registry.proto @@ -27,6 +27,6 @@ message Slave { } -message Registry { +message Slaves { repeated Slave slaves = 1; } http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/tests/registrar_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp new file mode 100644 index 0000000..51975f5 --- /dev/null +++ b/src/tests/registrar_tests.cpp @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <map> +#include <string> + +#include <process/gmock.hpp> +#include <process/gtest.hpp> +#include <process/pid.hpp> +#include <process/process.hpp> + +#include "master/registrar.hpp" + +#include "state/leveldb.hpp" +#include "state/protobuf.hpp" +#include "state/storage.hpp" + +using namespace mesos; +using namespace mesos::internal; + +using namespace process; + +using std::map; +using std::string; + +using testing::_; +using testing::Eq; + +namespace mesos { +namespace internal { +namespace master { + +class RegistrarTest : public ::testing::Test +{ +public: + RegistrarTest() + : storage(NULL), + state(NULL), + path(os::getcwd() + "/.state") {} + +protected: + virtual void SetUp() + { + os::rmdir(path); + storage = new state::LevelDBStorage(path); + state = new state::protobuf::State(storage); + } + + virtual void TearDown() + { + delete state; + delete storage; + os::rmdir(path); + } + + state::Storage* storage; + state::protobuf::State* state; + +private: + const std::string path; +}; + + +TEST_F(RegistrarTest, admit) +{ + Registrar registrar(state); + + SlaveID id1; + id1.set_value("1"); + + SlaveInfo info1; + info1.set_hostname("localhost"); + info1.mutable_id()->CopyFrom(id1); + + AWAIT_EQ(true, registrar.admit(id1, info1)); + AWAIT_EQ(false, registrar.admit(id1, info1)); +} + + +TEST_F(RegistrarTest, readmit) +{ + Registrar registrar(state); + + SlaveID id1; + id1.set_value("1"); + + SlaveInfo info1; + info1.set_hostname("localhost"); + info1.mutable_id()->CopyFrom(id1); + + SlaveID id2; + id2.set_value("2"); + + SlaveInfo info2; + info2.set_hostname("localhost"); + info2.mutable_id()->CopyFrom(id2); + + AWAIT_EQ(true, registrar.admit(id1, info1)); + + AWAIT_EQ(true, registrar.readmit(info1)); + + AWAIT_EQ(false, registrar.readmit(info2)); +} + + +TEST_F(RegistrarTest, remove) +{ + Registrar registrar(state); + + SlaveID id1; + id1.set_value("1"); + + SlaveInfo info1; + info1.set_hostname("localhost"); + info1.mutable_id()->CopyFrom(id1); + + SlaveID id2; + id2.set_value("2"); + + SlaveInfo info2; + info2.set_hostname("localhost"); + info2.mutable_id()->CopyFrom(id2); + + AWAIT_EQ(true, registrar.admit(id1, info1)); + + AWAIT_EQ(true, registrar.admit(id2, info2)); + + AWAIT_EQ(true, registrar.remove(info1)); + + AWAIT_EQ(false, registrar.remove(info1)); + + AWAIT_EQ(true, registrar.admit(id1, info1)); + + AWAIT_EQ(true, registrar.remove(info2)); +} + +} // namespace master { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/tests/state_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp index efc7315..03c5388 100644 --- a/src/tests/state_tests.cpp +++ b/src/tests/state_tests.cpp @@ -51,7 +51,7 @@ using namespace mesos::internal; using namespace process; -using mesos::internal::registry::Registry; +using mesos::internal::registry::Slaves; using mesos::internal::registry::Slave; using state::LevelDBStorage; @@ -66,50 +66,50 @@ using state::protobuf::Variable; void FetchAndStoreAndFetch(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); - future1 = state->fetch<Registry>("registry"); + future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); variable = future1.get(); - Registry registry2 = variable.get(); - ASSERT_TRUE(registry2.slaves().size() == 1); - EXPECT_EQ("localhost", registry2.slaves(0).info().hostname()); + Slaves slaves2 = variable.get(); + ASSERT_TRUE(slaves2.slaves().size() == 1); + EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname()); } void FetchAndStoreAndStoreAndFetch(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); @@ -119,75 +119,75 @@ void FetchAndStoreAndStoreAndFetch(State* state) AWAIT_READY(future2); ASSERT_SOME(future2.get()); - future1 = state->fetch<Registry>("registry"); + future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); variable = future1.get(); - Registry registry2 = variable.get(); - ASSERT_TRUE(registry2.slaves().size() == 1); - EXPECT_EQ("localhost", registry2.slaves(0).info().hostname()); + Slaves slaves2 = variable.get(); + ASSERT_TRUE(slaves2.slaves().size() == 1); + EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname()); } void FetchAndStoreAndStoreFailAndFetch(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable1 = future1.get(); + Variable<Slaves> variable1 = future1.get(); - Registry registry1 = variable1.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable1.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave1 = registry1.add_slaves(); + Slave* slave1 = slaves1.add_slaves(); slave1->mutable_info()->set_hostname("localhost1"); - Variable<Registry> variable2 = variable1.mutate(registry1); + Variable<Slaves> variable2 = variable1.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable2); + Future<Option<Variable<Slaves> > > future2 = state->store(variable2); AWAIT_READY(future2); ASSERT_SOME(future2.get()); - Registry registry2 = variable1.get(); - EXPECT_TRUE(registry2.slaves().size() == 0); + Slaves slaves2 = variable1.get(); + EXPECT_TRUE(slaves2.slaves().size() == 0); - Slave* slave2 = registry2.add_slaves(); + Slave* slave2 = slaves2.add_slaves(); slave2->mutable_info()->set_hostname("localhost2"); - variable2 = variable1.mutate(registry2); + variable2 = variable1.mutate(slaves2); future2 = state->store(variable2); AWAIT_READY(future2); EXPECT_TRUE(future2.get().isNone()); - future1 = state->fetch<Registry>("registry"); + future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); variable1 = future1.get(); - registry1 = variable1.get(); - ASSERT_TRUE(registry1.slaves().size() == 1); - EXPECT_EQ("localhost1", registry1.slaves(0).info().hostname()); + slaves1 = variable1.get(); + ASSERT_TRUE(slaves1.slaves().size() == 1); + EXPECT_EQ("localhost1", slaves1.slaves(0).info().hostname()); } void FetchAndStoreAndExpungeAndFetch(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); @@ -197,32 +197,32 @@ void FetchAndStoreAndExpungeAndFetch(State* state) AWAIT_READY(future3); ASSERT_TRUE(future3.get()); - future1 = state->fetch<Registry>("registry"); + future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); variable = future1.get(); - Registry registry2 = variable.get(); - ASSERT_EQ(0, registry2.slaves().size()); + Slaves slaves2 = variable.get(); + ASSERT_EQ(0, slaves2.slaves().size()); } void FetchAndStoreAndExpungeAndExpunge(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); @@ -240,20 +240,20 @@ void FetchAndStoreAndExpungeAndExpunge(State* state) void FetchAndStoreAndExpungeAndStoreAndFetch(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); @@ -267,40 +267,40 @@ void FetchAndStoreAndExpungeAndStoreAndFetch(State* state) AWAIT_READY(future2); ASSERT_SOME(future2.get()); - future1 = state->fetch<Registry>("registry"); + future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); variable = future1.get(); - Registry registry2 = variable.get(); - ASSERT_TRUE(registry2.slaves().size() == 1); - EXPECT_EQ("localhost", registry2.slaves(0).info().hostname()); + Slaves slaves2 = variable.get(); + ASSERT_TRUE(slaves2.slaves().size() == 1); + EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname()); } void Names(State* state) { - Future<Variable<Registry> > future1 = state->fetch<Registry>("registry"); + Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves"); AWAIT_READY(future1); - Variable<Registry> variable = future1.get(); + Variable<Slaves> variable = future1.get(); - Registry registry1 = variable.get(); - EXPECT_TRUE(registry1.slaves().size() == 0); + Slaves slaves1 = variable.get(); + EXPECT_TRUE(slaves1.slaves().size() == 0); - Slave* slave = registry1.add_slaves(); + Slave* slave = slaves1.add_slaves(); slave->mutable_info()->set_hostname("localhost"); - variable = variable.mutate(registry1); + variable = variable.mutate(slaves1); - Future<Option<Variable<Registry> > > future2 = state->store(variable); + Future<Option<Variable<Slaves> > > future2 = state->store(variable); AWAIT_READY(future2); ASSERT_SOME(future2.get()); Future<std::vector<std::string> > names = state->names(); AWAIT_READY(names); ASSERT_TRUE(names.get().size() == 1); - EXPECT_EQ("registry", names.get()[0]); + EXPECT_EQ("slaves", names.get()[0]); }