This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5ed30db48785007e35805886a024ebb8a61a7037
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Thu Aug 20 19:27:02 2020 -0700

    Added the CSI server to the Mesos agent.
    
    This patch adds a CSI server to the Mesos agent in both
    the agent binary and in tests.
    
    Review: https://reviews.apache.org/r/72761/
---
 src/local/local.cpp      |   1 +
 src/slave/main.cpp       | 101 ++++++++++++++++++++++++++-----------
 src/slave/slave.cpp      |  18 +++++++
 src/slave/slave.hpp      |   3 ++
 src/tests/cluster.cpp    | 128 ++++++++++++++++++++++++++++++++++-------------
 src/tests/cluster.hpp    |   3 ++
 src/tests/mesos.cpp      |   1 +
 src/tests/mesos.hpp      |   9 ++++
 src/tests/mock_slave.cpp |   7 +++
 src/tests/mock_slave.hpp |   3 ++
 10 files changed, 208 insertions(+), 66 deletions(-)

diff --git a/src/local/local.cpp b/src/local/local.cpp
index 8950570..9535399 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -535,6 +535,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
         secretGenerators->back(),
         nullptr,
         nullptr,
+        nullptr,
 #ifndef __WINDOWS__
         None(),
 #endif // __WINDOWS__
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 0aa2cc9..84b813c 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -37,6 +37,8 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/check.hpp>
 #include <stout/flags.hpp>
 #include <stout/hashset.hpp>
@@ -84,6 +86,7 @@
 #include "module/manager.hpp"
 
 #include "slave/constants.hpp"
+#include "slave/csi_server.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
 #include "slave/task_status_update_manager.hpp"
@@ -111,6 +114,8 @@ using mesos::Authorizer;
 using mesos::SecretResolver;
 using mesos::SlaveInfo;
 
+using net::IP;
+
 using process::Owned;
 
 using process::firewall::DisabledEndpointsFirewallRule;
@@ -528,6 +533,69 @@ int main(int argc, char** argv)
                        << futureTracker.error();
   }
 
+  SecretGenerator* secretGenerator = nullptr;
+
+#ifdef USE_SSL_SOCKET
+  if (flags.jwt_secret_key.isSome()) {
+    Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get());
+    if (jwtSecretKey.isError()) {
+      EXIT(EXIT_FAILURE) << "Failed to read the file specified by "
+                         << "--jwt_secret_key";
+    }
+
+    // TODO(greggomann): Factor the following code out into a common helper,
+    // since we also do this when loading credentials.
+    Try<os::Permissions> permissions =
+      os::permissions(flags.jwt_secret_key.get());
+    if (permissions.isError()) {
+      LOG(WARNING) << "Failed to stat jwt secret key file '"
+                   << flags.jwt_secret_key.get()
+                   << "': " << permissions.error();
+    } else if (permissions->others.rwx) {
+      LOG(WARNING) << "Permissions on executor secret key file '"
+                   << flags.jwt_secret_key.get()
+                   << "' are too open; it is recommended that your"
+                   << " key file is NOT accessible by others";
+    }
+
+    secretGenerator = new JWTSecretGenerator(jwtSecretKey.get());
+  }
+#endif // USE_SSL_SOCKET
+
+  // The agent will hold ownership of the CSI server, but we also pass a pointer
+  // to it into the containerizer for use by the 'volume/csi' isolator.
+  Owned<CSIServer> csiServer;
+
+  if (flags.csi_plugin_config_dir.isSome()) {
+    // Initialize the CSI server, which manages any configured CSI plugins.
+    string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+    if (process::network::openssl::flags().enabled) {
+      scheme = "https";
+    }
+#endif
+
+    const process::http::URL agentUrl(
+        scheme,
+        process::address().ip,
+        process::address().port,
+        id + "/api/v1");
+
+    Try<Owned<CSIServer>> csiServer_ = CSIServer::create(
+        flags,
+        agentUrl,
+        secretGenerator,
+        secretResolver.get());
+
+    if (csiServer_.isError()) {
+      EXIT(EXIT_FAILURE)
+        << "Failed to initialize the CSI server: " << csiServer_.error();
+    }
+
+    csiServer = std::move(csiServer_.get());
+  }
+
   Try<Containerizer*> containerizer = Containerizer::create(
       flags,
       false,
@@ -535,7 +603,8 @@ int main(int argc, char** argv)
       gc,
       secretResolver.get(),
       volumeGidManager,
-      futureTracker.get());
+      futureTracker.get(),
+      csiServer.get());
 
   if (containerizer.isError()) {
     EXIT(EXIT_FAILURE)
@@ -608,35 +677,6 @@ int main(int argc, char** argv)
                        << qosController.error();
   }
 
