kramasamy closed pull request #2859: expose heron metrics as a service in Nomad
URL: https://github.com/apache/incubator-heron/pull/2859
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/config/src/yaml/conf/nomad/heron_nomad.sh 
b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
index 51ca593d57..826ba0d326 100644
--- a/heron/config/src/yaml/conf/nomad/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
 # download and extract heron topology package
 ${HERON_TOPOLOGY_DOWNLOAD_CMD}
 
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
 # launch heron executor
 trap 'kill -TERM $PID' TERM INT
 ${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml 
b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
index b5257e2c93..a70f43acd6 100644
--- a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
 
 prometheus-sink:
    class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
-   port: 8080 # The port on which to run (either port or port-file are 
mandatory)
+#   port: 8080 # The port on which to run (either port or port-file are 
mandatory)
+   port-file: "port_file"
    path: /metrics # The path on which to publish the metrics (mandatory)
    flat-metrics: true # By default the web-sink will publish a flat "name -> 
value" json map
    include-topology-name: true # Include topology name in metric name (default 
false)
diff --git a/heron/config/src/yaml/conf/nomad/scheduler.yaml 
b/heron/config/src/yaml/conf/nomad/scheduler.yaml
index 297afc47ec..21d42688b0 100644
--- a/heron/config/src/yaml/conf/nomad/scheduler.yaml
+++ b/heron/config/src/yaml/conf/nomad/scheduler.yaml
@@ -26,3 +26,20 @@ heron.nomad.driver: "docker"
 
 # The docker image to use for heron if the docker driver is used,
 heron.executor.docker.image: 'heron/heron:latest'
+
+# Set the networking mode when the driver is docker
+heron.nomad.network.mode: "default"
+
+# whether to register metrics service endpoints for prometheus metrics sink in 
consul
+# the service will be named in the format: 
metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: True
+
+# interval at which health checks should be conducted for metrics service 
endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to the metrics service, as a comma-delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
\ No newline at end of file
diff --git a/heron/config/src/yaml/conf/standalone/heron_nomad.sh 
b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
index 51ca593d57..826ba0d326 100644
--- a/heron/config/src/yaml/conf/standalone/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
 # download and extract heron topology package
 ${HERON_TOPOLOGY_DOWNLOAD_CMD}
 
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
 # launch heron executor
 trap 'kill -TERM $PID' TERM INT
 ${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml 
b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
index b5257e2c93..a70f43acd6 100644
--- a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
 
 prometheus-sink:
    class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
-   port: 8080 # The port on which to run (either port or port-file are 
mandatory)
+#   port: 8080 # The port on which to run (either port or port-file are 
mandatory)
+   port-file: "port_file"
    path: /metrics # The path on which to publish the metrics (mandatory)
    flat-metrics: true # By default the web-sink will publish a flat "name -> 
value" json map
    include-topology-name: true # Include topology name in metric name (default 
false)
diff --git a/heron/config/src/yaml/conf/standalone/scheduler.yaml 
b/heron/config/src/yaml/conf/standalone/scheduler.yaml
index 48f512fcba..ff349026ea 100644
--- a/heron/config/src/yaml/conf/standalone/scheduler.yaml
+++ b/heron/config/src/yaml/conf/standalone/scheduler.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
 
 # standalone mode uses the raw_exec driver
 heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in 
consul
+# the service will be named in the format: 
metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service 
endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to the metrics service, as a comma-delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git 
a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml 
b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
index 443bb5b9ae..0a1becb229 100644
--- a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
+++ b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
 
 # standalone mode uses the raw_exec driver
 heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in 
consul
+# the service will be named in the format: 
metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service 
endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to the metrics service, as a comma-delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git 
a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
 
b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
index 13449c6f39..fd64286f9f 100644
--- 
a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
+++ 
b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
@@ -82,7 +82,7 @@
   @Override
   public final void init(Map<String, Object> conf, SinkContext context) {
     String path = (String) conf.get(KEY_PATH);
-    String portFile = (String) conf.get(KEY_PORT_FILE);
+    String portFile = getServerPortFile(conf);
 
     cacheMaxSize = 
TypeUtils.getLong(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE,
         DEFAULT_MAX_CACHE_SIZE));
@@ -132,6 +132,7 @@ protected void startHttpServer(String path, int port) {
         os.close();
         LOG.log(Level.INFO, "Received metrics request.");
       });
+      LOG.info("Starting web sink server on port: " + port);
       httpServer.start();
     } catch (IOException e) {
       throw new RuntimeException("Failed to create Http server on port " + 
port, e);
@@ -164,4 +165,8 @@ public void close() {
       httpServer.stop(0);
     }
   }
