Added a storage local resource provider test for CSI plugin restart. The test performs the same steps as the `PublishResources` test, but it kills the CSI plugin container between operations.
Review: https://reviews.apache.org/r/64998/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69e5e28e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69e5e28e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69e5e28e Branch: refs/heads/master Commit: 69e5e28ed0756f94c839a453052d268696d66a33 Parents: 0e4d6f2 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Fri Jan 19 15:36:27 2018 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Fri Jan 19 15:50:08 2018 -0800 ---------------------------------------------------------------------- src/Makefile.am | 1 + src/slave/container_daemon.cpp | 47 +-- src/slave/container_daemon_process.hpp | 82 ++++++ .../storage_local_resource_provider_tests.cpp | 290 +++++++++++++++++++ 4 files changed, 375 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 191594b..fe8f689 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1190,6 +1190,7 @@ libmesos_no_3rdparty_la_SOURCES += \ slave/compatibility.hpp \ slave/constants.hpp \ slave/container_daemon.hpp \ + slave/container_daemon_process.hpp \ slave/flags.hpp \ slave/gc.hpp \ slave/gc_process.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon.cpp ---------------------------------------------------------------------- diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp index d74fa51..6458d1f 100644 --- a/src/slave/container_daemon.cpp +++ b/src/slave/container_daemon.cpp @@ -16,20 +16,17 @@ #include "slave/container_daemon.hpp" -#include <mesos/agent/agent.hpp> - #include <process/defer.hpp> #include <process/id.hpp> -#include <process/process.hpp> #include <stout/lambda.hpp> 
#include <stout/stringify.hpp> #include <stout/unreachable.hpp> -#include "common/http.hpp" - #include "internal/evolve.hpp" +#include "slave/container_daemon_process.hpp" + namespace http = process::http; using std::string; @@ -64,46 +61,6 @@ static inline http::Headers getAuthHeader(const Option<string>& authToken) } -class ContainerDaemonProcess : public Process<ContainerDaemonProcess> -{ -public: - explicit ContainerDaemonProcess( - const http::URL& _agentUrl, - const Option<string>& _authToken, - const ContainerID& containerId, - const Option<CommandInfo>& commandInfo, - const Option<Resources>& resources, - const Option<ContainerInfo>& containerInfo, - const Option<std::function<Future<Nothing>()>>& _postStartHook, - const Option<std::function<Future<Nothing>()>>& _postStopHook); - - ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete; - - ContainerDaemonProcess& operator=( - const ContainerDaemonProcess& other) = delete; - - Future<Nothing> wait(); - -protected: - void initialize() override; - -private: - void launchContainer(); - void waitContainer(); - - const http::URL agentUrl; - const Option<string> authToken; - const ContentType contentType; - const Option<std::function<Future<Nothing>()>> postStartHook; - const Option<std::function<Future<Nothing>()>> postStopHook; - - Call launchCall; - Call waitCall; - - Promise<Nothing> terminated; -}; - - ContainerDaemonProcess::ContainerDaemonProcess( const http::URL& _agentUrl, const Option<string>& _authToken, http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon_process.hpp ---------------------------------------------------------------------- diff --git a/src/slave/container_daemon_process.hpp b/src/slave/container_daemon_process.hpp new file mode 100644 index 0000000..a5d19a0 --- /dev/null +++ b/src/slave/container_daemon_process.hpp @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__ +#define __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__ + +#include <functional> +#include <string> + +#include <mesos/mesos.hpp> +#include <mesos/agent/agent.hpp> + +#include <process/future.hpp> +#include <process/http.hpp> +#include <process/process.hpp> + +#include <stout/option.hpp> + +#include "common/http.hpp" + +namespace mesos { +namespace internal { +namespace slave { + +class ContainerDaemonProcess : public process::Process<ContainerDaemonProcess> +{ +public: + explicit ContainerDaemonProcess( + const process::http::URL& _agentUrl, + const Option<std::string>& _authToken, + const ContainerID& containerId, + const Option<CommandInfo>& commandInfo, + const Option<Resources>& resources, + const Option<ContainerInfo>& containerInfo, + const Option<std::function<process::Future<Nothing>()>>& _postStartHook, + const Option<std::function<process::Future<Nothing>()>>& _postStopHook); + + ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete; + + ContainerDaemonProcess& operator=( + const ContainerDaemonProcess& other) = delete; + + process::Future<Nothing> wait(); + + // Made public for testing purpose. 
+ void launchContainer(); + void waitContainer(); + +protected: + void initialize() override; + +private: + const process::http::URL agentUrl; + const Option<std::string> authToken; + const ContentType contentType; + const Option<std::function<process::Future<Nothing>()>> postStartHook; + const Option<std::function<process::Future<Nothing>()>> postStopHook; + + agent::Call launchCall; + agent::Call waitCall; + + process::Promise<Nothing> terminated; +}; + +} // namespace slave { +} // namespace internal { +} // namespace mesos { + +#endif // __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index dfe4faf..1b21527 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -24,6 +24,12 @@ #include "module/manager.hpp" +#include "slave/container_daemon_process.hpp" + +#include "slave/containerizer/fetcher.hpp" + +#include "slave/containerizer/mesos/containerizer.hpp" + #include "tests/flags.hpp" #include "tests/mesos.hpp" @@ -31,6 +37,8 @@ using std::shared_ptr; using std::string; using std::vector; +using mesos::internal::slave::ContainerDaemonProcess; + using mesos::master::detector::MasterDetector; using mesos::v1::resource_provider::Call; @@ -1449,6 +1457,288 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) // This test verifies that the storage local resource provider can +// restart its CSI plugin after it is killed and continue to work +// properly. 
+TEST_F( + StorageLocalResourceProviderTest, + ROOT_PublishUnpublishResourcesPluginKilled) +{ + loadUriDiskProfileModule(); + + setupResourceProviderConfig(Gigabytes(4)); + setupDiskProfileConfig(); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.allocation_interval = Milliseconds(50); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.isolation = "filesystem/linux"; + + // Disable HTTP authentication to simplify resource provider interactions. + slaveFlags.authenticate_http_readwrite = false; + + // Set the resource provider capability. + vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES(); + SlaveInfo::Capability capability; + capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER); + capabilities.push_back(capability); + + slaveFlags.agent_features = SlaveCapabilities(); + slaveFlags.agent_features->mutable_capabilities()->CopyFrom( + {capabilities.begin(), capabilities.end()}); + + slaveFlags.resource_provider_config_dir = resourceProviderConfigDir; + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + slave::Fetcher fetcher(slaveFlags); + + Try<slave::MesosContainerizer*> _containerizer = + slave::MesosContainerizer::create(slaveFlags, false, &fetcher); + ASSERT_SOME(_containerizer); + + Owned<slave::MesosContainerizer> containerizer(_containerizer.get()); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), containerizer.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Register a framework to exercise operations. 
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + // We use the following filter so that the resources will not be + // filtered for 5 seconds (the default). + Filters acceptFilters; + acceptFilters.set_refuse_seconds(0); + + // We use the following filter to filter offers that do not have + // wanted resources for 365 days (the maximum). + Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // The framework is expected to see the following offers in sequence: + // 1. One containing a RAW disk resource before `CREATE_VOLUME`. + // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`. + // 3. One containing the same MOUNT disk resource after `CREATE`, + // `LAUNCH` and `DESTROY`. + // 4. One containing the same RAW disk resource after `DESTROY_VOLUME`. + // + // We set up the expectations for these offers as the test progresses. + Future<vector<Offer>> rawDiskOffers; + Future<vector<Offer>> volumeCreatedOffers; + Future<vector<Offer>> taskFinishedOffers; + Future<vector<Offer>> volumeDestroyedOffers; + + Sequence offers; + + // We are only interested in storage pools and volume created from + // them, which have a "volume-default" profile. + auto hasSourceType = []( + const Resource& r, + const Resource::DiskInfo::Source::Type& type) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().has_profile() && + r.disk().source().profile() == "volume-default" && + r.disk().source().type() == type; + }; + + // Decline offers that contain only the agent's default resources. 
+ EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&rawDiskOffers)); + + driver.start(); + + AWAIT_READY(rawDiskOffers); + ASSERT_FALSE(rawDiskOffers->empty()); + + Option<Resource> source; + + foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { + if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { + source = resource; + break; + } + } + + ASSERT_SOME(source); + + // Get the ID of the CSI plugin container. + Future<hashset<ContainerID>> pluginContainers = containerizer->containers(); + + AWAIT_READY(pluginContainers); + ASSERT_EQ(1u, pluginContainers->size()); + + const ContainerID& pluginContainerId = *pluginContainers->begin(); + + Future<Nothing> pluginRestarted = + FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer); + + // Kill the plugin container and wait for it to restart. + Future<int> pluginKilled = containerizer->status(pluginContainerId) + .then([](const ContainerStatus& status) { + return os::kill(status.executor_pid(), SIGKILL); + }); + + AWAIT_ASSERT_EQ(0, pluginKilled); + AWAIT_READY(pluginRestarted); + + // Create a volume. 
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&volumeCreatedOffers)); + + driver.acceptOffers( + {rawDiskOffers->at(0).id()}, + {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)}, + acceptFilters); + + AWAIT_READY(volumeCreatedOffers); + ASSERT_FALSE(volumeCreatedOffers->empty()); + + Option<Resource> volume; + + foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { + if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { + volume = resource; + break; + } + } + + ASSERT_SOME(volume); + ASSERT_TRUE(volume->disk().source().has_id()); + ASSERT_TRUE(volume->disk().source().has_metadata()); + ASSERT_TRUE(volume->disk().source().has_mount()); + ASSERT_TRUE(volume->disk().source().mount().has_root()); + EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + + // Check if the volume is actually created by the test CSI plugin. + Option<string> volumePath; + + foreach (const Label& label, volume->disk().source().metadata().labels()) { + if (label.key() == "path") { + volumePath = label.value(); + break; + } + } + + ASSERT_SOME(volumePath); + EXPECT_TRUE(os::exists(volumePath.get())); + + pluginRestarted = + FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer); + + // Kill the plugin container and wait for it to restart. + pluginKilled = containerizer->status(pluginContainerId) + .then([](const ContainerStatus& status) { + return os::kill(status.executor_pid(), SIGKILL); + }); + + AWAIT_ASSERT_EQ(0, pluginKilled); + AWAIT_READY(pluginRestarted); + + // Put a file into the volume. + ASSERT_SOME(os::touch(path::join(volumePath.get(), "file"))); + + // Create a persistent volume on the CSI volume, then launch a task to + // use the persistent volume. 
+ Resource persistentVolume = volume.get(); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + + Future<TaskStatus> taskStarting; + Future<TaskStatus> taskRunning; + Future<TaskStatus> taskFinished; + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&taskStarting)) + .WillOnce(FutureArg<1>(&taskRunning)) + .WillOnce(FutureArg<1>(&taskFinished)); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( + persistentVolume))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&taskFinishedOffers)); + + driver.acceptOffers( + {volumeCreatedOffers->at(0).id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + volumeCreatedOffers->at(0).slave_id(), + persistentVolume, + createCommandInfo("test -f " + path::join("volume", "file")))})}, + acceptFilters); + + AWAIT_READY(taskStarting); + EXPECT_EQ(TASK_STARTING, taskStarting->state()); + + AWAIT_READY(taskRunning); + EXPECT_EQ(TASK_RUNNING, taskRunning->state()); + + AWAIT_READY(taskFinished); + EXPECT_EQ(TASK_FINISHED, taskFinished->state()); + + AWAIT_READY(taskFinishedOffers); + + pluginRestarted = + FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer); + + // Kill the plugin container and wait for it to restart. + pluginKilled = containerizer->status(pluginContainerId) + .then([](const ContainerStatus& status) { + return os::kill(status.executor_pid(), SIGKILL); + }); + + AWAIT_ASSERT_EQ(0, pluginKilled); + AWAIT_READY(pluginRestarted); + + // Destroy the persistent volume and the CSI volume. 
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get()))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); + + driver.acceptOffers( + {taskFinishedOffers->at(0).id()}, + {DESTROY(persistentVolume), + DESTROY_VOLUME(volume.get())}, + acceptFilters); + + AWAIT_READY(volumeDestroyedOffers); + ASSERT_FALSE(volumeDestroyedOffers->empty()); + + // Check if the volume is actually deleted by the test CSI plugin. + EXPECT_FALSE(os::exists(volumePath.get())); +} + + +// This test verifies that the storage local resource provider can // convert pre-existing CSI volumes into mount or block volumes. TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume) {