-  SecretGenerator* secretGenerator = nullptr;
-
-#ifdef USE_SSL_SOCKET
-  if (flags.jwt_secret_key.isSome()) {
-    Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get());
-    if (jwtSecretKey.isError()) {
-      EXIT(EXIT_FAILURE) << "Failed to read the file specified by "
-                         << "--jwt_secret_key";
-    }
-
-    // TODO(greggomann): Factor the following code out into a common helper,
-    // since we also do this when loading credentials.
-    Try<os::Permissions> permissions =
-      os::permissions(flags.jwt_secret_key.get());
-    if (permissions.isError()) {
-      LOG(WARNING) << "Failed to stat jwt secret key file '"
-                   << flags.jwt_secret_key.get()
-                   << "': " << permissions.error();
-    } else if (permissions->others.rwx) {
-      LOG(WARNING) << "Permissions on executor secret key file '"
-                   << flags.jwt_secret_key.get()
-                   << "' are too open; it is recommended that your"
-                   << " key file is NOT accessible by others";
-    }
-
-    secretGenerator = new JWTSecretGenerator(jwtSecretKey.get());
-  }
-#endif // USE_SSL_SOCKET
-
 #ifndef __WINDOWS__
   // Create executor domain socket if the user so desires.
   Option<Socket> executorSocket = None();
@@ -723,6 +763,7 @@ int main(int argc, char** argv)
       secretGenerator,
       volumeGidManager,
       futureTracker.get(),
