This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 89ccde0 Metrics Collector Agent for Functions running in Kubernetes (#2773) 89ccde0 is described below commit 89ccde01617366ae1c5a6950c883b7aacac195e9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Oct 11 18:36:25 2018 -0700 Metrics Collector Agent for Functions running in Kubernetes (#2773) ### Motivation Added an agent that scrapes metrics from the function's container and exposes them on a port for Prometheus to scrape. --- distribution/server/src/assemble/bin.xml | 5 + pulsar-functions/metrics/pom.xml | 38 ++++++ .../functions/metrics/PrometheusMetricsServer.java | 137 +++++++++++++++++++++ .../functions/metrics/sink/AbstractWebSink.java | 6 +- .../functions/runtime/KubernetesRuntime.java | 96 +++++++++++++-- .../runtime/KubernetesRuntimeFactory.java | 11 +- .../functions/runtime/KubernetesRuntimeTest.java | 2 +- .../functions/worker/FunctionRuntimeManager.java | 3 +- .../pulsar/functions/worker/WorkerConfig.java | 1 + 9 files changed, 282 insertions(+), 17 deletions(-) diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml index 533c195..c9ad893 100644 --- a/distribution/server/src/assemble/bin.xml +++ b/distribution/server/src/assemble/bin.xml @@ -81,6 +81,11 @@ <outputDirectory>instances</outputDirectory> </file> <file> + <source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source> + <destName>PrometheusMetricsServer.jar</destName> + <outputDirectory>instances</outputDirectory> + </file> + <file> <source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source> <destName>api-examples.jar</destName> <outputDirectory>examples</outputDirectory> diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml index afa12cc..ca5e0a5 100644 --- a/pulsar-functions/metrics/pom.xml +++ b/pulsar-functions/metrics/pom.xml @@ -45,6 +45,44 @@ 
<version>${project.version}</version> </dependency> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName> + PrometheusMetricsServer + </finalName> + <archive> + <manifest> + <mainClass> + org.apache.pulsar.functions.sink.PrometheusMetricsServer + </mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </project> diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java new file mode 100644 index 0000000..ef9f0ce --- /dev/null +++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.metrics; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Empty; +import com.google.protobuf.util.JsonFormat; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.metrics.sink.AbstractWebSink; +import org.apache.pulsar.functions.metrics.sink.PrometheusSink; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.InstanceControlGrpc; + +import java.util.HashMap; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.*; + +/** + * A function container implemented using java thread. 
+ */ +@Slf4j +public class PrometheusMetricsServer { + @Parameter(names = "--function_details", description = "Function details json\n", required = true) + protected String functionDetailsJsonString; + + @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true) + protected int prometheusPort; + + @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true) + protected int grpc_port; + + @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true) + protected int metricsCollectionInterval; + + private FunctionDetails functionDetails; + private MetricsSink metricsSink; + private ManagedChannel channel; + private InstanceControlGrpc.InstanceControlFutureStub stub; + private ScheduledExecutorService timer; + + public PrometheusMetricsServer() { } + + + public void start() throws Exception { + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + if (functionDetailsJsonString.charAt(0) == '\'') { + functionDetailsJsonString = functionDetailsJsonString.substring(1); + } + if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') { + functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1); + } + JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder); + functionDetails = functionDetailsBuilder.build(); + + metricsSink = new PrometheusSink(); + Map<String, String> config = new HashMap<>(); + config.put(AbstractWebSink.KEY_PATH, "/metrics"); + config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort)); + metricsSink.init(config); + + channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port) + .usePlaintext(true) + .build(); + stub = InstanceControlGrpc.newFutureStub(channel); + + if (metricsCollectionInterval > 0) { + timer = 
Executors.newSingleThreadScheduledExecutor(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + CompletableFuture<InstanceCommunication.MetricsData> result = getMetrics(); + try { + metricsSink.processRecord(result.get(), functionDetails); + } catch (Exception e) { + log.error("Getting metrics data failed {}/{}/{}", + functionDetails.getTenant(), + functionDetails.getNamespace(), + functionDetails.getName(), + e); + } + } + }, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS); + } + } + + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>(); + ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build()); + Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { + @Override + public void onFailure(Throwable throwable) { + retval.completeExceptionally(throwable); + } + + @Override + public void onSuccess(InstanceCommunication.MetricsData t) { + retval.complete(t); + } + }); + return retval; + } + + public static void main(String[] args) throws Exception { + PrometheusMetricsServer server = new PrometheusMetricsServer(); + JCommander jcommander = new JCommander(server); + jcommander.setProgramName("PrometheusMetricsServer"); + + // parse args by JCommander + jcommander.parse(args); + server.start(); + } +} diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java index 86f17d7..cb5541a 100644 --- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java +++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java @@ -39,16 +39,16 @@ 
import org.apache.pulsar.functions.metrics.MetricsSink; /** * A metrics sink that publishes metrics on a http endpoint */ -abstract class AbstractWebSink implements MetricsSink { +abstract public class AbstractWebSink implements MetricsSink { private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName()); private static final int HTTP_STATUS_OK = 200; // Metrics will be published on http://host:port/path, the port - private static final String KEY_PORT = "port"; + public static final String KEY_PORT = "port"; // The path - private static final String KEY_PATH = "path"; + public static final String KEY_PATH = "path"; // Maximum number of metrics getting served private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size"; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index a9e8c75..40ef003 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; +import com.google.protobuf.util.JsonFormat; import com.squareup.okhttp.Response; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -34,6 +35,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.metrics.PrometheusMetricsServer; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -62,6 +64,9 @@ class KubernetesRuntime implements Runtime { private static final String ENV_SHARD_ID = "SHARD_ID"; private static final int maxJobNameSize = 55; private static final Integer GRPC_PORT = 9093; + private static final Integer PROMETHEUS_PORT = 9094; + private static final Double prometheusMetricsServerCpu = 0.1; + private static final Long prometheusMetricsServerRam = 250000000l; public static final Pattern VALID_POD_NAME_REGEX = Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*", Pattern.CASE_INSENSITIVE); @@ -80,6 +85,7 @@ class KubernetesRuntime implements Runtime { // The thread that invokes the function @Getter private List<String> processArgs; + private List<String> prometheusMetricsServerArgs; @Getter private ManagedChannel[] channel; private InstanceControlGrpc.InstanceControlFutureStub[] stub; @@ -103,13 +109,15 @@ class KubernetesRuntime implements Runtime { String pulsarRootDir, InstanceConfig instanceConfig, String instanceFile, + String prometheusMetricsServerJarFile, String logDirectory, String userCodePkgUrl, String originalCodeFileName, String pulsarServiceUrl, String pulsarAdminUrl, String stateStorageServiceUrl, - AuthenticationConfig authConfig) throws Exception { + AuthenticationConfig authConfig, + Integer expectedMetricsInterval) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; this.instanceConfig = instanceConfig; @@ -122,6 +130,7 @@ class KubernetesRuntime implements Runtime { this.pulsarAdminUrl = pulsarAdminUrl; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, pulsarRootDir + "/conf/log4j2.yaml", installUserCodeDependencies); + this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, 
expectedMetricsInterval); running = false; doChecks(instanceConfig.getFunctionDetails()); } @@ -379,6 +388,14 @@ class KubernetesRuntime implements Runtime { ); } + protected List<String> getPrometheusMetricsServerCommand() { + return Arrays.asList( + "sh", + "-c", + String.join(" ", prometheusMetricsServerArgs) + ); + } + private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) { return Arrays.asList( pulsarRootDir + "/bin/pulsar-admin", @@ -428,10 +445,7 @@ class KubernetesRuntime implements Runtime { // set up pod meta final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(instanceConfig.getFunctionDetails())); - /* - TODO:- Figure out the metrics collection later. templateMetaData.annotations(getPrometheusAnnotations()); - */ podTemplateSpec.setMetadata(templateMetaData); final List<String> command = getExecutorCommand(); @@ -447,7 +461,7 @@ class KubernetesRuntime implements Runtime { private Map<String, String> getPrometheusAnnotations() { final Map<String, String> annotations = new HashMap<>(); annotations.put("prometheus.io/scrape", "true"); - annotations.put("prometheus.io/port", "8080"); + annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT)); return annotations; } @@ -472,8 +486,10 @@ class KubernetesRuntime implements Runtime { // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions podSpec.setTolerations(getTolerations()); - podSpec.containers(Collections.singletonList( - getContainer(instanceCommand, resource))); + List<V1Container> containers = new LinkedList<>(); + containers.add(getFunctionContainer(instanceCommand, resource)); + containers.add(getPrometheusContainer()); + podSpec.containers(containers); return podSpec; } @@ -493,7 +509,7 @@ class KubernetesRuntime implements Runtime { return tolerations; } - private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) { + private V1Container 
getFunctionContainer(List<String> instanceCommand, Function.Resources resource) { final V1Container container = new V1Container().name("pulsarfunction"); // set up the container images @@ -520,12 +536,44 @@ class KubernetesRuntime implements Runtime { container.setResources(resourceRequirements); // set container ports - container.setPorts(getContainerPorts()); + container.setPorts(getFunctionContainerPorts()); + + return container; + } + + private V1Container getPrometheusContainer() { + final V1Container container = new V1Container().name("prometheusmetricsserver"); + + // set up the container images + container.setImage(pulsarDockerImageName); + + // set up the container command + container.setCommand(getPrometheusMetricsServerCommand()); + + // setup the environment variables for the container + final V1EnvVar envVarPodName = new V1EnvVar(); + envVarPodName.name("POD_NAME") + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath("metadata.name"))); + container.setEnv(Arrays.asList(envVarPodName)); + + + // set container resources + final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); + final Map<String, Quantity> requests = new HashMap<>(); + requests.put("memory", Quantity.fromString(Long.toString(prometheusMetricsServerRam))); + requests.put("cpu", Quantity.fromString(Double.toString(prometheusMetricsServerCpu))); + resourceRequirements.setRequests(requests); + container.setResources(resourceRequirements); + + // set container ports + container.setPorts(getPrometheusContainerPorts()); return container; } - private List<V1ContainerPort> getContainerPorts() { + private List<V1ContainerPort> getFunctionContainerPorts() { List<V1ContainerPort> ports = new ArrayList<>(); final V1ContainerPort port = new V1ContainerPort(); port.setName("grpc"); @@ -534,6 +582,15 @@ class KubernetesRuntime implements Runtime { return ports; } + private List<V1ContainerPort> getPrometheusContainerPorts() { + 
List<V1ContainerPort> ports = new ArrayList<>(); + final V1ContainerPort port = new V1ContainerPort(); + port.setName("prometheus"); + port.setContainerPort(PROMETHEUS_PORT); + ports.add(port); + return ports; + } + private static String createJobName(Function.FunctionDetails functionDetails) { return createJobName(functionDetails.getTenant(), functionDetails.getNamespace(), @@ -557,4 +614,23 @@ class KubernetesRuntime implements Runtime { throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize); } } + + private List<String> composePrometheusMetricsServerArgs(String prometheusMetricsServerFile, + Integer expectedMetricsInterval) throws Exception { + List<String> args = new LinkedList<>(); + args.add("java"); + args.add("-cp"); + args.add(prometheusMetricsServerFile); + args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam)); + args.add(PrometheusMetricsServer.class.getName()); + args.add("--function_details"); + args.add("'" + JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails()) + "'"); + args.add("--prometheus_port"); + args.add(String.valueOf(PROMETHEUS_PORT)); + args.add("--grpc_port"); + args.add(String.valueOf(GRPC_PORT)); + args.add("--collection_interval"); + args.add(String.valueOf(expectedMetricsInterval)); + return args; + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index dee265f..b257cbf 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -53,7 +53,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private final AuthenticationConfig authConfig; private final String javaInstanceJarFile; 
private final String pythonInstanceFile; + private final String prometheusMetricsServerJarFile; private final String logDirectory = "logs/functions"; + private final Integer expectedMetricsInterval; private AppsV1Api appsClient; private CoreV1Api coreClient; @@ -68,7 +70,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String pulsarServiceUri, String pulsarAdminUri, String stateStorageServiceUri, - AuthenticationConfig authConfig) { + AuthenticationConfig authConfig, + Integer expectedMetricsInterval) { this.k8Uri = k8Uri; if (!isEmpty(jobNamespace)) { this.jobNamespace = jobNamespace; @@ -94,6 +97,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.authConfig = authConfig; this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar"; this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py"; + this.prometheusMetricsServerJarFile = this.pulsarRootDir + "/instances/PrometheusMetricsServer.jar"; + this.expectedMetricsInterval = expectedMetricsInterval == null ? 
-1 : expectedMetricsInterval; } @Override @@ -127,13 +132,15 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { pulsarRootDir, instanceConfig, instanceFile, + prometheusMetricsServerJarFile, logDirectory, codePkgUrl, originalCodeFileName, pulsarServiceUri, pulsarAdminUri, stateStorageServiceUri, - authConfig); + authConfig, + expectedMetricsInterval); } @Override diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index 11e240a..895a1e8 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -72,7 +72,7 @@ public class KubernetesRuntimeTest { this.stateStorageServiceUrl = "bk://localhost:4181"; this.logDirectory = "logs/functions"; this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir, - false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null)); + false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null)); doNothing().when(this.factory).setupClient(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 47d317f..2514ff6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -130,7 +130,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? 
workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(), workerConfig.getStateStorageServiceUrl(), - authConfig); + authConfig, + workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval()); } else { throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index c18f824..bc97969 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -141,6 +141,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private String pulsarAdminUrl; private Boolean installUserCodeDependencies; private Map<String, String> customLabels; + private Integer expectedMetricsCollectionInterval; } private KubernetesContainerFactory kubernetesContainerFactory;