+
+  public static String getServerPortFile(Map<String, Object> conf) {
+    return (String) conf.get(KEY_PORT_FILE);
+  }
 }
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index 24bdb3f7b4..4a83f2f50f 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -69,6 +69,7 @@ nomad_sdk_deps = [
 nomad_deps_files = \
     scheduler_deps_files + nomad_sdk_deps + [
         ":scheduler-utils-java",
+        "//heron/metricsmgr/src/java:metricsmgr-java"
     ]
 
 java_library(
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
index b82cf82c4f..bfc4fe046f 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
@@ -43,6 +43,7 @@ public String getName() {
 
   public static final String JOB_LINK = "/ui/jobs";
   public static final String HOST = "HOST";
+  public static final String NETWORK_MODE = "network_mode";
 
   public static final String NOMAD_TASK_COMMAND = "command";
   public static final String NOMAD_TASK_COMMAND_ARGS = "args";
@@ -51,6 +52,7 @@ public String getName() {
   public static final String NOMAD_DEFAULT_DATACENTER = "dc1";
   public static final String SHELL_CMD = "/bin/sh";
   public static final String NOMAD_HERON_SCRIPT_NAME = "run_heron_executor.sh";
+  public static final String NOMAD_SERVICE_CHECK_TYPE = "tcp";
 
   public static final String HERON_NOMAD_WORKING_DIR = 
"HERON_NOMAD_WORKING_DIR";
   public static final String HERON_USE_CORE_PACKAGE_URI = 
"HERON_USE_CORE_PACKAGE_URI";
@@ -88,6 +90,9 @@ public String getName() {
   // port number to start with when more than one port is needed for remote 
debugging
   public static final String JVM_REMOTE_DEBUGGER_PORT = 
String.format("${NOMAD_PORT_%s}",
       SchedulerUtils.ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS.getName());
+  // port for metrics webserver (AbstractWebSink)
+  public static final String METRICS_PORT = "metrics_port";
+  public static final String METRICS_PORT_FILE = "METRICS_PORT_FILE";
   public static final Map<SchedulerUtils.ExecutorPort, String> EXECUTOR_PORTS 
= new HashMap<>();
 
   static {
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
index c66555971d..2dd7f5f3e6 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
@@ -30,6 +30,20 @@
 
   public static final String HERON_EXECUTOR_DOCKER_IMAGE = 
"heron.executor.docker.image";
 
+  public static final String HERON_NOMAD_NETWORK_MODE = 
"heron.nomad.network.mode";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_REGISTER
+      = "heron.nomad.metrics.service.register";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC
+      = "heron.nomad.metrics.service.check.interval.sec";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC
+      = "heron.nomad.metrics.service.check.timeout.sec";
+
+  public static final String HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS
+      = "heron.nomad.metrics.service.additional.tags";
+
   public static String workingDirectory(Config config) {
     return config.getStringValue(
         NomadKey.WORKING_DIRECTORY.value(), 
NomadKey.WORKING_DIRECTORY.getDefaultString());
@@ -42,7 +56,7 @@ public static String getHeronNomadPath(Config config) {
   }
 
   public static String getSchedulerURI(Config config) {
-    return config.getStringValue(HERON_NOMAD_SCHEDULER_URI);
+    return config.getStringValue(HERON_NOMAD_SCHEDULER_URI, "http://127.0.0.1:4646");
   }
 
   public static int getCoreFreqMapping(Config config) {
@@ -50,10 +64,30 @@ public static int getCoreFreqMapping(Config config) {
   }
 
   public static String getHeronNomadDriver(Config config) {
-    return config.getStringValue(HERON_NOMAD_DRIVER);
+    return config.getStringValue(HERON_NOMAD_DRIVER, "docker");
   }
 
   public static String getHeronExecutorDockerImage(Config config) {
-    return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE);
+    return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE, 
"heron/heron:latest");
+  }
+
+  public static boolean getHeronNomadMetricsServiceRegister(Config config) {
+    return config.getBooleanValue(HERON_NOMAD_METRICS_SERVICE_REGISTER, false);
+  }
+
+  public static int getHeronNomadMetricsServiceCheckIntervalSec(Config config) 
{
+    return 
config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, 10);
+  }
+
+  public static int getHeronNomadMetricsServiceCheckTimeoutSec(Config config) {
+    return 
config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, 2);
+  }
+
+  public static String[] getHeronNomadMetricsServiceAdditionalTags(Config 
config) {
+    return config.getStringValue(HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"").split(",");
+  }
+
+  public static String getHeronNomadNetworkMode(Config config) {
+    return config.getStringValue(HERON_NOMAD_NETWORK_MODE, "default");
   }
 }
diff --git 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
index 7531f2364d..2b765050ea 100644
--- 
a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
+++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
@@ -13,6 +13,7 @@
 //  limitations under the License.
 package com.twitter.heron.scheduler.nomad;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -23,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -33,6 +35,8 @@
 import com.hashicorp.nomad.apimodel.NetworkResource;
 import com.hashicorp.nomad.apimodel.Port;
 import com.hashicorp.nomad.apimodel.Resources;
+import com.hashicorp.nomad.apimodel.Service;
+import com.hashicorp.nomad.apimodel.ServiceCheck;
 import com.hashicorp.nomad.apimodel.Task;
 import com.hashicorp.nomad.apimodel.TaskGroup;
 import com.hashicorp.nomad.apimodel.Template;
@@ -42,6 +46,8 @@
 import com.hashicorp.nomad.javasdk.NomadException;
 import com.hashicorp.nomad.javasdk.ServerQueryResponse;
 
+import com.twitter.heron.metricsmgr.MetricsSinksConfig;
+import com.twitter.heron.metricsmgr.sink.PrometheusSink;
 import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.scheduler.UpdateTopologyManager;
 import com.twitter.heron.scheduler.utils.Runtime;
@@ -53,6 +59,8 @@
 import com.twitter.heron.spi.packing.Resource;
 import com.twitter.heron.spi.scheduler.IScheduler;
 
+import static com.twitter.heron.scheduler.nomad.NomadConstants.METRICS_PORT;
+
 @SuppressWarnings("IllegalCatch")
 public class NomadScheduler implements IScheduler {
 
@@ -256,7 +264,8 @@ Task getTask(String taskName, int containerIndex, Resource 
containerResource) {
       i++;
     }
 
-    resourceReqs.addNetworks(new NetworkResource().addDynamicPorts(ports));
+    NetworkResource networkResource = new NetworkResource();
+    networkResource.addDynamicPorts(ports);
 
     // set memory requirements
     long memoryReqMb = containerResource.getRam().asMegabytes();
@@ -270,8 +279,44 @@ Task getTask(String taskName, int containerIndex, Resource 
containerResource) {
     // set disk requirements
     long diskReqMb = containerResource.getDisk().asMegabytes();
     resourceReqs.setDiskMb(longToInt(diskReqMb));
-    task.setResources(resourceReqs);
 
+    // allocate dynamic port for prometheus/websink metrics
+    String prometheusPortFile = getPrometheusMetricsFile(this.localConfig);
+    if (prometheusPortFile == null) {
+      LOG.severe("Failed to find port file for Prometheus metrics. "
+          + "Please check metrics sinks configurations");
+    } else {
+      networkResource.addDynamicPorts(new Port().setLabel(METRICS_PORT));
+      task.addEnv(NomadConstants.METRICS_PORT_FILE, prometheusPortFile);
+
+      if (NomadContext.getHeronNomadMetricsServiceRegister(this.localConfig)) {
+        // getting tags for service
+        List<String> tags = new LinkedList<>();
+        tags.add(String.format("%s-%s",
+            Runtime.topologyName(this.runtimeConfig), containerIndex));
+        tags.addAll(Arrays.asList(
+            
NomadContext.getHeronNomadMetricsServiceAdditionalTags(this.localConfig)));
+        //register metrics service with consul
+        Service service = new Service()
+            .setName(
+                
getMetricsServiceName(Runtime.topologyName(this.runtimeConfig), containerIndex))
+            .setPortLabel(METRICS_PORT)
+            .setTags(tags)
+            .addChecks(new 
ServiceCheck().setType(NomadConstants.NOMAD_SERVICE_CHECK_TYPE)
+                .setPortLabel(METRICS_PORT)
+                .setInterval(TimeUnit.NANOSECONDS.convert(
+                    
NomadContext.getHeronNomadMetricsServiceCheckIntervalSec(this.localConfig),
+                    TimeUnit.SECONDS))
+                .setTimeout(TimeUnit.NANOSECONDS.convert(
+                    
NomadContext.getHeronNomadMetricsServiceCheckTimeoutSec(this.localConfig),
+                    TimeUnit.SECONDS)));
+
+        task.addServices(service);
+      }
+    }
+
+    resourceReqs.addNetworks(networkResource);
+    task.setResources(resourceReqs);
     return task;
   }
 
@@ -301,7 +346,13 @@ Task getTaskSpecDockerDriver(Task task, String taskName, 
int containerIndex) {
         NomadContext.getHeronExecutorDockerImage(this.localConfig));
 
     task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, 
NomadConstants.SHELL_CMD);
-    String[] args = {"-c", String.format("%s && %s", topologyDownloadCmd, 
executorCmd)};
+    task.addConfig(NomadConstants.NETWORK_MODE,
+        NomadContext.getHeronNomadNetworkMode(this.localConfig));
+
+    String setMetricsPortFileCmd = getSetMetricsPortFileCmd();
+
+    String[] args = {"-c", String.format("%s && %s && %s",
+        topologyDownloadCmd, setMetricsPortFileCmd, executorCmd)};
 
     task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, args);
 
@@ -341,7 +392,6 @@ Task getTaskSpecRawDriver(Task task, String taskName, int 
containerIndex) {
     template.setDestPath(NomadConstants.NOMAD_HERON_SCRIPT_NAME);
     task.addTemplates(template);
 
-    Resources resourceReqs = new Resources();
     // configure nomad to allocate dynamic ports
     Port[] ports = new Port[NomadConstants.EXECUTOR_PORTS.size()];
     int i = 0;
@@ -517,4 +567,41 @@ PackingPlan getHomogeneousPackingPlan(PackingPlan 
packingPlan) {
   Resource getHomogeneousContainerResource(PackingPlan homogeneousPackingPlan) 
{
     return 
homogeneousPackingPlan.getContainers().iterator().next().getRequiredResource();
   }
+
+  static String getPrometheusMetricsFile(Config config) {
+    MetricsSinksConfig metricsSinksConfig;
+    try {
+      metricsSinksConfig = new 
MetricsSinksConfig(Context.metricsSinksFile(config));
+    } catch (FileNotFoundException e) {
+      return null;
+    }
+
+    String prometheusSinkId = null;
+    Map<String, Object> prometheusSinkConfig = null;
+    for (String sinkId : metricsSinksConfig.getSinkIds()) {
+      Map<String, Object> sinkConfig = 
metricsSinksConfig.getConfigForSink(sinkId);
+      Object className = 
sinkConfig.get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME);
+      if (className != null && className instanceof String) {
+        if (PrometheusSink.class.getName().equals(className)) {
+          prometheusSinkId = sinkId;
+          prometheusSinkConfig = sinkConfig;
+        }
+      }
+    }
+    if (prometheusSinkId == null || prometheusSinkConfig == null) {
+      return null;
+    }
+
+    String prometheusMetricsPortFile = 
PrometheusSink.getServerPortFile(prometheusSinkConfig);
+    return prometheusMetricsPortFile;
+  }
+
+  static String getMetricsServiceName(String topologyName, int containerIndex) 
{
+    return String.format("metrics-heron-%s-%s", topologyName, containerIndex);
+  }
+
+  static String getSetMetricsPortFileCmd() {
+    return String.format("echo ${NOMAD_PORT_%s} > ${%s}",
+        NomadConstants.METRICS_PORT, NomadConstants.METRICS_PORT_FILE);
+  }
 }
diff --git 
a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
 
b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
index d6bba66162..c14d65d3a6 100644
--- 
a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
+++ 
b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
@@ -18,10 +18,12 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import com.google.common.base.Optional;
 import com.hashicorp.nomad.apimodel.Job;
+import com.hashicorp.nomad.apimodel.Port;
 import com.hashicorp.nomad.apimodel.Task;
 import com.hashicorp.nomad.apimodel.TaskGroup;
 import com.hashicorp.nomad.javasdk.NomadApiClient;
@@ -73,6 +75,7 @@
   private static final String CORE_PACKAGE_URI = "core-package-uri";
   private static final Boolean USE_CORE_PACKAGE_URI = true;
   private static final String EXECUTOR_BINARY = "executor-binary";
+  private static final String PORT_FILE = "port-file";
 
   private static NomadScheduler scheduler;
 
@@ -91,6 +94,7 @@ public void setUp() throws Exception {
         .put(Key.USE_CORE_PACKAGE_URI, USE_CORE_PACKAGE_URI)
         .put(Key.EXECUTOR_BINARY, EXECUTOR_BINARY)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.RAW_EXEC.getName())
+        .put(NomadContext.HERON_NOMAD_NETWORK_MODE, "default")
         .build();
 
     this.mockRuntime = config;
@@ -322,6 +326,8 @@ public void testGetTaskRawExec() {
         .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
     PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
         .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
 
     scheduler.initialize(this.mockConfig, this.mockRuntime);
 
@@ -352,6 +358,7 @@ public void testGetTaskRawExec() {
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_CORE_PACKAGE_URI));
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
     
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_EXECUTOR_CMD));
+    
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
 
     Assert.assertEquals(NomadKey.WORKING_DIRECTORY.getDefaultString() + 
"/container-"
             + String.valueOf(CONTAINER_INDEX),
@@ -364,17 +371,19 @@ public void testGetTaskRawExec() {
         task.getEnv().get(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
     Assert.assertEquals("./heron-core/bin/heron-executor args1 args2",
         task.getEnv().get(NomadConstants.HERON_EXECUTOR_CMD));
+    Assert.assertEquals(PORT_FILE,
+        task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
   }
 
   @SuppressWarnings("unchecked")
   @Test
   public void testGetTaskDocker() {
 
-    this.mockRuntime = this.mockRuntime.newBuilder()
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockRuntime)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.DOCKER.getName())
         .build();
 
-    this.mockConfig = this.mockConfig.newBuilder()
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
         .put(NomadContext.HERON_NOMAD_DRIVER, 
NomadConstants.NomadDriver.DOCKER.getName())
         .build();
 
@@ -398,6 +407,8 @@ public void testGetTaskDocker() {
         .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
     PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
         .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
 
     scheduler.initialize(this.mockConfig, this.mockRuntime);
 
@@ -409,6 +420,8 @@ public void testGetTaskDocker() {
     
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND));
     Assert.assertEquals(NomadConstants.SHELL_CMD,
         task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND));
+    
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
+
 
     Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING,
         task.getResources().getCpu().intValue());
