This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 2a4c1c1 expose heron metrics as a service in Nomad (#2859) 2a4c1c1 is described below commit 2a4c1c1613bc2d10851503fcc35c571618a28ec1 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Apr 7 08:58:54 2018 -0700 expose heron metrics as a service in Nomad (#2859) --- heron/config/src/yaml/conf/nomad/heron_nomad.sh | 3 + .../config/src/yaml/conf/nomad/metrics_sinks.yaml | 3 +- heron/config/src/yaml/conf/nomad/scheduler.yaml | 17 ++++ .../config/src/yaml/conf/standalone/heron_nomad.sh | 3 + .../src/yaml/conf/standalone/metrics_sinks.yaml | 3 +- .../config/src/yaml/conf/standalone/scheduler.yaml | 14 +++ .../standalone/templates/scheduler.template.yaml | 14 +++ .../heron/metricsmgr/sink/AbstractWebSink.java | 7 +- heron/schedulers/src/java/BUILD | 1 + .../heron/scheduler/nomad/NomadConstants.java | 5 + .../heron/scheduler/nomad/NomadContext.java | 40 +++++++- .../heron/scheduler/nomad/NomadScheduler.java | 95 ++++++++++++++++- .../heron/scheduler/nomad/NomadSchedulerTest.java | 113 ++++++++++++++++++++- 13 files changed, 305 insertions(+), 13 deletions(-) diff --git a/heron/config/src/yaml/conf/nomad/heron_nomad.sh b/heron/config/src/yaml/conf/nomad/heron_nomad.sh index 51ca593..826ba0d 100644 --- a/heron/config/src/yaml/conf/nomad/heron_nomad.sh +++ b/heron/config/src/yaml/conf/nomad/heron_nomad.sh @@ -25,6 +25,9 @@ fi # download and extract heron topology package ${HERON_TOPOLOGY_DOWNLOAD_CMD} +# set metrics port file +echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE} + # launch heron executor trap 'kill -TERM $PID' TERM INT ${HERON_EXECUTOR_CMD} & diff --git a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml index b5257e2..a70f43a 100644 --- a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml +++ b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml @@ -60,7 +60,8 @@ tmaster-sink: prometheus-sink: class: 
"com.twitter.heron.metricsmgr.sink.PrometheusSink" - port: 8080 # The port on which to run (either port or port-file are mandatory) +# port: 8080 # The port on which to run (either port or port-file are mandatory) + port-file: "port_file" path: /metrics # The path on which to publish the metrics (mandatory) flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map include-topology-name: true # Include topology name in metric name (default false) diff --git a/heron/config/src/yaml/conf/nomad/scheduler.yaml b/heron/config/src/yaml/conf/nomad/scheduler.yaml index 297afc4..21d4268 100644 --- a/heron/config/src/yaml/conf/nomad/scheduler.yaml +++ b/heron/config/src/yaml/conf/nomad/scheduler.yaml @@ -26,3 +26,20 @@ heron.nomad.driver: "docker" # The docker image to use for heron if the docker driver is used, heron.executor.docker.image: 'heron/heron:latest' + +# Set the networking mode when the driver is docker +heron.nomad.network.mode: "default" + +# whether to register metrics service endpoints for prometheus metrics sink in consul +# the service will be named in the format: metrics-heron-<topology-name>-<container-index> +heron.nomad.metrics.service.register: True + +# interval at which health checks should be conducted for metrics service endpoint +heron.nomad.metrics.service.check.interval.sec: 10 + +# timeout of metrics service endpoint health check +heron.nomad.metrics.service.check.timeout.sec: 2 + +# additional tags to be attached to metrics service +# A tag of <topology-name>-<container-index> will be automatically attached +heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron" \ No newline at end of file diff --git a/heron/config/src/yaml/conf/standalone/heron_nomad.sh b/heron/config/src/yaml/conf/standalone/heron_nomad.sh index 51ca593..826ba0d 100644 --- a/heron/config/src/yaml/conf/standalone/heron_nomad.sh +++ b/heron/config/src/yaml/conf/standalone/heron_nomad.sh @@ -25,6 +25,9 @@ fi # download and 
extract heron topology package ${HERON_TOPOLOGY_DOWNLOAD_CMD} +# set metrics port file +echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE} + # launch heron executor trap 'kill -TERM $PID' TERM INT ${HERON_EXECUTOR_CMD} & diff --git a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml index b5257e2..a70f43a 100644 --- a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml +++ b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml @@ -60,7 +60,8 @@ tmaster-sink: prometheus-sink: class: "com.twitter.heron.metricsmgr.sink.PrometheusSink" - port: 8080 # The port on which to run (either port or port-file are mandatory) +# port: 8080 # The port on which to run (either port or port-file are mandatory) + port-file: "port_file" path: /metrics # The path on which to publish the metrics (mandatory) flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map include-topology-name: true # Include topology name in metric name (default false) diff --git a/heron/config/src/yaml/conf/standalone/scheduler.yaml b/heron/config/src/yaml/conf/standalone/scheduler.yaml index 48f512f..ff34902 100644 --- a/heron/config/src/yaml/conf/standalone/scheduler.yaml +++ b/heron/config/src/yaml/conf/standalone/scheduler.yaml @@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000 # standalone mode uses the raw_exec driver heron.nomad.driver: "raw_exec" + +# whether to register metrics service endpoints for prometheus metrics sink in consul +# the service will be named in the format: metrics-heron-<topology-name>-<container-index> +heron.nomad.metrics.service.register: False + +# interval at which health checks should be conducted for metrics service endpoint +heron.nomad.metrics.service.check.interval.sec: 10 + +# timeout of metrics service endpoint health check +heron.nomad.metrics.service.check.timeout.sec: 2 + +# additional tags to be attached to metrics service in a comma delimited list +# A 
tag of <topology-name>-<container-index> will be automatically attached +heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron" diff --git a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml index 443bb5b..0a1becb 100644 --- a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml +++ b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml @@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000 # standalone mode uses the raw_exec driver heron.nomad.driver: "raw_exec" + +# whether to register metrics service endpoints for prometheus metrics sink in consul +# the service will be named in the format: metrics-heron-<topology-name>-<container-index> +heron.nomad.metrics.service.register: False + +# interval at which health checks should be conducted for metrics service endpoint +heron.nomad.metrics.service.check.interval.sec: 10 + +# timeout of metrics service endpoint health check +heron.nomad.metrics.service.check.timeout.sec: 2 + +# additional tags to be attached to metrics service +# A tag of <topology-name>-<container-index> will be automatically attached +heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron" diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java index 13449c6..fd64286 100644 --- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java +++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java @@ -82,7 +82,7 @@ abstract class AbstractWebSink implements IMetricsSink { @Override public final void init(Map<String, Object> conf, SinkContext context) { String path = (String) conf.get(KEY_PATH); - String portFile = (String) conf.get(KEY_PORT_FILE); + String portFile = getServerPortFile(conf); cacheMaxSize = 
TypeUtils.getLong(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE, DEFAULT_MAX_CACHE_SIZE)); @@ -132,6 +132,7 @@ abstract class AbstractWebSink implements IMetricsSink { os.close(); LOG.log(Level.INFO, "Received metrics request."); }); + LOG.info("Starting web sink server on port: " + port); httpServer.start(); } catch (IOException e) { throw new RuntimeException("Failed to create Http server on port " + port, e); @@ -164,4 +165,8 @@ abstract class AbstractWebSink implements IMetricsSink { httpServer.stop(0); } } + + public static String getServerPortFile(Map<String, Object> conf) { + return (String) conf.get(KEY_PORT_FILE); + } } diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD index 24bdb3f..4a83f2f 100644 --- a/heron/schedulers/src/java/BUILD +++ b/heron/schedulers/src/java/BUILD @@ -69,6 +69,7 @@ nomad_sdk_deps = [ nomad_deps_files = \ scheduler_deps_files + nomad_sdk_deps + [ ":scheduler-utils-java", + "//heron/metricsmgr/src/java:metricsmgr-java" ] java_library( diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java index b82cf82..bfc4fe0 100644 --- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java @@ -43,6 +43,7 @@ public final class NomadConstants { public static final String JOB_LINK = "/ui/jobs"; public static final String HOST = "HOST"; + public static final String NETWORK_MODE = "network_mode"; public static final String NOMAD_TASK_COMMAND = "command"; public static final String NOMAD_TASK_COMMAND_ARGS = "args"; @@ -51,6 +52,7 @@ public final class NomadConstants { public static final String NOMAD_DEFAULT_DATACENTER = "dc1"; public static final String SHELL_CMD = "/bin/sh"; public static final String NOMAD_HERON_SCRIPT_NAME = "run_heron_executor.sh"; + public static final String 
NOMAD_SERVICE_CHECK_TYPE = "tcp"; public static final String HERON_NOMAD_WORKING_DIR = "HERON_NOMAD_WORKING_DIR"; public static final String HERON_USE_CORE_PACKAGE_URI = "HERON_USE_CORE_PACKAGE_URI"; @@ -88,6 +90,9 @@ public final class NomadConstants { // port number the start with when more than one port needed for remote debugging public static final String JVM_REMOTE_DEBUGGER_PORT = String.format("${NOMAD_PORT_%s}", SchedulerUtils.ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS.getName()); + // port for metrics webserver (AbstractWebSink) + public static final String METRICS_PORT = "metrics_port"; + public static final String METRICS_PORT_FILE = "METRICS_PORT_FILE"; public static final Map<SchedulerUtils.ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>(); static { diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java index c665559..2dd7f5f 100644 --- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java @@ -30,6 +30,20 @@ public class NomadContext extends Context { public static final String HERON_EXECUTOR_DOCKER_IMAGE = "heron.executor.docker.image"; + public static final String HERON_NOMAD_NETWORK_MODE = "heron.nomad.network.mode"; + + public static final String HERON_NOMAD_METRICS_SERVICE_REGISTER + = "heron.nomad.metrics.service.register"; + + public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC + = "heron.nomad.metrics.service.check.interval.sec"; + + public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC + = "heron.nomad.metrics.service.check.timeout.sec"; + + public static final String HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS + = "heron.nomad.metrics.service.additional.tags"; + public static String workingDirectory(Config config) { return config.getStringValue( 
NomadKey.WORKING_DIRECTORY.value(), NomadKey.WORKING_DIRECTORY.getDefaultString()); @@ -42,7 +56,7 @@ public class NomadContext extends Context { } public static String getSchedulerURI(Config config) { - return config.getStringValue(HERON_NOMAD_SCHEDULER_URI); + return config.getStringValue(HERON_NOMAD_SCHEDULER_URI, "http://127.0.0.1:4646"); } public static int getCoreFreqMapping(Config config) { @@ -50,10 +64,30 @@ public class NomadContext extends Context { } public static String getHeronNomadDriver(Config config) { - return config.getStringValue(HERON_NOMAD_DRIVER); + return config.getStringValue(HERON_NOMAD_DRIVER, "docker"); } public static String getHeronExecutorDockerImage(Config config) { - return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE); + return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE, "heron/heron:latest"); + } + + public static boolean getHeronNomadMetricsServiceRegister(Config config) { + return config.getBooleanValue(HERON_NOMAD_METRICS_SERVICE_REGISTER, false); + } + + public static int getHeronNomadMetricsServiceCheckIntervalSec(Config config) { + return config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, 10); + } + + public static int getHeronNomadMetricsServiceCheckTimeoutSec(Config config) { + return config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, 2); + } + + public static String[] getHeronNomadMetricsServiceAdditionalTags(Config config) { + return config.getStringValue(HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "").split(","); + } + + public static String getHeronNomadNetworkMode(Config config) { + return config.getStringValue(HERON_NOMAD_NETWORK_MODE, "default"); } } diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java index 7531f23..2b76505 100644 --- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java +++ 
b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java @@ -13,6 +13,7 @@ // limitations under the License. package com.twitter.heron.scheduler.nomad; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -23,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -33,6 +35,8 @@ import com.hashicorp.nomad.apimodel.JobListStub; import com.hashicorp.nomad.apimodel.NetworkResource; import com.hashicorp.nomad.apimodel.Port; import com.hashicorp.nomad.apimodel.Resources; +import com.hashicorp.nomad.apimodel.Service; +import com.hashicorp.nomad.apimodel.ServiceCheck; import com.hashicorp.nomad.apimodel.Task; import com.hashicorp.nomad.apimodel.TaskGroup; import com.hashicorp.nomad.apimodel.Template; @@ -42,6 +46,8 @@ import com.hashicorp.nomad.javasdk.NomadApiConfiguration; import com.hashicorp.nomad.javasdk.NomadException; import com.hashicorp.nomad.javasdk.ServerQueryResponse; +import com.twitter.heron.metricsmgr.MetricsSinksConfig; +import com.twitter.heron.metricsmgr.sink.PrometheusSink; import com.twitter.heron.proto.scheduler.Scheduler; import com.twitter.heron.scheduler.UpdateTopologyManager; import com.twitter.heron.scheduler.utils.Runtime; @@ -53,6 +59,8 @@ import com.twitter.heron.spi.packing.PackingPlan; import com.twitter.heron.spi.packing.Resource; import com.twitter.heron.spi.scheduler.IScheduler; +import static com.twitter.heron.scheduler.nomad.NomadConstants.METRICS_PORT; + @SuppressWarnings("IllegalCatch") public class NomadScheduler implements IScheduler { @@ -256,7 +264,8 @@ public class NomadScheduler implements IScheduler { i++; } - resourceReqs.addNetworks(new NetworkResource().addDynamicPorts(ports)); + NetworkResource 
networkResource = new NetworkResource(); + networkResource.addDynamicPorts(ports); // set memory requirements long memoryReqMb = containerResource.getRam().asMegabytes(); @@ -270,8 +279,44 @@ public class NomadScheduler implements IScheduler { // set disk requirements long diskReqMb = containerResource.getDisk().asMegabytes(); resourceReqs.setDiskMb(longToInt(diskReqMb)); - task.setResources(resourceReqs); + // allocate dynamic port for prometheus/websink metrics + String prometheusPortFile = getPrometheusMetricsFile(this.localConfig); + if (prometheusPortFile == null) { + LOG.severe("Failed to find port file for Prometheus metrics. " + + "Please check metrics sinks configurations"); + } else { + networkResource.addDynamicPorts(new Port().setLabel(METRICS_PORT)); + task.addEnv(NomadConstants.METRICS_PORT_FILE, prometheusPortFile); + + if (NomadContext.getHeronNomadMetricsServiceRegister(this.localConfig)) { + // getting tags for service + List<String> tags = new LinkedList<>(); + tags.add(String.format("%s-%s", + Runtime.topologyName(this.runtimeConfig), containerIndex)); + tags.addAll(Arrays.asList( + NomadContext.getHeronNomadMetricsServiceAdditionalTags(this.localConfig))); + //register metrics service with consul + Service service = new Service() + .setName( + getMetricsServiceName(Runtime.topologyName(this.runtimeConfig), containerIndex)) + .setPortLabel(METRICS_PORT) + .setTags(tags) + .addChecks(new ServiceCheck().setType(NomadConstants.NOMAD_SERVICE_CHECK_TYPE) + .setPortLabel(METRICS_PORT) + .setInterval(TimeUnit.NANOSECONDS.convert( + NomadContext.getHeronNomadMetricsServiceCheckIntervalSec(this.localConfig), + TimeUnit.SECONDS)) + .setTimeout(TimeUnit.NANOSECONDS.convert( + NomadContext.getHeronNomadMetricsServiceCheckTimeoutSec(this.localConfig), + TimeUnit.SECONDS))); + + task.addServices(service); + } + } + + resourceReqs.addNetworks(networkResource); + task.setResources(resourceReqs); return task; } @@ -301,7 +346,13 @@ public class NomadScheduler 
implements IScheduler { NomadContext.getHeronExecutorDockerImage(this.localConfig)); task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, NomadConstants.SHELL_CMD); - String[] args = {"-c", String.format("%s && %s", topologyDownloadCmd, executorCmd)}; + task.addConfig(NomadConstants.NETWORK_MODE, + NomadContext.getHeronNomadNetworkMode(this.localConfig)); + + String setMetricsPortFileCmd = getSetMetricsPortFileCmd(); + + String[] args = {"-c", String.format("%s && %s && %s", + topologyDownloadCmd, setMetricsPortFileCmd, executorCmd)}; task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, args); @@ -341,7 +392,6 @@ public class NomadScheduler implements IScheduler { template.setDestPath(NomadConstants.NOMAD_HERON_SCRIPT_NAME); task.addTemplates(template); - Resources resourceReqs = new Resources(); // configure nomad to allocate dynamic ports Port[] ports = new Port[NomadConstants.EXECUTOR_PORTS.size()]; int i = 0; @@ -517,4 +567,41 @@ public class NomadScheduler implements IScheduler { Resource getHomogeneousContainerResource(PackingPlan homogeneousPackingPlan) { return homogeneousPackingPlan.getContainers().iterator().next().getRequiredResource(); } + + static String getPrometheusMetricsFile(Config config) { + MetricsSinksConfig metricsSinksConfig; + try { + metricsSinksConfig = new MetricsSinksConfig(Context.metricsSinksFile(config)); + } catch (FileNotFoundException e) { + return null; + } + + String prometheusSinkId = null; + Map<String, Object> prometheusSinkConfig = null; + for (String sinkId : metricsSinksConfig.getSinkIds()) { + Map<String, Object> sinkConfig = metricsSinksConfig.getConfigForSink(sinkId); + Object className = sinkConfig.get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME); + if (className != null && className instanceof String) { + if (PrometheusSink.class.getName().equals(className)) { + prometheusSinkId = sinkId; + prometheusSinkConfig = sinkConfig; + } + } + } + if (prometheusSinkId == null || prometheusSinkConfig == null) { + return null; + } + + 
String prometheusMetricsPortFile = PrometheusSink.getServerPortFile(prometheusSinkConfig); + return prometheusMetricsPortFile; + } + + static String getMetricsServiceName(String topologyName, int containerIndex) { + return String.format("metrics-heron-%s-%s", topologyName, containerIndex); + } + + static String getSetMetricsPortFileCmd() { + return String.format("echo ${NOMAD_PORT_%s} > ${%s}", + NomadConstants.METRICS_PORT, NomadConstants.METRICS_PORT_FILE); + } } diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java index d6bba66..c14d65d 100644 --- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java +++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java @@ -18,10 +18,12 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import com.google.common.base.Optional; import com.hashicorp.nomad.apimodel.Job; +import com.hashicorp.nomad.apimodel.Port; import com.hashicorp.nomad.apimodel.Task; import com.hashicorp.nomad.apimodel.TaskGroup; import com.hashicorp.nomad.javasdk.NomadApiClient; @@ -73,6 +75,7 @@ public class NomadSchedulerTest { private static final String CORE_PACKAGE_URI = "core-package-uri"; private static final Boolean USE_CORE_PACKAGE_URI = true; private static final String EXECUTOR_BINARY = "executor-binary"; + private static final String PORT_FILE = "port-file"; private static NomadScheduler scheduler; @@ -91,6 +94,7 @@ public class NomadSchedulerTest { .put(Key.USE_CORE_PACKAGE_URI, USE_CORE_PACKAGE_URI) .put(Key.EXECUTOR_BINARY, EXECUTOR_BINARY) .put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.RAW_EXEC.getName()) + .put(NomadContext.HERON_NOMAD_NETWORK_MODE, "default") .build(); this.mockRuntime = config; @@ 
-322,6 +326,8 @@ public class NomadSchedulerTest { .thenReturn((int) MEMORY_RESOURCE.asMegabytes()); PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes())) .thenReturn((int) DISK_RESOURCE.asMegabytes()); + PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any())) + .thenReturn(PORT_FILE); scheduler.initialize(this.mockConfig, this.mockRuntime); @@ -352,6 +358,7 @@ public class NomadSchedulerTest { Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_CORE_PACKAGE_URI)); Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD)); Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_EXECUTOR_CMD)); + Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE)); Assert.assertEquals(NomadKey.WORKING_DIRECTORY.getDefaultString() + "/container-" + String.valueOf(CONTAINER_INDEX), @@ -364,17 +371,19 @@ public class NomadSchedulerTest { task.getEnv().get(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD)); Assert.assertEquals("./heron-core/bin/heron-executor args1 args2", task.getEnv().get(NomadConstants.HERON_EXECUTOR_CMD)); + Assert.assertEquals(PORT_FILE, + task.getEnv().get(NomadConstants.METRICS_PORT_FILE)); } @SuppressWarnings("unchecked") @Test public void testGetTaskDocker() { - this.mockRuntime = this.mockRuntime.newBuilder() + this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockRuntime) .put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName()) .build(); - this.mockConfig = this.mockConfig.newBuilder() + this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig) .put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName()) .build(); @@ -398,6 +407,8 @@ public class NomadSchedulerTest { .thenReturn((int) MEMORY_RESOURCE.asMegabytes()); PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes())) .thenReturn((int) DISK_RESOURCE.asMegabytes()); + 
PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any())) + .thenReturn(PORT_FILE); scheduler.initialize(this.mockConfig, this.mockRuntime); @@ -409,6 +420,8 @@ public class NomadSchedulerTest { Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND)); Assert.assertEquals(NomadConstants.SHELL_CMD, task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND)); + Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE)); + Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING, task.getResources().getCpu().intValue()); @@ -417,8 +430,102 @@ public class NomadSchedulerTest { Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(), task.getResources().getDiskMb().intValue()); Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HOST)); - + Assert.assertEquals(PORT_FILE, + task.getEnv().get(NomadConstants.METRICS_PORT_FILE)); Assert.assertEquals("${attr.unique.network.ip-address}", task.getEnv().get(NomadConstants.HOST)); + Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NETWORK_MODE)); + Assert.assertEquals("default", + task.getConfig().get(NomadConstants.NETWORK_MODE)); + + Assert.assertEquals(task.getResources().getNetworks().size(), 1); + + Set<String> ports = new HashSet<>(); + for (Port entry : task.getResources().getNetworks().get(0).getDynamicPorts()) { + ports.add(entry.getLabel()); + Assert.assertEquals(entry.getValue(), 0); + } + + for (SchedulerUtils.ExecutorPort entry : NomadConstants.EXECUTOR_PORTS.keySet()) { + Assert.assertTrue(ports.contains(entry.getName().replace("-", "_"))); + } + Assert.assertTrue(ports.contains(NomadConstants.METRICS_PORT)); + } + + @SuppressWarnings("unchecked") + @Test + public void testServiceCheck() { + this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig) + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar") + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2") + 
.put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10") + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build(); + + this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig) + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar") + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2") + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10") + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build(); + + Set<PackingPlan.ContainerPlan> containers = new HashSet<>(); + containers.add(Mockito.mock(PackingPlan.ContainerPlan.class)); + + PowerMockito.mockStatic(SchedulerUtils.class); + + Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE); + + PowerMockito.when(SchedulerUtils.executorCommandArgs( + Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString())) + .thenReturn(EXECUTOR_CMD_ARGS); + + PowerMockito.mockStatic(NomadScheduler.class); + PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), Mockito.any())) + .thenReturn(TOPOLOGY_DOWNLOAD_CMD); + PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig)) + .thenReturn(HERON_NOMAD_SCRIPT); + PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes())) + .thenReturn((int) MEMORY_RESOURCE.asMegabytes()); + PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes())) + .thenReturn((int) DISK_RESOURCE.asMegabytes()); + PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any())) + .thenReturn(PORT_FILE); + PowerMockito.when(NomadScheduler.getMetricsServiceName(Mockito.any(), Mockito.anyInt())) + .thenReturn(String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX)); + + scheduler.initialize(this.mockConfig, this.mockRuntime); + + Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource); + LOG.info("task: " + task); + + Assert.assertEquals(task.getServices().size(), 1); + 
Assert.assertEquals(task.getServices().get(0).getName(), + String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX)); + + String[] tags = {String.format("%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX), "foo", "bar"}; + Assert.assertEquals(task.getServices().get(0).getTags(), Arrays.asList(tags)); + Assert.assertEquals(task.getServices().get(0).getPortLabel(), NomadConstants.METRICS_PORT); + Assert.assertEquals(task.getServices().get(0).getChecks().size(), 1); + Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getPortLabel(), + NomadConstants.METRICS_PORT); + Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getType(), + NomadConstants.NOMAD_SERVICE_CHECK_TYPE); + TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); + Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getInterval(), + TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS)); + Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getTimeout(), + TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS)); + + + // if service registration is turned off + this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig) + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build(); + + this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig) + .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build(); + scheduler.initialize(this.mockConfig, this.mockRuntime); + + task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource); + LOG.info("task: " + task); + Assert.assertTrue(task.getServices() == null); } } -- To stop receiving notification emails like this one, please contact karth...@apache.org.