Implemented blkio subsystem usage() for resource statistics.

Implemented blkio subsystem usage() for resource statistics.

Review: https://reviews.apache.org/r/60934/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5325188
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5325188
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5325188

Branch: refs/heads/master
Commit: d5325188689c903513f0d9fa121dfdda06fb0de3
Parents: 1b89fb0
Author: Gilbert Song <songzihao1...@gmail.com>
Authored: Mon Jul 31 15:45:40 2017 -0700
Committer: Gilbert Song <songzihao1...@gmail.com>
Committed: Mon Jul 31 15:45:40 2017 -0700

----------------------------------------------------------------------
 .../isolators/cgroups/subsystems/blkio.cpp      | 335 +++++++++++++++++++
 .../isolators/cgroups/subsystems/blkio.hpp      |   4 +
 2 files changed, 339 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d5325188/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.cpp
----------------------------------------------------------------------
diff --git 
a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.cpp 
b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.cpp
index 6be0f9e..96014b5 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.cpp
@@ -16,11 +16,25 @@
 
 #include <process/id.hpp>
 
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+
+#include "linux/cgroups.hpp"
+
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp"
 
+namespace cfq = cgroups::blkio::cfq;
+namespace throttle = cgroups::blkio::throttle;
+
+using process::Failure;
+using process::Future;
 using process::Owned;
 
 using std::string;