+      std::move(csiServer),
 #ifndef __WINDOWS__
       executorSocket,
 #endif // __WINDOWS__
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c828d99..a69937b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -205,6 +205,7 @@ Slave::Slave(const string& id,
              SecretGenerator* _secretGenerator,
              VolumeGidManager* _volumeGidManager,
              PendingFutureTracker* _futureTracker,
+             Owned<CSIServer>&& _csiServer,
 #ifndef __WINDOWS__
              const Option<process::network::unix::Socket>& _executorSocket,
 #endif // __WINDOWS__
@@ -239,6 +240,7 @@ Slave::Slave(const string& id,
     secretGenerator(_secretGenerator),
     volumeGidManager(_volumeGidManager),
     futureTracker(_futureTracker),
+    csiServer(std::move(_csiServer)),
 #ifndef __WINDOWS__
     executorSocket(_executorSocket),
 #endif // __WINDOWS__
@@ -1741,6 +1743,14 @@ void Slave::registered(
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
 
+      if (csiServer.get()) {
+        csiServer->start(info.id())
+          .onFailed([=](const string& failure) {
+            EXIT(EXIT_FAILURE)
+              << "CSI server initialization failed: " << failure;
+          });
+      }
+
       // Setup a timer so that the agent attempts to reregister if it
       // doesn't receive a ping from the master for an extended period
       // of time. This needs to be done once registered, in case we
@@ -1826,6 +1836,14 @@ void Slave::reregistered(
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
 
+      if (csiServer.get()) {
+        csiServer->start(info.id())
+          .onFailed([=](const string& failure) {
+            EXIT(EXIT_FAILURE)
+              << "CSI server initialization failed: " << failure;
+          });
+      }
+
       // Setup a timer so that the agent attempts to reregister if it
       // doesn't receive a ping from the master for an extended period
       // of time. This needs to be done once reregistered, in case we
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2cf45c6..7946668 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -130,6 +130,7 @@ public:
         mesos::SecretGenerator* secretGenerator,
         VolumeGidManager* volumeGidManager,
         PendingFutureTracker* futureTracker,
+        process::Owned<CSIServer>&& csiServer,
 #ifndef __WINDOWS__
         const Option<process::network::unix::Socket>& executorSocket,
 #endif // __WINDOWS__
@@ -888,6 +889,8 @@ private:
 
   PendingFutureTracker* futureTracker;
 
+  process::Owned<CSIServer> csiServer;
+
 #ifndef __WINDOWS__
   Option<process::network::unix::Socket> executorSocket;
 #endif // __WINDOWS__
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index ab4cea4..3c86855 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -16,6 +16,7 @@
 
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <mesos/mesos.hpp>
@@ -48,12 +49,15 @@
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/gtest.hpp>
 #include <stout/none.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
+#include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
@@ -94,6 +98,7 @@
 #include "master/detector/standalone.hpp"
 #include "master/detector/zookeeper.hpp"
 
+#include "slave/csi_server.hpp"
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
@@ -124,6 +129,10 @@ using mesos::master::detector::ZooKeeperMasterDetector;
 
 using mesos::slave::ContainerTermination;
 
+using net::IP;
+
+using process::Owned;
+
 #ifndef __WINDOWS__
 using process::network::unix::Socket;
 #endif // __WINDOWS__
@@ -426,6 +435,7 @@ Try<process::Owned<Slave>> Slave::create(
     const Option<mesos::SecretGenerator*>& secretGenerator,
     const Option<Authorizer*>& providedAuthorizer,
     const Option<PendingFutureTracker*>& futureTracker,
+    const Option<Owned<slave::CSIServer>>& csiServer,
     bool mock)
 {
   process::Owned<Slave> slave(new Slave());
@@ -468,6 +478,82 @@ Try<process::Owned<Slave>> Slave::create(
     slave->futureTracker.reset(_futureTracker.get());
   }
 
+  // If the secret generator is not provided, create a default one.
+  if (secretGenerator.isNone()) {
+    SecretGenerator* _secretGenerator = nullptr;
+
+#ifdef USE_SSL_SOCKET
+    if (flags.jwt_secret_key.isSome()) {
+      Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get());
+      if (jwtSecretKey.isError()) {
+        return Error("Failed to read the file specified by --jwt_secret_key");
+      }
+
+      // TODO(greggomann): Factor the following code out into a common helper,
+      // since we also do this when loading credentials.
+      Try<os::Permissions> permissions =
+        os::permissions(flags.jwt_secret_key.get());
+      if (permissions.isError()) {
+        LOG(WARNING) << "Failed to stat jwt secret key file '"
+                     << flags.jwt_secret_key.get()
+                     << "': " << permissions.error();
+      } else if (permissions->others.rwx) {
+        LOG(WARNING) << "Permissions on executor secret key file '"
+                     << flags.jwt_secret_key.get()
+                     << "' are too open; it is recommended that your"
+                     << " key file is NOT accessible by others";
+      }
+
+      _secretGenerator = new JWTSecretGenerator(jwtSecretKey.get());
+    }
+#endif // USE_SSL_SOCKET
+
+    slave->secretGenerator.reset(_secretGenerator);
+  }
+
+  // Create a SecretResolver for use with the CSI server below.
+  Try<SecretResolver*> secretResolver =
+    mesos::SecretResolver::create(flags.secret_resolver);
+
+  if (secretResolver.isError()) {
+    return Error(
+        "Failed to initialize secret resolver: " +
+        secretResolver.error());
+  }
+
+  const string processId =
+    id.isSome() ? id.get() : process::ID::generate("slave");
+
+  if (csiServer.isNone() && flags.csi_plugin_config_dir.isSome()) {
+    // Initialize the CSI server, which manages any configured CSI plugins.
+    string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+    if (process::network::openssl::flags().enabled) {
+      scheme = "https";
+    }
+#endif
+
+    const process::http::URL agentUrl(
+        scheme,
+        process::address().ip,
+        flags.port,
+        processId + "/api/v1");
+
+    Try<Owned<slave::CSIServer>> _csiServer = slave::CSIServer::create(
+        flags,
+        agentUrl,
+        secretGenerator.getOrElse(slave->secretGenerator.get()),
+        secretResolver.get());
+
+    if (_csiServer.isError()) {
+      return Error(
+          "Failed to initialize the CSI server: " + _csiServer.error());
+    }
+
+    slave->csiServer = std::move(_csiServer.get());
+  }
+
   // If the containerizer is not provided, create a default one.
   if (containerizer.isSome()) {
     slave->containerizer = containerizer.get();
@@ -483,7 +569,8 @@ Try<process::Owned<Slave>> Slave::create(
           gc.getOrElse(slave->gc.get()),
           nullptr,
           volumeGidManager,
-          futureTracker.getOrElse(slave->futureTracker.get()));
+          futureTracker.getOrElse(slave->futureTracker.get()),
+          (csiServer.getOrElse(slave->csiServer)).get());
 
     if (_containerizer.isError()) {
       return Error("Failed to create containerizer: " + _containerizer.error());
@@ -583,39 +670,6 @@ Try<process::Owned<Slave>> Slave::create(
     slave->qosController.reset(_qosController.get());
   }
 
-  // If the secret generator is not provided, create a default one.
-  if (secretGenerator.isNone()) {
-    SecretGenerator* _secretGenerator = nullptr;
-
-#ifdef USE_SSL_SOCKET
-    if (flags.jwt_secret_key.isSome()) {
-      Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get());
-      if (jwtSecretKey.isError()) {
-        return Error("Failed to read the file specified by --jwt_secret_key");
-      }
-
-      // TODO(greggomann): Factor the following code out into a common helper,
-      // since we also do this when loading credentials.
-      Try<os::Permissions> permissions =
-        os::permissions(flags.jwt_secret_key.get());
-      if (permissions.isError()) {
-        LOG(WARNING) << "Failed to stat jwt secret key file '"
-                     << flags.jwt_secret_key.get()
-                     << "': " << permissions.error();
-      } else if (permissions->others.rwx) {
-        LOG(WARNING) << "Permissions on executor secret key file '"
-                     << flags.jwt_secret_key.get()
-                     << "' are too open; it is recommended that your"
-                     << " key file is NOT accessible by others";
-      }
-
-      _secretGenerator = new JWTSecretGenerator(jwtSecretKey.get());
-    }
-#endif // USE_SSL_SOCKET
-
-    slave->secretGenerator.reset(_secretGenerator);
-  }
-
 #ifndef __WINDOWS__
   Option<Socket> executorSocket = None();
   if (flags.http_executor_domain_sockets) {
@@ -645,7 +699,7 @@ Try<process::Owned<Slave>> Slave::create(
   // Inject all the dependencies.
   if (mock) {
     slave->slave.reset(new MockSlave(
-        id.isSome() ? id.get() : process::ID::generate("slave"),
+        processId,
         flags,
         detector,
         slave->containerizer,
@@ -657,10 +711,11 @@ Try<process::Owned<Slave>> Slave::create(
         secretGenerator.getOrElse(slave->secretGenerator.get()),
         volumeGidManager,
         futureTracker.getOrElse(slave->futureTracker.get()),
+        csiServer.getOrElse(slave->csiServer),
         authorizer));
   } else {
     slave->slave.reset(new slave::Slave(
-        id.isSome() ? id.get() : process::ID::generate("slave"),
+        processId,
         flags,
         detector,
         slave->containerizer,
@@ -672,6 +727,7 @@ Try<process::Owned<Slave>> Slave::create(
         secretGenerator.getOrElse(slave->secretGenerator.get()),
         volumeGidManager,
         futureTracker.getOrElse(slave->futureTracker.get()),
+        csiServer.getOrElse(slave->csiServer),
 #ifndef __WINDOWS__
         executorSocket,
 #endif // __WINDOWS__
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 415a60f..b36b94f 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -57,6 +57,7 @@
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
+#include "slave/csi_server.hpp"
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
@@ -171,6 +172,7 @@ public:
       const Option<mesos::SecretGenerator*>& secretGenerator = None(),
       const Option<Authorizer*>& authorizer = None(),
       const Option<PendingFutureTracker*>& futureTracker = None(),
+      const Option<process::Owned<slave::CSIServer>>& csiServer = None(),
       bool mock = false);
 
   ~Slave();
@@ -241,6 +243,7 @@ private:
   process::Owned<mesos::slave::ResourceEstimator> resourceEstimator;
   process::Owned<mesos::SecretGenerator> secretGenerator;
   process::Owned<slave::TaskStatusUpdateManager> taskStatusUpdateManager;
+  process::Owned<slave::CSIServer> csiServer;
 
   // Indicates whether or not authorization callbacks were set when this agent
   // was constructed.
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d6933a6..31b1629 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -348,6 +348,7 @@ Try<Owned<cluster::Slave>> MesosTest::StartSlave(const SlaveOptions& options)
       options.secretGenerator,
       options.authorizer,
       options.futureTracker,
+      options.csiServer,
       options.mock);
 
   if (slave.isSome() && !options.mock) {
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 0ad0999..8f89d7c 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gmock/gmock.h>
@@ -82,6 +83,7 @@
 #include "resource_provider/detector.hpp"
 
 #include "slave/constants.hpp"
+#include "slave/csi_server.hpp"
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
@@ -193,6 +195,12 @@ struct SlaveOptions
     return *this;
   }
 
+  SlaveOptions& withCsiServer(const process::Owned<slave::CSIServer>& csiServer)
+  {
+    this->csiServer = csiServer;
+    return *this;
+  }
+
   mesos::master::detector::MasterDetector* detector;
   bool mock;
   Option<slave::Flags> flags;
@@ -205,6 +213,7 @@ struct SlaveOptions
   Option<mesos::SecretGenerator*> secretGenerator;
   Option<Authorizer*> authorizer;
   Option<PendingFutureTracker*> futureTracker;
+  Option<process::Owned<slave::CSIServer>> csiServer;
 };
 
 
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index fa2a0f5..1d03b3c 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <utility>
+
 #include <gmock/gmock.h>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -22,10 +24,12 @@
 #include <mesos/slave/resource_estimator.hpp>
 
 #include <process/future.hpp>
+#include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/option.hpp>
 
+#include "slave/csi_server.hpp"
 #include "slave/slave.hpp"
 #include "slave/task_status_update_manager.hpp"
 
@@ -47,6 +51,7 @@ using std::string;
 using std::vector;
 
 using process::Future;
+using process::Owned;
 using process::UPID;
 
 using testing::_;
@@ -104,6 +109,7 @@ MockSlave::MockSlave(
     SecretGenerator* secretGenerator,
     VolumeGidManager* volumeGidManager,
     PendingFutureTracker* futureTracker,
+    Owned<slave::CSIServer>&& csiServer,
     const Option<Authorizer*>& authorizer)
   // It is necessary to explicitly call `ProcessBase` constructor here even
   // though the direct parent `Slave` already does this. This is because
@@ -124,6 +130,7 @@ MockSlave::MockSlave(
         secretGenerator,
         volumeGidManager,
         futureTracker,
+        std::move(csiServer),
 #ifndef __WINDOWS__
         None(),
 #endif // __WINDOWS__
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 58daefa..cd13be2 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -31,6 +31,7 @@
 #include <mesos/slave/resource_estimator.hpp>
 
 #include <process/future.hpp>
+#include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/duration.hpp>
@@ -40,6 +41,7 @@
 
 #include "messages/messages.hpp"
 
+#include "slave/csi_server.hpp"
 #include "slave/slave.hpp"
 
 using ::testing::_;
@@ -101,6 +103,7 @@ public:
       SecretGenerator* secretGenerator,
       slave::VolumeGidManager* volumeGidManager,
       PendingFutureTracker* futureTracker,
+      process::Owned<slave::CSIServer>&& csiServer,
       const Option<Authorizer*>& authorizer);
 
   MOCK_METHOD6(___run, void(

Reply via email to