This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4c1c1  expose heron metrics as a service in Nomad (#2859)
2a4c1c1 is described below

commit 2a4c1c1613bc2d10851503fcc35c571618a28ec1
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Apr 7 08:58:54 2018 -0700

    expose heron metrics as a service in Nomad (#2859)
---
 heron/config/src/yaml/conf/nomad/heron_nomad.sh    |   3 +
 .../config/src/yaml/conf/nomad/metrics_sinks.yaml  |   3 +-
 heron/config/src/yaml/conf/nomad/scheduler.yaml    |  17 ++++
 .../config/src/yaml/conf/standalone/heron_nomad.sh |   3 +
 .../src/yaml/conf/standalone/metrics_sinks.yaml    |   3 +-
 .../config/src/yaml/conf/standalone/scheduler.yaml |  14 +++
 .../standalone/templates/scheduler.template.yaml   |  14 +++
 .../heron/metricsmgr/sink/AbstractWebSink.java     |   7 +-
 heron/schedulers/src/java/BUILD                    |   1 +
 .../heron/scheduler/nomad/NomadConstants.java      |   5 +
 .../heron/scheduler/nomad/NomadContext.java        |  40 +++++++-
 .../heron/scheduler/nomad/NomadScheduler.java      |  95 ++++++++++++++++-
 .../heron/scheduler/nomad/NomadSchedulerTest.java  | 113 ++++++++++++++++++++-
 13 files changed, 305 insertions(+), 13 deletions(-)

diff --git a/heron/config/src/yaml/conf/nomad/heron_nomad.sh 
b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
index 51ca593..826ba0d 100644
--- a/heron/config/src/yaml/conf/nomad/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
 # download and extract heron topology package
 ${HERON_TOPOLOGY_DOWNLOAD_CMD}
 
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
 # launch heron executor
 trap 'kill -TERM $PID' TERM INT
 ${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml 
b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
index b5257e2..a70f43a 100644
--- a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
 
 prometheus-sink:
    class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
-   port: 8080 # The port on which to run (either port or port-file are mandatory)
+#   port: 8080 # The port on which to run (either port or port-file are mandatory)
+   port-file: "port_file"
    path: /metrics # The path on which to publish the metrics (mandatory)
    flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map
    include-topology-name: true # Include topology name in metric name (default false)
diff --git a/heron/config/src/yaml/conf/nomad/scheduler.yaml 
b/heron/config/src/yaml/conf/nomad/scheduler.yaml
index 297afc4..21d4268 100644
--- a/heron/config/src/yaml/conf/nomad/scheduler.yaml
+++ b/heron/config/src/yaml/conf/nomad/scheduler.yaml
@@ -26,3 +26,20 @@ heron.nomad.driver: "docker"
 
 # The docker image to use for heron if the docker driver is used,
 heron.executor.docker.image: 'heron/heron:latest'
+
+# Set the networking mode used when the driver is docker
+heron.nomad.network.mode: "default"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: True
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
\ No newline at end of file
diff --git a/heron/config/src/yaml/conf/standalone/heron_nomad.sh 
b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
index 51ca593..826ba0d 100644
--- a/heron/config/src/yaml/conf/standalone/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
 # download and extract heron topology package
 ${HERON_TOPOLOGY_DOWNLOAD_CMD}
 
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
 # launch heron executor
 trap 'kill -TERM $PID' TERM INT
 ${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml 
b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
index b5257e2..a70f43a 100644
--- a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
 
 prometheus-sink:
    class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
-   port: 8080 # The port on which to run (either port or port-file are mandatory)
+#   port: 8080 # The port on which to run (either port or port-file are mandatory)
+   port-file: "port_file"
    path: /metrics # The path on which to publish the metrics (mandatory)
    flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map
    include-topology-name: true # Include topology name in metric name (default false)
diff --git a/heron/config/src/yaml/conf/standalone/scheduler.yaml 
b/heron/config/src/yaml/conf/standalone/scheduler.yaml
index 48f512f..ff34902 100644
--- a/heron/config/src/yaml/conf/standalone/scheduler.yaml
+++ b/heron/config/src/yaml/conf/standalone/scheduler.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
 
 # standalone mode uses the raw_exec driver
 heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git 
a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml 
b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
index 443bb5b..0a1becb 100644
--- a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
+++ b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
 
 # standalone mode uses the raw_exec driver
 heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git 
a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
 
b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
index 13449c6..fd64286 100644
--- 
a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
+++ 
b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
@@ -82,7 +82,7 @@ abstract class AbstractWebSink implements IMetricsSink {
   @Override
   public final void init(Map<String, Object> conf, SinkContext context) {
     String path = (String) conf.get(KEY_PATH);
-    String portFile = (String) conf.get(KEY_PORT_FILE);
+    String portFile = getServerPortFile(conf);
 
     cacheMaxSize = 
TypeUtils.getLong(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE,
         DEFAULT_MAX_CACHE_SIZE));
@@ -132,6 +132,7 @@ abstract class AbstractWebSink implements IMetricsSink {
         os.close();
         LOG.log(Level.INFO, "Received metrics request.");
       });
+      LOG.info("Starting web sink server on port: " + port);
       httpServer.start();
     } catch (IOException e) {
       throw new RuntimeException("Failed to create Http server on port " + 
port, e);
@@ -164,4 +165,8 @@ abstract class AbstractWebSink implements IMetricsSink {
       httpServer.stop(0);
     }
   }
+
+  public static String getServerPortFile(Map<String, Object> conf) {
+    return (String) conf.get(KEY_PORT_FILE);
+  }
 }
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index 24bdb3f..4a83f2f 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -69,6 +69,7 @@ nomad_sdk_deps = [
 nomad_deps_files = \
     scheduler_deps_files + nomad_sdk_deps + [
         ":scheduler-utils-java",
+        "//heron/metricsmgr/src/java:metricsmgr-java"
     ]
 
 java_library(
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
index b82cf82..bfc4fe0 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
@@ -43,6 +43,7 @@ public final class NomadConstants {
 
   public static final String JOB_LINK = "/ui/jobs";
   public static final String HOST = "HOST";
+  public static final String NETWORK_MODE = "network_mode";
 
   public static final String NOMAD_TASK_COMMAND = "command";
   public static final String NOMAD_TASK_COMMAND_ARGS = "args";
@@ -51,6 +52,7 @@ public final class NomadConstants {
   public static final String NOMAD_DEFAULT_DATACENTER = "dc1";
   public static final String SHELL_CMD = "/bin/sh";
   public static final String NOMAD_HERON_SCRIPT_NAME = "run_heron_executor.sh";
+  public static final String NOMAD_SERVICE_CHECK_TYPE = "tcp";
 
   public static final String HERON_NOMAD_WORKING_DIR = 
"HERON_NOMAD_WORKING_DIR";
   public static final String HERON_USE_CORE_PACKAGE_URI = 
"HERON_USE_CORE_PACKAGE_URI";
@@ -88,6 +90,9 @@ public final class NomadConstants {
   // port number the start with when more than one port needed for remote 
debugging
   public static final String JVM_REMOTE_DEBUGGER_PORT = 
String.format("${NOMAD_PORT_%s}",
       SchedulerUtils.ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS.getName());
+  // port for metrics webserver (AbstractWebSink)
+  public static final String METRICS_PORT = "metrics_port";
+  public static final String METRICS_PORT_FILE = "METRICS_PORT_FILE";
   public static final Map<SchedulerUtils.ExecutorPort, String> EXECUTOR_PORTS 
= new HashMap<>();
 
   static {
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
index c665559..2dd7f5f 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
@@ -30,6 +30,20 @@ public class NomadContext extends Context {
 
   public static final String HERON_EXECUTOR_DOCKER_IMAGE = 
"heron.executor.docker.image";
 
+  public static final String HERON_NOMAD_NETWORK_MODE = 
"heron.nomad.network.mode";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_REGISTER
+      = "heron.nomad.metrics.service.register";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC
+      = "heron.nomad.metrics.service.check.interval.sec";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC
+      = "heron.nomad.metrics.service.check.timeout.sec";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS
+      = "heron.nomad.metrics.service.additional.tags";
+
   public static String workingDirectory(Config config) {
     return config.getStringValue(
         NomadKey.WORKING_DIRECTORY.value(), 
NomadKey.WORKING_DIRECTORY.getDefaultString());
@@ -42,7 +56,7 @@ public class NomadContext extends Context {
   }
 
   public static String getSchedulerURI(Config config) {
-    return config.getStringValue(HERON_NOMAD_SCHEDULER_URI);
+    return config.getStringValue(HERON_NOMAD_SCHEDULER_URI, "http://127.0.0.1:4646");
   }
 
   public static int getCoreFreqMapping(Config config) {
@@ -50,10 +64,30 @@ public class NomadContext extends Context {
   }
 
   public static String getHeronNomadDriver(Config config) {
-    return config.getStringValue(HERON_NOMAD_DRIVER);
+    return config.getStringValue(HERON_NOMAD_DRIVER, "docker");
   }
 
   public static String getHeronExecutorDockerImage(Config config) {
-    return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE);
+    return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE, 
"heron/heron:latest");
+  }
+
+  public static boolean getHeronNomadMetricsServiceRegister(Config config) {
+    return config.getBooleanValue(HERON_NOMAD_METRICS_SERVICE_REGISTER, false);
+  }
+
+  public static int getHeronNomadMetricsServiceCheckIntervalSec(Config config) 
{
+    return 
config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, 10);
+  }
+
+  public static int getHeronNomadMetricsServiceCheckTimeoutSec(Config config) {
+    return 
config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, 2);
+  }
+
+  public static String[] getHeronNomadMetricsServiceAdditionalTags(Config 
config) {
+    return config.getStringValue(HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"").split(",");
+  }
+
+  public static String getHeronNomadNetworkMode(Config config) {
+    return config.getStringValue(HERON_NOMAD_NETWORK_MODE, "default");
   }
 }
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
index 7531f23..2b76505 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
@@ -13,6 +13,7 @@
 //  limitations under the License.
 package com.twitter.heron.scheduler.nomad;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -23,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -33,6 +35,8 @@ import com.hashicorp.nomad.apimodel.JobListStub;
 import com.hashicorp.nomad.apimodel.NetworkResource;
 import com.hashicorp.nomad.apimodel.Port;
 import com.hashicorp.nomad.apimodel.Resources;
+import com.hashicorp.nomad.apimodel.Service;
+import com.hashicorp.nomad.apimodel.ServiceCheck;
 import com.hashicorp.nomad.apimodel.Task;
 import com.hashicorp.nomad.apimodel.TaskGroup;
 import com.hashicorp.nomad.apimodel.Template;
@@ -42,6 +46,8 @@ import com.hashicorp.nomad.javasdk.NomadApiConfiguration;
 import com.hashicorp.nomad.javasdk.NomadException;
 import com.hashicorp.nomad.javasdk.ServerQueryResponse;
 
+import com.twitter.heron.metricsmgr.MetricsSinksConfig;
+import com.twitter.heron.metricsmgr.sink.PrometheusSink;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
 import com.twitter.heron.scheduler.utils.Runtime;
@@ -53,6 +59,8 @@ import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.packing.Resource;
 import com.twitter.heron.spi.scheduler.IScheduler;
 
+import static com.twitter.heron.scheduler.nomad.NomadConstants.METRICS_PORT;
+
 @SuppressWarnings("IllegalCatch")
 public class NomadScheduler implements IScheduler {
 
@@ -256,7 +264,8 @@ public class NomadScheduler implements IScheduler {
       i++;
     }
 
-    resourceReqs.addNetworks(new NetworkResource().addDynamicPorts(ports));
+    NetworkResource networkResource = new NetworkResource();
+    networkResource.addDynamicPorts(ports);
 
     // set memory requirements
     long memoryReqMb = containerResource.getRam().asMegabytes();
@@ -270,8 +279,44 @@ public class NomadScheduler implements IScheduler {
     // set disk requirements
     long diskReqMb = containerResource.getDisk().asMegabytes();
     resourceReqs.setDiskMb(longToInt(diskReqMb));
-    task.setResources(resourceReqs);
 
+    // allocate dynamic port for prometheus/websink metrics
+    String prometheusPortFile = getPrometheusMetricsFile(this.localConfig);
+    if (prometheusPortFile == null) {
+      LOG.severe("Failed to find port file for Prometheus metrics. "
+          + "Please check metrics sinks configurations");
+    } else {
+      networkResource.addDynamicPorts(new Port().setLabel(METRICS_PORT));
+      task.addEnv(NomadConstants.METRICS_PORT_FILE, prometheusPortFile);
+
+      if (NomadContext.getHeronNomadMetricsServiceRegister(this.localConfig)) {
+        // getting tags for service
+        List<String> tags = new LinkedList<>();
+        tags.add(String.format("%s-%s",
+            Runtime.topologyName(this.runtimeConfig), containerIndex));
+        tags.addAll(Arrays.asList(
+            
NomadContext.getHeronNomadMetricsServiceAdditionalTags(this.localConfig)));
+        // register metrics service with consul
+        Service service = new Service()
+            .setName(
+                
getMetricsServiceName(Runtime.topologyName(this.runtimeConfig), containerIndex))
+            .setPortLabel(METRICS_PORT)
+            .setTags(tags)
+            .addChecks(new 
ServiceCheck().setType(NomadConstants.NOMAD_SERVICE_CHECK_TYPE)
+                .setPortLabel(METRICS_PORT)
+                .setInterval(TimeUnit.NANOSECONDS.convert(
+                    
NomadContext.getHeronNomadMetricsServiceCheckIntervalSec(this.localConfig),
+                    TimeUnit.SECONDS))
+                .setTimeout(TimeUnit.NANOSECONDS.convert(
+                    
NomadContext.getHeronNomadMetricsServiceCheckTimeoutSec(this.localConfig),
+                    TimeUnit.SECONDS)));
+
+        task.addServices(service);
+      }
+    }
+
+    resourceReqs.addNetworks(networkResource);
+    task.setResources(resourceReqs);
     return task;
   }
 
@@ -301,7 +346,13 @@ public class NomadScheduler implements IScheduler {
         NomadContext.getHeronExecutorDockerImage(this.localConfig));
 
     task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, 
NomadConstants.SHELL_CMD);
-    String[] args = {"-c", String.format("%s && %s", topologyDownloadCmd, 
executorCmd)};
+    task.addConfig(NomadConstants.NETWORK_MODE,
+        NomadContext.getHeronNomadNetworkMode(this.localConfig));
+
+    String setMetricsPortFileCmd = getSetMetricsPortFileCmd();
+
+    String[] args = {"-c", String.format("%s && %s && %s",
+        topologyDownloadCmd, setMetricsPortFileCmd, executorCmd)};
 
     task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, args);
 
@@ -341,7 +392,6 @@ public class NomadScheduler implements IScheduler {
     template.setDestPath(NomadConstants.NOMAD_HERON_SCRIPT_NAME);
     task.addTemplates(template);
 
-    Resources resourceReqs = new Resources();
     // configure nomad to allocate dynamic ports
     Port[] ports = new Port[NomadConstants.EXECUTOR_PORTS.size()];
     int i = 0;
@@ -517,4 +567,41 @@ public class NomadScheduler implements IScheduler {
   Resource getHomogeneousContainerResource(PackingPlan homogeneousPackingPlan) 
{
     return 
homogeneousPackingPlan.getContainers().iterator().next().getRequiredResource();
   }
+
+  static String getPrometheusMetricsFile(Config config) {
+    MetricsSinksConfig metricsSinksConfig;
+    try {
+      metricsSinksConfig = new 
MetricsSinksConfig(Context.metricsSinksFile(config));
+    } catch (FileNotFoundException e) {
+      return null;
+    }
+
+    String prometheusSinkId = null;
+    Map<String, Object> prometheusSinkConfig = null;
+    for (String sinkId : metricsSinksConfig.getSinkIds()) {
+      Map<String, Object> sinkConfig = 
metricsSinksConfig.getConfigForSink(sinkId);
+      Object className = 
sinkConfig.get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME);
+      if (className != null && className instanceof String) {
+        if (PrometheusSink.class.getName().equals(className)) {
+          prometheusSinkId = sinkId;
+          prometheusSinkConfig = sinkConfig;
+        }
+      }
+    }
+    if (prometheusSinkId == null || prometheusSinkConfig == null) {
+      return null;
+    }
+
+    String prometheusMetricsPortFile = 
PrometheusSink.getServerPortFile(prometheusSinkConfig);
+    return prometheusMetricsPortFile;
+  }
+
+  static String getMetricsServiceName(String topologyName, int containerIndex) 
{
+    return String.format("metrics-heron-%s-%s", topologyName, containerIndex);
+  }
+
+  static String getSetMetricsPortFileCmd() {
+    return String.format("echo ${NOMAD_PORT_%s} > ${%s}",
+        NomadConstants.METRICS_PORT, NomadConstants.METRICS_PORT_FILE);
+  }
 }
diff --git 
a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
 
b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
index d6bba66..c14d65d 100644
--- 
a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
+++ 
b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
@@ -18,10 +18,12 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import com.google.common.base.Optional;
 import com.hashicorp.nomad.apimodel.Job;
+import com.hashicorp.nomad.apimodel.Port;
 import com.hashicorp.nomad.apimodel.Task;
 import com.hashicorp.nomad.apimodel.TaskGroup;
 import com.hashicorp.nomad.javasdk.NomadApiClient;
@@ -73,6 +75,7 @@ public class NomadSchedulerTest {
   private static final String CORE_PACKAGE_URI = "core-package-uri";
   private static final Boolean USE_CORE_PACKAGE_URI = true;
   private static final String EXECUTOR_BINARY = "executor-binary";
+  private static final String PORT_FILE = "port-file";
 
   private static NomadScheduler scheduler;
 
@@ -91,6 +94,7 @@ public class NomadSchedulerTest {
         .put(Key.USE_CORE_PACKAGE_URI, USE_CORE_PACKAGE_URI)
         .put(Key.EXECUTOR_BINARY, EXECUTOR_BINARY)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.RAW_EXEC.getName())
+        .put(NomadContext.HERON_NOMAD_NETWORK_MODE, "default")
         .build();
 
     this.mockRuntime = config;
@@ -322,6 +326,8 @@ public class NomadSchedulerTest {
         .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
     PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
         .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
 
     scheduler.initialize(this.mockConfig, this.mockRuntime);
 
@@ -352,6 +358,7 @@ public class NomadSchedulerTest {
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_CORE_PACKAGE_URI));
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_EXECUTOR_CMD));
+    
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
 
     Assert.assertEquals(NomadKey.WORKING_DIRECTORY.getDefaultString() + 
"/container-"
             + String.valueOf(CONTAINER_INDEX),
@@ -364,17 +371,19 @@ public class NomadSchedulerTest {
         task.getEnv().get(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
     Assert.assertEquals("./heron-core/bin/heron-executor args1 args2",
         task.getEnv().get(NomadConstants.HERON_EXECUTOR_CMD));
+    Assert.assertEquals(PORT_FILE,
+        task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
   }
 
   @SuppressWarnings("unchecked")
   @Test
   public void testGetTaskDocker() {
 
-    this.mockRuntime = this.mockRuntime.newBuilder()
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockRuntime)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.DOCKER.getName())
         .build();
 
-    this.mockConfig = this.mockConfig.newBuilder()
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.DOCKER.getName())
         .build();
 
@@ -398,6 +407,8 @@ public class NomadSchedulerTest {
         .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
     PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
         .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
 
     scheduler.initialize(this.mockConfig, this.mockRuntime);
 
@@ -409,6 +420,8 @@ public class NomadSchedulerTest {
     
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND));
     Assert.assertEquals(NomadConstants.SHELL_CMD,
         task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND));
+    
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
+
 
     Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING,
         task.getResources().getCpu().intValue());
@@ -417,8 +430,102 @@ public class NomadSchedulerTest {
     Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(),
         task.getResources().getDiskMb().intValue());
     Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HOST));
-
+    Assert.assertEquals(PORT_FILE,
+        task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
     Assert.assertEquals("${attr.unique.network.ip-address}",
         task.getEnv().get(NomadConstants.HOST));
+    
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NETWORK_MODE));
+    Assert.assertEquals("default",
+        task.getConfig().get(NomadConstants.NETWORK_MODE));
+
+    Assert.assertEquals(task.getResources().getNetworks().size(), 1);
+
+    Set<String> ports = new HashSet<>();
+    for (Port entry : 
task.getResources().getNetworks().get(0).getDynamicPorts()) {
+      ports.add(entry.getLabel());
+      Assert.assertEquals(entry.getValue(), 0);
+    }
+
+    for (SchedulerUtils.ExecutorPort entry : 
NomadConstants.EXECUTOR_PORTS.keySet()) {
+      Assert.assertTrue(ports.contains(entry.getName().replace("-", "_")));
+    }
+    Assert.assertTrue(ports.contains(NomadConstants.METRICS_PORT));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testServiceCheck() {
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"foo,bar")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"foo,bar")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+    Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
+    containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
+
+    PowerMockito.mockStatic(SchedulerUtils.class);
+
+    Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, 
DISK_RESOURCE);
+
+    PowerMockito.when(SchedulerUtils.executorCommandArgs(
+        Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
+        .thenReturn(EXECUTOR_CMD_ARGS);
+
+    PowerMockito.mockStatic(NomadScheduler.class);
+    PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), 
Mockito.any()))
+        .thenReturn(TOPOLOGY_DOWNLOAD_CMD);
+    PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
+        .thenReturn(HERON_NOMAD_SCRIPT);
+    PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
+        .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
+        .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
+    PowerMockito.when(NomadScheduler.getMetricsServiceName(Mockito.any(), 
Mockito.anyInt()))
+        .thenReturn(String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, 
CONTAINER_INDEX));
+
+    scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+    Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+    LOG.info("task: " + task);
+
+    Assert.assertEquals(task.getServices().size(), 1);
+    Assert.assertEquals(task.getServices().get(0).getName(),
+        String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
+
+    String[] tags = {String.format("%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX), 
"foo", "bar"};
+    Assert.assertEquals(task.getServices().get(0).getTags(), 
Arrays.asList(tags));
+    Assert.assertEquals(task.getServices().get(0).getPortLabel(), 
NomadConstants.METRICS_PORT);
+    Assert.assertEquals(task.getServices().get(0).getChecks().size(), 1);
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getPortLabel(),
+        NomadConstants.METRICS_PORT);
+    Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getType(),
+        NomadConstants.NOMAD_SERVICE_CHECK_TYPE);
+    TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getInterval(),
+        TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS));
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getTimeout(),
+        TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS));
+
+
+    // if service registration is turned off
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+    scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+    task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+    LOG.info("task: " + task);
+    Assert.assertTrue(task.getServices() == null);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
karth...@apache.org.

Reply via email to