@@ -417,8 +430,102 @@ public void testGetTaskDocker() {
     Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(),
         task.getResources().getDiskMb().intValue());
     Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HOST));
-
+    Assert.assertEquals(PORT_FILE,
+        task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
     Assert.assertEquals("${attr.unique.network.ip-address}",
         task.getEnv().get(NomadConstants.HOST));
+    
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NETWORK_MODE));
+    Assert.assertEquals("default",
+        task.getConfig().get(NomadConstants.NETWORK_MODE));
+
+    Assert.assertEquals(task.getResources().getNetworks().size(), 1);
+
+    Set<String> ports = new HashSet<>();
+    for (Port entry : 
task.getResources().getNetworks().get(0).getDynamicPorts()) {
+      ports.add(entry.getLabel());
+      Assert.assertEquals(entry.getValue(), 0);
+    }
+
+    for (SchedulerUtils.ExecutorPort entry : 
NomadConstants.EXECUTOR_PORTS.keySet()) {
+      Assert.assertTrue(ports.contains(entry.getName().replace("-", "_")));
+    }
+    Assert.assertTrue(ports.contains(NomadConstants.METRICS_PORT));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testServiceCheck() {
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"foo,bar")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, 
"foo,bar")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+    Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
+    containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
+
+    PowerMockito.mockStatic(SchedulerUtils.class);
+
+    Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, 
DISK_RESOURCE);
+
+    PowerMockito.when(SchedulerUtils.executorCommandArgs(
+        Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
+        .thenReturn(EXECUTOR_CMD_ARGS);
+
+    PowerMockito.mockStatic(NomadScheduler.class);
+    PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), 
Mockito.any()))
+        .thenReturn(TOPOLOGY_DOWNLOAD_CMD);
+    PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
+        .thenReturn(HERON_NOMAD_SCRIPT);
+    PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
+        .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
+        .thenReturn((int) DISK_RESOURCE.asMegabytes());
+    PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+        .thenReturn(PORT_FILE);
+    PowerMockito.when(NomadScheduler.getMetricsServiceName(Mockito.any(), 
Mockito.anyInt()))
+        .thenReturn(String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, 
CONTAINER_INDEX));
+
+    scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+    Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+    LOG.info("task: " + task);
+
+    Assert.assertEquals(task.getServices().size(), 1);
+    Assert.assertEquals(task.getServices().get(0).getName(),
+        String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
+
+    String[] tags = {String.format("%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX), 
"foo", "bar"};
+    Assert.assertEquals(task.getServices().get(0).getTags(), 
Arrays.asList(tags));
+    Assert.assertEquals(task.getServices().get(0).getPortLabel(), 
NomadConstants.METRICS_PORT);
+    Assert.assertEquals(task.getServices().get(0).getChecks().size(), 1);
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getPortLabel(),
+        NomadConstants.METRICS_PORT);
+    Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getType(),
+        NomadConstants.NOMAD_SERVICE_CHECK_TYPE);
+    TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getInterval(),
+        TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS));
+    
Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getTimeout(),
+        TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS));
+
+
+    // if service registration is turned off
+    this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+
+    this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+        .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+    scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+    task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+    LOG.info("task: " + task);
+    Assert.assertTrue(task.getServices() == null);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to