Updated detector to return MasterInfo instead of PID. Review: https://reviews.apache.org/r/17574
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26b156e3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26b156e3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26b156e3 Branch: refs/heads/master Commit: 26b156e33f281f5b342daefa0b8c327eb97d6647 Parents: 6f4a21f Author: Vinod Kone <vi...@twitter.com> Authored: Thu Jan 30 14:34:35 2014 -0800 Committer: Vinod Kone <vi...@twitter.com> Committed: Mon Feb 10 22:34:43 2014 -0800 ---------------------------------------------------------------------- src/cli/resolve.cpp | 16 +- src/local/local.cpp | 5 +- src/master/detector.cpp | 99 ++++++++----- src/master/detector.hpp | 37 +++-- src/master/http.cpp | 9 +- src/master/main.cpp | 3 +- src/master/master.cpp | 94 ++++++------ src/master/master.hpp | 18 ++- src/sched/sched.cpp | 18 ++- src/slave/slave.cpp | 18 ++- src/slave/slave.hpp | 2 +- src/tests/balloon_framework_test.sh | 2 +- src/tests/cluster.hpp | 4 +- src/tests/master_contender_detector_tests.cpp | 164 +++++++++++---------- 14 files changed, 280 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/cli/resolve.cpp ---------------------------------------------------------------------- diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp index 0ebb0c6..9725e22 100644 --- a/src/cli/resolve.cpp +++ b/src/cli/resolve.cpp @@ -31,6 +31,8 @@ #include "master/detector.hpp" +#include "messages/messages.hpp" + using namespace mesos; using namespace mesos::internal; @@ -124,25 +126,25 @@ int main(int argc, char** argv) return -1; } - Future<Option<UPID> > pid = detector.get()->detect(); + Future<Option<MasterInfo> > masterInfo = detector.get()->detect(); - if (!pid.await(timeout)) { + if (!masterInfo.await(timeout)) { cerr << "Failed to detect master from '" << master.get() << "' within " << timeout << endl; return -1; } else { - CHECK(!pid.isDiscarded()); + CHECK(!masterInfo.isDiscarded()); - if (pid.isFailed()) { + if (masterInfo.isFailed()) { cerr << "Failed to detect master from '" << master.get() - << "': " << pid.failure() << endl; + << "': " << masterInfo.failure() << endl; return -1; } } // The future is not satisfied unless the result is Some. - CHECK_SOME(pid.get()); - cout << strings::remove(pid.get().get(), "master@") << endl; + CHECK_SOME(masterInfo.get()); + cout << strings::remove(masterInfo.get().get().pid(), "master@") << endl; return 0; } http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/local/local.cpp ---------------------------------------------------------------------- diff --git a/src/local/local.cpp b/src/local/local.cpp index 83a7f91..e650de9 100644 --- a/src/local/local.cpp +++ b/src/local/local.cpp @@ -25,6 +25,8 @@ #include <stout/path.hpp> #include <stout/strings.hpp> +#include "common/protobuf_utils.hpp" + #include "local.hpp" #include "logging/flags.hpp" @@ -131,7 +133,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) detector = new StandaloneMasterDetector(); master = new Master(_allocator, registrar, files, contender, detector, flags); - detector->appoint(master->self()); + + detector->appoint(master->info()); } PID<Master> pid = process::spawn(master); http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/detector.cpp ---------------------------------------------------------------------- diff --git a/src/master/detector.cpp b/src/master/detector.cpp index 2b169c5..7e10433 100644 --- a/src/master/detector.cpp +++ b/src/master/detector.cpp @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -23,14 +24,18 @@ #include <process/dispatch.hpp> #include <process/future.hpp> #include <process/logging.hpp> +#include <process/pid.hpp> #include <process/process.hpp> #include <stout/duration.hpp> #include <stout/foreach.hpp> #include <stout/lambda.hpp> +#include "common/protobuf_utils.hpp" + #include "master/constants.hpp" #include "master/detector.hpp" +#include "master/master.hpp" #include "messages/messages.hpp" @@ -49,21 +54,22 @@ namespace internal { const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10); + class StandaloneMasterDetectorProcess : public Process<StandaloneMasterDetectorProcess> { public: StandaloneMasterDetectorProcess() {} - StandaloneMasterDetectorProcess(const UPID& _leader) + StandaloneMasterDetectorProcess(const MasterInfo& _leader) : leader(_leader) {} ~StandaloneMasterDetectorProcess(); - void appoint(const Option<UPID>& leader); - Future<Option<UPID> > detect(const Option<UPID>& previous = None()); + void appoint(const Option<MasterInfo>& leader); + Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous = None()); private: - Option<UPID> leader; // The appointed master. - set<Promise<Option<UPID> >*> promises; + Option<MasterInfo> leader; // The appointed master. + set<Promise<Option<MasterInfo> >*> promises; }; @@ -76,7 +82,7 @@ public: ~ZooKeeperMasterDetectorProcess(); virtual void initialize(); - Future<Option<UPID> > detect(const Option<UPID>& previous); + Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous); private: // Invoked when the group leadership has changed. @@ -89,8 +95,8 @@ private: LeaderDetector detector; // The leading Master. - Option<UPID> leader; - set<Promise<Option<UPID> >*> promises; + Option<MasterInfo> leader; + set<Promise<Option<MasterInfo> >*> promises; // Potential non-retryable error. Option<Error> error; @@ -131,7 +137,7 @@ Try<MasterDetector*> MasterDetector::create(const string& master) "Failed to parse '" + master + "'"); } - return new StandaloneMasterDetector(pid); + return new StandaloneMasterDetector(protobuf::createMasterInfo(pid)); } @@ -140,7 +146,7 @@ MasterDetector::~MasterDetector() {} StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess() { - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->future().discard(); delete promise; } @@ -148,12 +154,11 @@ StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess() } -void StandaloneMasterDetectorProcess::appoint( - const Option<process::UPID>& _leader) +void StandaloneMasterDetectorProcess::appoint(const Option<MasterInfo>& _leader) { leader = _leader; - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->set(leader); delete promise; } @@ -161,14 +166,14 @@ void StandaloneMasterDetectorProcess::appoint( } -Future<Option<UPID> > StandaloneMasterDetectorProcess::detect( - const Option<UPID>& previous) +Future<Option<MasterInfo> > StandaloneMasterDetectorProcess::detect( + const Option<MasterInfo>& previous) { if (leader != previous) { return leader; } - Promise<Option<UPID> >* promise = new Promise<Option<UPID> >(); + Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); promises.insert(promise); return promise->future(); } @@ -181,13 +186,22 @@ StandaloneMasterDetector::StandaloneMasterDetector() } -StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader) +StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader) { process = new StandaloneMasterDetectorProcess(leader); spawn(process); } +StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader) +{ + process = + new StandaloneMasterDetectorProcess(protobuf::createMasterInfo(leader)); + + spawn(process); +} + + StandaloneMasterDetector::~StandaloneMasterDetector() { terminate(process); @@ -196,14 +210,22 @@ StandaloneMasterDetector::~StandaloneMasterDetector() } -void StandaloneMasterDetector::appoint(const Option<process::UPID>& leader) +void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader) +{ + dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader); +} + + +void StandaloneMasterDetector::appoint(const UPID& leader) { - return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader); + dispatch(process, + &StandaloneMasterDetectorProcess::appoint, + protobuf::createMasterInfo(leader)); } -Future<Option<UPID> > StandaloneMasterDetector::detect( - const Option<UPID>& previous) +Future<Option<MasterInfo> > StandaloneMasterDetector::detect( + const Option<MasterInfo>& previous) { return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous); } @@ -230,7 +252,7 @@ ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess( ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess() { - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->future().discard(); delete promise; } @@ -245,8 +267,8 @@ void ZooKeeperMasterDetectorProcess::initialize() } -Future<Option<UPID> > ZooKeeperMasterDetectorProcess::detect( - const Option<UPID>& previous) +Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect( + const Option<MasterInfo>& previous) { // Return immediately if the detector is no longer operational due // to a non-retryable error. @@ -258,7 +280,7 @@ Future<Option<UPID> > ZooKeeperMasterDetectorProcess::detect( return leader; } - Promise<Option<UPID> >* promise = new Promise<Option<UPID> >(); + Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >(); promises.insert(promise); return promise->future(); } @@ -277,7 +299,7 @@ void ZooKeeperMasterDetectorProcess::detected( // will directly fail as a result. error = Error(_leader.failure()); leader = None(); - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->fail(_leader.failure()); delete promise; } @@ -288,7 +310,7 @@ void ZooKeeperMasterDetectorProcess::detected( if (_leader.get().isNone()) { leader = None(); - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->set(leader); delete promise; } @@ -313,7 +335,7 @@ void ZooKeeperMasterDetectorProcess::fetched( if (data.isFailed()) { leader = None(); - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->fail(data.failure()); delete promise; } @@ -325,22 +347,26 @@ void ZooKeeperMasterDetectorProcess::fetched( // leader for subsequent requests. Option<string> label = membership.label(); if (label.isNone()) { - leader = UPID(data.get()); + // If we are here it means some masters are still creating znodes + // with the old format. + UPID pid = UPID(data.get()); + LOG(WARNING) << "Leading master " << pid << " has data in old format"; + leader = protobuf::createMasterInfo(pid); } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) { MasterInfo info; if (!info.ParseFromString(data.get())) { leader = None(); - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->fail("Failed to parse data into MasterInfo"); delete promise; } promises.clear(); return; } - leader = UPID(info.pid()); + leader = info; } else { leader = None(); - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->fail("Failed to parse data of unknown label " + label.get()); delete promise; } @@ -348,9 +374,10 @@ void ZooKeeperMasterDetectorProcess::fetched( return; } - LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected"; + LOG(INFO) << "A new leading master (UPID=" + << UPID(leader.get().pid()) << ") is detected"; - foreach (Promise<Option<UPID> >* promise, promises) { + foreach (Promise<Option<MasterInfo> >* promise, promises) { promise->set(leader); delete promise; } @@ -380,8 +407,8 @@ ZooKeeperMasterDetector::~ZooKeeperMasterDetector() } -Future<Option<UPID> > ZooKeeperMasterDetector::detect( - const Option<UPID>& previous) +Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect( + const Option<MasterInfo>& previous) { return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous); } http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/detector.hpp ---------------------------------------------------------------------- diff --git a/src/master/detector.hpp b/src/master/detector.hpp index 277c9d9..533027a 100644 --- a/src/master/detector.hpp +++ b/src/master/detector.hpp @@ -19,13 +19,17 @@ #ifndef __MASTER_DETECTOR_HPP__ #define __MASTER_DETECTOR_HPP__ +#include <string> + #include <process/future.hpp> #include <process/owned.hpp> -#include <process/pid.hpp> #include <stout/option.hpp> +#include <stout/stringify.hpp> #include <stout/try.hpp> +#include "messages/messages.hpp" + #include "zookeeper/detector.hpp" #include "zookeeper/group.hpp" #include "zookeeper/url.hpp" @@ -55,11 +59,11 @@ public: static Try<MasterDetector*> create(const std::string& master); virtual ~MasterDetector() = 0; - // Returns some PID after an election has occurred and the elected - // PID is different than that specified (if any), or NONE if an - // election occurs and no PID is elected (e.g., all PIDs are lost). - // A failed future is returned if the detector is unable to detect - // the leading master due to a non-retryable error. + // Returns MasterInfo after an election has occurred and the elected + // master is different than that specified (if any), or NONE if an + // election occurs and no master is elected (e.g., all masters are + // lost). A failed future is returned if the detector is unable to + // detect the leading master due to a non-retryable error. // Note that the detector transparently tries to recover from // retryable errors. // The future is never discarded unless it stays pending when the @@ -68,8 +72,8 @@ public: // The 'previous' result (if any) should be passed back if this // method is called repeatedly so the detector only returns when it // gets a different result. - virtual process::Future<Option<process::UPID> > detect( - const Option<process::UPID>& previous = None()) = 0; + virtual process::Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()) = 0; }; @@ -82,14 +86,21 @@ public: StandaloneMasterDetector(); // Use this constructor if the leader is known beforehand so it is // unnecessary to call 'appoint()' separately. + StandaloneMasterDetector(const MasterInfo& leader); + + // Same as above but takes UPID as the parameter. StandaloneMasterDetector(const process::UPID& leader); + virtual ~StandaloneMasterDetector(); // Appoint the leading master so it can be *detected*. - void appoint(const Option<process::UPID>& leader); + void appoint(const Option<MasterInfo>& leader); + + // Same as above but takes 'UPID' as the parameter. + void appoint(const process::UPID& leader); - virtual process::Future<Option<process::UPID> > detect( - const Option<process::UPID>& previous = None()); + virtual process::Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()); private: StandaloneMasterDetectorProcess* process; @@ -110,8 +121,8 @@ public: // The detector transparently tries to recover from retryable // errors until the group session expires, in which case the Future // returns None. - virtual process::Future<Option<process::UPID> > detect( - const Option<process::UPID>& previous = None()); + virtual process::Future<Option<MasterInfo> > detect( + const Option<MasterInfo>& previous = None()); private: ZooKeeperMasterDetectorProcess* process; http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index e1c3d65..64eaf07 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -302,7 +302,8 @@ Future<Response> Master::Http::redirect(const Request& request) LOG(INFO) << "HTTP request for '" << request.path << "'"; // If there's no leader, redirect to this master's base url. - UPID pid = master.leader.isSome() ? master.leader.get() : master.self(); + UPID pid = master.leader.isSome() + ? UPID(master.leader.get().pid()) : master.self(); Try<string> hostname = net::getHostname(pid.ip); if (hostname.isError()) { @@ -407,9 +408,9 @@ Future<Response> Master::Http::state(const Request& request) object.values["build_time"] = build::TIME; object.values["build_user"] = build::USER; object.values["start_time"] = master.startTime.secs(); - object.values["id"] = master.info.id(); + object.values["id"] = master.info().id(); object.values["pid"] = string(master.self()); - object.values["hostname"] = master.info.hostname(); + object.values["hostname"] = master.info().hostname(); object.values["activated_slaves"] = master.slaves.size(); object.values["deactivated_slaves"] = master.deactivatedSlaves.size(); object.values["staged_tasks"] = master.stats.tasks[TASK_STAGING]; @@ -424,7 +425,7 @@ Future<Response> Master::Http::state(const Request& request) } if (master.leader.isSome()) { - object.values["leader"] = string(master.leader.get()); + object.values["leader"] = master.leader.get().pid(); } if (master.flags.log_dir.isSome()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/main.cpp ---------------------------------------------------------------------- diff --git a/src/master/main.cpp b/src/master/main.cpp index 4f1859f..c18c305 100644 --- a/src/master/main.cpp +++ b/src/master/main.cpp @@ -29,6 +29,7 @@ #include <stout/try.hpp> #include "common/build.hpp" +#include "common/protobuf_utils.hpp" #include "logging/flags.hpp" #include "logging/logging.hpp" @@ -180,7 +181,7 @@ int main(int argc, char** argv) if (zk == "") { // It means we are using the standalone detector so we need to // appoint this Master as the leader. - dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->self()); + dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->info()); } process::spawn(master); http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 1af4558..a4e1b1f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -202,7 +202,44 @@ Master::Master( files(_files), contender(_contender), detector(_detector), - completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {} + completedFrameworks(MAX_COMPLETED_FRAMEWORKS) +{ + // NOTE: We populate 'info_' here instead of inside 'initialize()' + // because 'StandaloneMasterDetector' needs access to the info. + + // The master ID is currently comprised of the current date, the IP + // address and port from self() and the OS PID. + Try<string> id = + strings::format("%s-%u-%u-%d", DateUtils::currentDate(), + self().ip, self().port, getpid()); + + CHECK(!id.isError()) << id.error(); + + info_.set_id(id.get()); + info_.set_ip(self().ip); + info_.set_port(self().port); + info_.set_pid(self()); + + // Determine our hostname or use the hostname provided. + string hostname; + + if (flags.hostname.isNone()) { + Try<string> result = net::getHostname(self().ip); + + if (result.isError()) { + LOG(FATAL) << "Failed to get hostname: " << result.error(); + } + + hostname = result.get(); + } else { + hostname = flags.hostname.get(); + } + + info_.set_hostname(hostname); + + LOG(INFO) << "Master ID: " << info_.id() + << " Hostname: " << info_.hostname(); +} Master::~Master() @@ -284,39 +321,6 @@ void Master::initialize() { LOG(INFO) << "Master started on " << string(self()).substr(7); - // The master ID is currently comprised of the current date, the IP - // address and port from self() and the OS PID. - Try<string> id = - strings::format("%s-%u-%u-%d", DateUtils::currentDate(), - self().ip, self().port, getpid()); - - CHECK(!id.isError()) << id.error(); - - info.set_id(id.get()); - info.set_ip(self().ip); - info.set_port(self().port); - info.set_pid(self()); - - // Determine our hostname or use the hostname provided. - string hostname; - - if (flags.hostname.isNone()) { - Try<string> result = net::getHostname(self().ip); - - if (result.isError()) { - LOG(FATAL) << "Failed to get hostname: " << result.error(); - } - - hostname = result.get(); - } else { - hostname = flags.hostname.get(); - } - - info.set_hostname(hostname); - - LOG(INFO) << "Master ID: " << info.id() - << " Hostname: " << info.hostname(); - if (flags.authenticate) { LOG(INFO) << "Master only allowing authenticated frameworks to register!"; @@ -567,7 +571,7 @@ void Master::initialize() } } - contender->initialize(info); + contender->initialize(info_); // Start contending to be a leading master and detecting the current // leader. @@ -741,7 +745,7 @@ void Master::lostCandidacy(const Future<Nothing>& lost) } -void Master::detected(const Future<Option<UPID> >& _leader) +void Master::detected(const Future<Option<MasterInfo> >& _leader) { CHECK(!_leader.isDiscarded()); @@ -754,7 +758,9 @@ void Master::detected(const Future<Option<UPID> >& _leader) leader = _leader.get(); LOG(INFO) << "The newly elected leader is " - << (leader.isSome() ? stringify(leader.get()) : "None"); + << (leader.isSome() + ? (leader.get().pid() + " with id " + leader.get().id()) + : "None"); if (wasElected && !elected()) { EXIT(1) << "Lost leadership... committing suicide!"; @@ -818,7 +824,7 @@ void Master::registerFramework( << ") already registered, resending acknowledgement"; FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id); - message.mutable_master_info()->MergeFrom(info); + message.mutable_master_info()->MergeFrom(info_); send(from, message); return; } @@ -950,7 +956,7 @@ void Master::reregisterFramework( FrameworkReregisteredMessage message; message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); - message.mutable_master_info()->MergeFrom(info); + message.mutable_master_info()->MergeFrom(info_); send(from, message); return; } @@ -2605,7 +2611,7 @@ void Master::addFramework(Framework* framework) FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id); - message.mutable_master_info()->MergeFrom(info); + message.mutable_master_info()->MergeFrom(info_); send(framework->pid, message); allocator->frameworkAdded( @@ -2651,7 +2657,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) { FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id); - message.mutable_master_info()->MergeFrom(info); + message.mutable_master_info()->MergeFrom(info_); send(newPid, message); } @@ -3083,7 +3089,7 @@ FrameworkID Master::newFrameworkId() { std::ostringstream out; - out << info.id() << "-" << std::setw(4) + out << info_.id() << "-" << std::setw(4) << std::setfill('0') << nextFrameworkId++; FrameworkID frameworkId; @@ -3096,7 +3102,7 @@ FrameworkID Master::newFrameworkId() OfferID Master::newOfferId() { OfferID offerId; - offerId.set_value(info.id() + "-" + stringify(nextOfferId++)); + offerId.set_value(info_.id() + "-" + stringify(nextOfferId++)); return offerId; } @@ -3104,7 +3110,7 @@ OfferID Master::newOfferId() SlaveID Master::newSlaveId() { SlaveID slaveId; - slaveId.set_value(info.id() + "-" + stringify(nextSlaveId++)); + slaveId.set_value(info_.id() + "-" + stringify(nextSlaveId++)); return slaveId; } http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 7649737..737bd8b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -177,12 +177,17 @@ public: // Invoked when there is a newly elected leading master. // Made public for testing purposes. - void detected(const Future<Option<UPID> >& pid); + void detected(const Future<Option<MasterInfo> >& pid); // Invoked when the contender has lost the candidacy. // Made public for testing purposes. void lostCandidacy(const Future<Nothing>& lost); + MasterInfo info() const + { + return info_; + } + protected: virtual void initialize(); virtual void finalize(); @@ -309,10 +314,13 @@ private: const Flags flags; - Option<UPID> leader; // Current leading master. + Option<MasterInfo> leader; // Current leading master. // Whether we are the current leading master. - bool elected() const { return leader.isSome() && leader.get() == self(); } + bool elected() const + { + return leader.isSome() && leader.get() == info_; + } allocator::Allocator* allocator; WhitelistWatcher* whitelistWatcher; @@ -322,7 +330,7 @@ private: MasterContender* contender; MasterDetector* detector; - MasterInfo info; + MasterInfo info_; hashmap<FrameworkID, Framework*> frameworks; @@ -616,7 +624,7 @@ struct Framework } - const FrameworkID id; // TODO(benh): Store this in 'info. + const FrameworkID id; // TODO(benh): Store this in 'info'. const FrameworkInfo info; UPID pid; http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 77588c3..fee1a04 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -168,24 +168,28 @@ protected: &FrameworkErrorMessage::message); // Start detecting masters. - detector->detect(master) + detector->detect() .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1)); } - void detected(const Future<Option<UPID> >& pid) + void detected(const Future<Option<MasterInfo> >& _master) { if (aborted) { VLOG(1) << "Ignoring the master change because the driver is aborted!"; return; } - CHECK(!pid.isDiscarded()); + CHECK(!_master.isDiscarded()); - if (pid.isFailed()) { - EXIT(1) << "Failed to detect a master: " << pid.failure(); + if (_master.isFailed()) { + EXIT(1) << "Failed to detect a master: " << _master.failure(); } - master = pid.get(); + if (_master.get().isSome()) { + master = UPID(_master.get().get().pid()); + } else { + master = None(); + } if (connected) { // There are three cases here: @@ -228,7 +232,7 @@ protected: // Keep detecting masters. LOG(INFO) << "Detecting new master"; - detector->detect(master) + detector->detect(_master.get()) .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 5bb7a0c..213df86 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -482,7 +482,7 @@ Nothing Slave::detachFile(const string& path) } -void Slave::detected(const Future<Option<UPID> >& pid) +void Slave::detected(const Future<Option<MasterInfo> >& _master) { CHECK(state == DISCONNECTED || state == RUNNING || @@ -492,13 +492,17 @@ void Slave::detected(const Future<Option<UPID> >& pid) state = DISCONNECTED; } - CHECK(!pid.isDiscarded()); + CHECK(!_master.isDiscarded()); - if (pid.isFailed()) { - EXIT(1) << "Failed to detect a master: " << pid.failure(); + if (_master.isFailed()) { + EXIT(1) << "Failed to detect a master: " << _master.failure(); } - master = pid.get(); + if (_master.get().isSome()) { + master = UPID(_master.get().get().pid()); + } else { + master = None(); + } if (master.isSome()) { LOG(INFO) << "New master detected at " << master.get(); @@ -527,7 +531,7 @@ void Slave::detected(const Future<Option<UPID> >& pid) // Keep detecting masters. LOG(INFO) << "Detecting new master"; - detector->detect(master) + detector->detect(_master.get()) .onAny(defer(self(), &Slave::detected, lambda::_1)); } @@ -2814,7 +2818,7 @@ void Slave::__recover(const Future<Nothing>& future) } // Start detecting masters. - detector->detect(master) + detector->detect() .onAny(defer(self(), &Slave::detected, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 891c874..2ddadb4 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -206,7 +206,7 @@ public: // Invoked whenever the detector detects a change in masters. // Made public for testing purposes. - void detected(const Future<Option<UPID> >& pid); + void detected(const Future<Option<MasterInfo> >& pid); enum State { RECOVERING, // Slave is doing recovery. http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/tests/balloon_framework_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/balloon_framework_test.sh b/src/tests/balloon_framework_test.sh index bf92cd8..e7bf4e6 100755 --- a/src/tests/balloon_framework_test.sh +++ b/src/tests/balloon_framework_test.sh @@ -104,7 +104,7 @@ if [[ ${STATUS} -ne 0 ]]; then fi # The main event! -${BALLOON_FRAMEWORK} localhost:5432 1024 +${BALLOON_FRAMEWORK} 127.0.0.1:5432 1024 STATUS=${?} # Make sure the balloon framework "failed". http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index 065976c..1334800 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -282,7 +282,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( if (url.isNone()) { // This means we are using the StandaloneMasterDetector. dynamic_cast<StandaloneMasterDetector*>(master.detector)->appoint( - master.master->self()); + master.master->info()); } process::PID<master::Master> pid = process::spawn(master.master); @@ -342,7 +342,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start( if (url.isNone()) { // This means we are using the StandaloneMasterDetector. dynamic_cast<StandaloneMasterDetector*>(master.detector)->appoint( - master.master->self()); + master.master->info()); } process::PID<master::Master> pid = process::spawn(master.master); http://git-wip-us.apache.org/repos/asf/mesos/blob/26b156e3/src/tests/master_contender_detector_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp index 5223200..b42574b 100644 --- a/src/tests/master_contender_detector_tests.cpp +++ b/src/tests/master_contender_detector_tests.cpp @@ -41,6 +41,8 @@ #include <stout/path.hpp> #include <stout/try.hpp> +#include "common/protobuf_utils.hpp" + #include "master/contender.hpp" #include "master/detector.hpp" #include "master/master.hpp" @@ -80,19 +82,6 @@ using testing::AtMost; using testing::Return; -// Helper function that creates a MasterInfo from PID<Master>. -static MasterInfo createMasterInfo(const PID<Master>& master) -{ - MasterInfo masterInfo; - masterInfo.set_id(UUID::random().toString()); - masterInfo.set_ip(master.ip); - masterInfo.set_port(master.port); - masterInfo.set_pid(master); - - return masterInfo; -} - - class MasterContenderDetectorTest : public MesosTest {}; @@ -146,7 +135,7 @@ TEST(BasicMasterContenderDetectorTest, Contender) MasterContender* contender = new StandaloneMasterContender(); - contender->initialize(createMasterInfo(master)); + contender->initialize(internal::protobuf::createMasterInfo(master)); Future<Future<Nothing> > contended = contender->contend(); AWAIT_READY(contended); @@ -171,7 +160,7 @@ TEST(BasicMasterContenderDetectorTest, Detector) StandaloneMasterDetector detector; - Future<Option<UPID> > detected = detector.detect(); + Future<Option<MasterInfo> > detected = detector.detect(); // No one has appointed the leader so we are pending. EXPECT_TRUE(detected.isPending()); @@ -199,17 +188,18 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender) ZooKeeperMasterContender* contender = new ZooKeeperMasterContender(group); - PID<Master> master; - master.ip = 10000000; - master.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo master = internal::protobuf::createMasterInfo(pid); - contender->initialize(createMasterInfo(master)); + contender->initialize(master); Future<Future<Nothing> > contended = contender->contend(); AWAIT_READY(contended); ZooKeeperMasterDetector detector(url.get()); - Future<Option<UPID> > leader = detector.detect(); + Future<Option<MasterInfo> > leader = detector.detect(); EXPECT_SOME_EQ(master, leader.get()); Future<Nothing> lostCandidacy = contended.get(); leader = detector.detect(leader.get()); @@ -237,11 +227,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection) ZooKeeperMasterContender contender(url.get()); - PID<Master> master; - master.ip = 10000000; - master.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo master = internal::protobuf::createMasterInfo(pid); - contender.initialize(createMasterInfo(master)); + contender.initialize(master); // Drop Group::join so that 'contended' will stay pending. Future<Nothing> join = DROP_DISPATCH(_, &GroupProcess::join); @@ -290,34 +281,36 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders) ZooKeeperMasterContender* contender1 = new ZooKeeperMasterContender(url.get()); - PID<Master> master1; - master1.ip = 10000000; - master1.port = 10000; + PID<Master> pid1; + pid1.ip = 10000000; + pid1.port = 10000; + MasterInfo master1 = internal::protobuf::createMasterInfo(pid1); - contender1->initialize(createMasterInfo(master1)); + contender1->initialize(master1); Future<Future<Nothing> > contended1 = contender1->contend(); AWAIT_READY(contended1); ZooKeeperMasterDetector detector1(url.get()); - Future<Option<UPID> > leader1 = detector1.detect(); + Future<Option<MasterInfo> > leader1 = detector1.detect(); AWAIT_READY(leader1); EXPECT_SOME_EQ(master1, leader1.get()); ZooKeeperMasterContender contender2(url.get()); - PID<Master> master2; - master2.ip = 10000001; - master2.port = 10001; + PID<Master> pid2; + pid2.ip = 10000001; + pid2.port = 10001; + MasterInfo master2 = internal::protobuf::createMasterInfo(pid2); - contender2.initialize(createMasterInfo(master2)); + contender2.initialize(master2); Future<Future<Nothing> > contended2 = contender2.contend(); AWAIT_READY(contended2); ZooKeeperMasterDetector detector2(url.get()); - Future<Option<UPID> > leader2 = detector2.detect(); + Future<Option<MasterInfo> > leader2 = detector2.detect(); AWAIT_READY(leader2); EXPECT_SOME_EQ(master1, leader2.get()); @@ -326,7 +319,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders) // Destroying detector1 (below) causes leadership change. delete contender1; - Future<Option<UPID> > leader3 = detector2.detect(master1); + Future<Option<MasterInfo> > leader3 = detector2.detect(master1); AWAIT_READY(leader3); EXPECT_SOME_EQ(master2, leader3.get()); } @@ -345,9 +338,10 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors) zookeeper::Authentication("digest", "member:member")); AWAIT_READY(group1.join("data")); - PID<Master> master; - master.ip = 10000000; - master.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo master = internal::protobuf::createMasterInfo(pid); // group2's password is wrong and operations on it will fail. Owned<zookeeper::Group> group2(new Group( @@ -356,7 +350,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors) "/mesos", zookeeper::Authentication("digest", "member:wrongpass"))); ZooKeeperMasterContender contender(group2); - contender.initialize(createMasterInfo(master)); + contender.initialize(master); // Fails due to authentication error. AWAIT_FAILED(contender.contend()); @@ -408,11 +402,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork) ZooKeeperMasterContender contender(url.get()); - PID<Master> master; - master.ip = 10000000; - master.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo master = internal::protobuf::createMasterInfo(pid); - contender.initialize(createMasterInfo(master)); + contender.initialize(master); Future<Future<Nothing> > contended = contender.contend(); AWAIT_READY(contended); @@ -420,7 +415,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork) ZooKeeperMasterDetector detector(url.get()); - Future<Option<UPID> > leader = detector.detect(); + Future<Option<MasterInfo> > leader = detector.detect(); AWAIT_READY(leader); EXPECT_SOME_EQ(master, leader.get()); @@ -483,11 +478,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession) // 1. Simulate a leading contender. ZooKeeperMasterContender leaderContender(leaderGroup); - PID<Master> leader; - leader.ip = 10000000; - leader.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo leader = internal::protobuf::createMasterInfo(pid); - leaderContender.initialize(createMasterInfo(leader)); + leaderContender.initialize(leader); Future<Future<Nothing> > contended = leaderContender.contend(); AWAIT_READY(contended); @@ -495,7 +491,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession) ZooKeeperMasterDetector leaderDetector(leaderGroup); - Future<Option<UPID> > detected = leaderDetector.detect(); + Future<Option<MasterInfo> > detected = leaderDetector.detect(); AWAIT_READY(detected); EXPECT_SOME_EQ(leader, detected.get()); @@ -503,11 +499,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession) Owned<zookeeper::Group> followerGroup(new Group(url.get(), sessionTimeout)); ZooKeeperMasterContender followerContender(followerGroup); - PID<Master> follower; - follower.ip = 10000001; - follower.port = 10001; + PID<Master> pid2; + pid2.ip = 10000001; + pid2.port = 10001; + MasterInfo follower = internal::protobuf::createMasterInfo(pid2); - followerContender.initialize(createMasterInfo(follower)); + followerContender.initialize(follower); contended = followerContender.contend(); AWAIT_READY(contended); @@ -548,9 +545,11 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession) AWAIT_READY(nonContenderReconnecting); // Now the detectors re-detect. - Future<Option<UPID> > leaderDetected = leaderDetector.detect(leader); - Future<Option<UPID> > followerDetected = followerDetector.detect(leader); - Future<Option<UPID> > nonContenderDetected = + Future<Option<MasterInfo> > leaderDetected = + leaderDetector.detect(leader); + Future<Option<MasterInfo> > followerDetected = + followerDetector.detect(leader); + Future<Option<MasterInfo> > nonContenderDetected = nonContenderDetector.detect(leader); Clock::pause(); @@ -589,9 +588,10 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ASSERT_SOME(url); - PID<Master> leader; - leader.ip = 10000000; - leader.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo leader = internal::protobuf::createMasterInfo(pid); // Create the group instance so we can expire its session. Owned<zookeeper::Group> group( @@ -599,7 +599,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ZooKeeperMasterContender leaderContender(group); - leaderContender.initialize(createMasterInfo(leader)); + leaderContender.initialize(leader); Future<Future<Nothing> > leaderContended = leaderContender.contend(); AWAIT_READY(leaderContended); @@ -608,22 +608,23 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ZooKeeperMasterDetector leaderDetector(url.get()); - Future<Option<UPID> > detected = leaderDetector.detect(); + Future<Option<MasterInfo> > detected = leaderDetector.detect(); AWAIT_READY(detected); EXPECT_SOME_EQ(leader, detected.get()); // Keep detecting. - Future<Option<UPID> > newLeaderDetected = + Future<Option<MasterInfo> > newLeaderDetected = leaderDetector.detect(detected.get()); // Simulate a following master. - PID<Master> follower; - follower.ip = 10000001; - follower.port = 10001; + PID<Master> pid2; + pid2.ip = 10000001; + pid2.port = 10001; + MasterInfo follower = internal::protobuf::createMasterInfo(pid2); ZooKeeperMasterDetector followerDetector(url.get()); ZooKeeperMasterContender followerContender(url.get()); - followerContender.initialize(createMasterInfo(follower)); + followerContender.initialize(follower); Future<Future<Nothing> > followerContended = followerContender.contend(); AWAIT_READY(followerContended); @@ -663,12 +664,13 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession) ASSERT_SOME(url); - PID<Master> master; - master.ip = 10000000; - master.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo master = internal::protobuf::createMasterInfo(pid); ZooKeeperMasterContender masterContender(url.get()); - masterContender.initialize(createMasterInfo(master)); + masterContender.initialize(master); Future<Future<Nothing> > leaderContended = masterContender.contend(); AWAIT_READY(leaderContended); @@ -679,7 +681,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession) ZooKeeperMasterDetector slaveDetector(group); - Future<Option<UPID> > detected = slaveDetector.detect(); + Future<Option<MasterInfo> > detected = slaveDetector.detect(); AWAIT_READY(detected); EXPECT_SOME_EQ(master, detected.get()); @@ -723,16 +725,17 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ZooKeeperMasterContender leaderContender(leaderGroup); ZooKeeperMasterDetector leaderDetector(leaderGroup); - PID<Master> leader; - leader.ip = 10000000; - leader.port = 10000; + PID<Master> pid; + pid.ip = 10000000; + pid.port = 10000; + MasterInfo leader = internal::protobuf::createMasterInfo(pid); - leaderContender.initialize(createMasterInfo(leader)); + leaderContender.initialize(leader); Future<Future<Nothing> > contended = leaderContender.contend(); AWAIT_READY(contended); - Future<Option<UPID> > detected = leaderDetector.detect(None()); + Future<Option<MasterInfo> > detected = leaderDetector.detect(None()); AWAIT_READY(detected); EXPECT_SOME_EQ(leader, detected.get()); @@ -742,11 +745,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ZooKeeperMasterContender followerContender(followerGroup); ZooKeeperMasterDetector followerDetector(followerGroup); - PID<Master> follower; - follower.ip = 10000001; - follower.port = 10001; + PID<Master> pid2; + pid2.ip = 10000001; + pid2.port = 10001; + MasterInfo follower = internal::protobuf::createMasterInfo(pid2); - followerContender.initialize(createMasterInfo(follower)); + followerContender.initialize(follower); contended = followerContender.contend(); AWAIT_READY(contended);