+using std::vector;
+
+using cgroups::blkio::Device;
+using cgroups::blkio::Operation;
 
 namespace mesos {
 namespace internal {
@@ -40,6 +54,327 @@ BlkioSubsystem::BlkioSubsystem(
   : ProcessBase(process::ID::generate("cgroups-blkio-subsystem")),
     Subsystem(_flags, _hierarchy) {}
 
+
+static void setValue(
+    const cgroups::blkio::Value& statValue,
+    CgroupInfo::Blkio::Value* value)
+{
+  if (statValue.op.isNone()) {
+    value->set_op(CgroupInfo::Blkio::UNKNOWN);
+  } else {
+    switch(statValue.op.get()) {
+      case Operation::TOTAL:
+        value->set_op(CgroupInfo::Blkio::TOTAL);
+        break;
+      case Operation::READ:
+        value->set_op(CgroupInfo::Blkio::READ);
+        break;
+      case Operation::WRITE:
+        value->set_op(CgroupInfo::Blkio::WRITE);
+        break;
+      case Operation::SYNC:
+        value->set_op(CgroupInfo::Blkio::SYNC);
+        break;
+      case Operation::ASYNC:
+        value->set_op(CgroupInfo::Blkio::ASYNC);
+        break;
+    }
+  }
+
+  value->set_value(statValue.value);
+}
+
+
+Future<ResourceStatistics> BlkioSubsystem::usage(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  hashmap<dev_t, CgroupInfo::Blkio::CFQ::Statistics> cfq;
+  hashmap<dev_t, CgroupInfo::Blkio::CFQ::Statistics> cfqRecursive;
+  hashmap<dev_t, CgroupInfo::Blkio::Throttling::Statistics> throttling;
+
+  CgroupInfo::Blkio::CFQ::Statistics totalCfq;
+  CgroupInfo::Blkio::CFQ::Statistics totalCfqRecursive;
+  CgroupInfo::Blkio::Throttling::Statistics totalThrottling;
+
+  // Get CFQ statistics.
+  Try<vector<cgroups::blkio::Value>> time = cfq::time(hierarchy, cgroup);
+  if (time.isError()) {
+    return Failure(time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& value, time.get()) {
+    if (value.device.isNone()) {
+      totalCfq.set_time(value.value);
+    } else {
+      cfq[value.device.get()].set_time(value.value);
+    }
+  }
+
+  Try<vector<cgroups::blkio::Value>> sectors = cfq::sectors(hierarchy, cgroup);
+  if (sectors.isError()) {
+    return Failure(sectors.error());
+  }
+
+  foreach (const cgroups::blkio::Value& value, sectors.get()) {
+    if (value.device.isNone()) {
+      totalCfq.set_sectors(value.value);
+    } else {
+      cfq[value.device.get()].set_sectors(value.value);
+    }
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_service_bytes =
+    cfq::io_service_bytes(hierarchy, cgroup);
+
+  if (io_service_bytes.isError()) {
+    return Failure(io_service_bytes.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_service_bytes.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_service_bytes()
+      : totalCfq.add_io_service_bytes();
+
+    setValue(statValue, value);
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_serviced =
+    cfq::io_serviced(hierarchy, cgroup);
+
+  if (io_serviced.isError()) {
+    return Failure(io_serviced.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_serviced.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_serviced()
+      : totalCfq.add_io_serviced();
+
+    setValue(statValue, value);
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_service_time =
+    cfq::io_service_time(hierarchy, cgroup);
+
+  if (io_service_time.isError()) {
+    return Failure(io_service_time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_service_time.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_service_time()
+      : totalCfq.add_io_service_time();
+
+    setValue(statValue, value);
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_wait_time =
+    cfq::io_wait_time(hierarchy, cgroup);
+
+  if (io_wait_time.isError()) {
+    return Failure(io_wait_time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_wait_time.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_wait_time()
+      : totalCfq.add_io_wait_time();
+
+    setValue(statValue, value);
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_merged =
+    cfq::io_merged(hierarchy, cgroup);
+
+  if (io_merged.isError()) {
+    return Failure(io_merged.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_merged.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_merged()
+      : totalCfq.add_io_merged();
+
+    setValue(statValue, value);
+  }
+
+  Try<vector<cgroups::blkio::Value>> io_queued =
+    cfq::io_queued(hierarchy, cgroup);
+
+  if (io_queued.isError()) {
+    return Failure(io_queued.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_queued.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfq[statValue.device.get()].add_io_queued()
+      : totalCfq.add_io_queued();
+
+    setValue(statValue, value);
+  }
+
+  // Get CFQ recursive statistics (blkio.*_recursive).
+  time = cfq::time_recursive(hierarchy, cgroup);
+  if (time.isError()) {
+    return Failure(time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& value, time.get()) {
+    if (value.device.isNone()) {
+      totalCfqRecursive.set_time(value.value);
+    } else {
+      cfqRecursive[value.device.get()].set_time(value.value);
+    }
+  }
+
+  sectors = cfq::sectors_recursive(hierarchy, cgroup);
+  if (sectors.isError()) {
+    return Failure(sectors.error());
+  }
+
+  foreach (const cgroups::blkio::Value& value, sectors.get()) {
+    if (value.device.isNone()) {
+      totalCfqRecursive.set_sectors(value.value);
+    } else {
+      cfqRecursive[value.device.get()].set_sectors(value.value);
+    }
+  }
+
+  io_service_bytes = cfq::io_service_bytes_recursive(hierarchy, cgroup);
+  if (io_service_bytes.isError()) {
+    return Failure(io_service_bytes.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_service_bytes.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_service_bytes()
+      : totalCfqRecursive.add_io_service_bytes();
+
+    setValue(statValue, value);
+  }
+
+  io_serviced = cfq::io_serviced_recursive(hierarchy, cgroup);
+  if (io_serviced.isError()) {
+    return Failure(io_serviced.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_serviced.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_serviced()
+      : totalCfqRecursive.add_io_serviced();
+
+    setValue(statValue, value);
+  }
+
+  io_service_time = cfq::io_service_time_recursive(hierarchy, cgroup);
+  if (io_service_time.isError()) {
+    return Failure(io_service_time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_service_time.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_service_time()
+      : totalCfqRecursive.add_io_service_time();
+
+    setValue(statValue, value);
+  }
+
+  io_wait_time = cfq::io_wait_time_recursive(hierarchy, cgroup);
+  if (io_wait_time.isError()) {
+    return Failure(io_wait_time.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_wait_time.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_wait_time()
+      : totalCfqRecursive.add_io_wait_time();
+
+    setValue(statValue, value);
+  }
+
+  io_merged = cfq::io_merged_recursive(hierarchy, cgroup);
+  if (io_merged.isError()) {
+    return Failure(io_merged.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_merged.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_merged()
+      : totalCfqRecursive.add_io_merged();
+
+    setValue(statValue, value);
+  }
+
+  io_queued = cfq::io_queued_recursive(hierarchy, cgroup);
+  if (io_queued.isError()) {
+    return Failure(io_queued.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_queued.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? cfqRecursive[statValue.device.get()].add_io_queued()
+      : totalCfqRecursive.add_io_queued();
+
+    setValue(statValue, value);
+  }
+
+  // Get throttling statistics.
+  io_serviced = throttle::io_serviced(hierarchy, cgroup);
+  if (io_serviced.isError()) {
+    return Failure(io_serviced.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_serviced.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? throttling[statValue.device.get()].add_io_serviced()
+      : totalThrottling.add_io_serviced();
+
+    setValue(statValue, value);
+  }
+
+  io_service_bytes = throttle::io_service_bytes(hierarchy, cgroup);
+  if (io_service_bytes.isError()) {
+    return Failure(io_service_bytes.error());
+  }
+
+  foreach (const cgroups::blkio::Value& statValue, io_service_bytes.get()) {
+    CgroupInfo::Blkio::Value* value = statValue.device.isSome()
+      ? throttling[statValue.device.get()].add_io_service_bytes()
+      : totalThrottling.add_io_service_bytes();
+
+    setValue(statValue, value);
+  }
+
+  // Add up resource statistics.
+  ResourceStatistics result;
+  CgroupInfo::Blkio::Statistics* stat = result.mutable_blkio_statistics();
+
+  foreachkey (dev_t dev, cfq) {
+    cfq[dev].mutable_device()->set_major(major(dev));
+    cfq[dev].mutable_device()->set_minor(minor(dev));
+    stat->add_cfq()->CopyFrom(cfq[dev]);
+  }
+
+  foreachkey (dev_t dev, cfqRecursive) {
+    cfqRecursive[dev].mutable_device()->set_major(major(dev));
+    cfqRecursive[dev].mutable_device()->set_minor(minor(dev));
+    stat->add_cfq_recursive()->CopyFrom(cfqRecursive[dev]);
+  }
+
+  foreachkey (dev_t dev, throttling) {
+    throttling[dev].mutable_device()->set_major(major(dev));
+    throttling[dev].mutable_device()->set_minor(minor(dev));
+    stat->add_throttling()->CopyFrom(throttling[dev]);
+  }
+
+  stat->add_cfq()->CopyFrom(totalCfq);
+  stat->add_cfq_recursive()->CopyFrom(totalCfqRecursive);
+  stat->add_throttling()->CopyFrom(totalThrottling);
+
+  return result;
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5325188/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp
----------------------------------------------------------------------
diff --git 
a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp 
b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp
index a2c575c..ebddf4e 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystems/blkio.hpp
@@ -49,6 +49,10 @@ public:
     return CGROUP_SUBSYSTEM_BLKIO_NAME;
   };
 
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
 private:
   BlkioSubsystem(const Flags& flags, const std::string& hierarchy);
 };

Reply via email to