This is an automated email from the ASF dual-hosted git repository. nwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new b069e45 Support custom metrics rules for PrometheusSink (#3493) b069e45 is described below commit b069e450a78c2f77ba5427c3f1aceda1e881e9f2 Author: choi se <think...@gmail.com> AuthorDate: Thu Apr 2 14:14:28 2020 +0900 Support custom metrics rules for PrometheusSink (#3493) * Support Java 11 * config travis to use oracle jdk 11 * Java 11 support (#3399) * Support Java 11 * config travis to use oracle jdk 11 * Add check jdk version * Fix command arguments. Change insert gc_options Update list Fix gc-logging * Add missing parameter * typo * Add pause time * wip * Support jmx_exporter format configuration. * Fix checkstyle * Remove unused * Java 11 support (#3399) * Support Java 11 * config travis to use oracle jdk 11 * Add check jdk version * Fix command arguments. Change insert gc_options Update list Fix gc-logging * wip * Support jmx_exporter format configuration. * Fix checkstyle * Remove unused * Update kafkaOffset metrics * Add Rules * Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459) * Patch to fix cppcheck with newer glibc (#3471) * Add documents for setting up a docker based development environment (#3475) * Improve concurrency for needed parts. (#3107) * Change concurrent Map * Change concurrent Map * HashMap changes for unneeded parts. * HashMap changes for unneeded parts. * Review changes * Changes HashMap for unneeded parts. * Improve concurrency for needed parts. * Remove unused imports. * Remove unused imports. * Remove unused imports. * Fix NPE (cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32) * Fix WhitespaceAround * Add dummy Object * Fix ConstantName (cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c) * Update kafkaOffset metrics * Add Rules * Update line is longer than 100 characters * Update line is longer than 100 characters * Add attrNameSnakeCase or other metrics fix * fix checkstyle Co-authored-by: Ning Wang <wangnin...@gmail.com> Co-authored-by: Ning Wang <nw...@twitter.com> Co-authored-by: Nicholas Nezis <nicholas.ne...@gmail.com> --- .../heron/metricsmgr/sink/PrometheusSink.java | 236 ++++++++++++++++++--- .../heron/metricsmgr/sink/PrometheusSinkTests.java | 124 ++++++++++- 2 files changed, 324 insertions(+), 36 deletions(-) diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java index 0db442a..56a244b 100644 --- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java +++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java @@ -20,10 +20,17 @@ package org.apache.heron.metricsmgr.sink; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.cache.Cache; @@ -33,6 +40,9 @@ import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo; import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord; import org.apache.heron.spi.metricsmgr.sink.SinkContext; +import static java.lang.String.format; +import static org.apache.heron.metricsmgr.sink.PrometheusSink.Prometheus.sanitizeMetricName; + /** * A web sink that exposes and endpoint that Prometheus can scrape * @@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink { // This is the cache that is used to serve the metrics private Cache<String, Map<String, Double>> metricsCache; + private List<Rule> rules = new ArrayList<Rule>(); private String cluster; private String role; @@ -66,6 +77,26 @@ public class PrometheusSink extends AbstractWebSink { super(); } + private enum Type { + COUNTER, + GAUGE, + SUMMARY, + HISTOGRAM, + UNTYPED, + } + + private static class Rule { + public Pattern pattern; + public String name; + public String value; + public Double valueFactor = 1.0; + public String help; + public boolean attrNameSnakeCase; + public Type type = Type.UNTYPED; + public ArrayList<String> labelNames; + public ArrayList<String> labelValues; + } + @Override void initialize(Map<String, Object> configuration, SinkContext context) { metricsCache = createCache(); @@ -73,6 +104,64 @@ public class PrometheusSink extends AbstractWebSink { cluster = context.getCluster(); role = context.getRole(); environment = context.getEnvironment(); + + if (configuration.containsKey("rules")) { + List<Map<String, Object>> configRules = (List<Map<String, Object>>) + configuration.get("rules"); + for (Map<String, Object> ruleObject : configRules) { + Rule rule = new Rule(); + rules.add(rule); + if (ruleObject.containsKey("pattern")) { + rule.pattern = Pattern.compile("^.*(?:" + (String) ruleObject.get("pattern") + ").*$"); + } + if (ruleObject.containsKey("name")) { + rule.name = (String) ruleObject.get("name"); + } + if (ruleObject.containsKey("value")) { + rule.value = String.valueOf(ruleObject.get("value")); + } + if (ruleObject.containsKey("valueFactor")) { + String valueFactor = String.valueOf(ruleObject.get("valueFactor")); + try { + rule.valueFactor = Double.valueOf(valueFactor); + } catch (NumberFormatException e) { + // use default value + } + } + if (ruleObject.containsKey("attrNameSnakeCase")) { + rule.attrNameSnakeCase = (Boolean) ruleObject.get("attrNameSnakeCase"); + } + if (ruleObject.containsKey("type")) { + rule.type = Type.valueOf((String) ruleObject.get("type")); + } + if (ruleObject.containsKey("help")) { + rule.help = (String) ruleObject.get("help"); + } + if (ruleObject.containsKey("labels")) { + TreeMap labels = new TreeMap((Map<String, Object>) ruleObject.get("labels")); + rule.labelNames = new ArrayList<String>(); + rule.labelValues = new ArrayList<String>(); + for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) labels + .entrySet()) { + rule.labelNames.add(entry.getKey()); + rule.labelValues.add((String) entry.getValue()); + } + } + + // Validation. + if ((rule.labelNames != null || rule.help != null) && rule.name == null) { + throw new IllegalArgumentException("Must provide name, if help or labels are given: " + + ruleObject); + } + if (rule.name != null && rule.pattern == null) { + throw new IllegalArgumentException("Must provide pattern, if name is given: " + + ruleObject); + } + } + } else { + // Default to a single default rule. + rules.add(new Rule()); + } } @Override @@ -82,6 +171,9 @@ public class PrometheusSink extends AbstractWebSink { final StringBuilder sb = new StringBuilder(); metrics.forEach((String source, Map<String, Double> sourceMetrics) -> { + // Map the labels. + final Map<String, String> labelKV = new TreeMap<String, String>(); + String[] sources = source.split("/"); String topology = sources[0]; String component = sources[1]; @@ -96,6 +188,18 @@ public class PrometheusSink extends AbstractWebSink { final String clusterRoleEnv = hasClusterRoleEnvironment(c, r, e) ? String.format("%s/%s/%s", c, r, e) : null; + labelKV.put("topology", topology); + labelKV.put("component", component); + labelKV.put("instance_id", instance); + + if (clusterRoleEnv != null) { + labelKV.put("cluster_role_env", clusterRoleEnv); + } + + if (componentType != null) { + labelKV.put("component_type", componentType); + } + sourceMetrics.forEach((String metric, Double value) -> { // some stream manager metrics in heron contain a instance id as part of the metric name @@ -104,46 +208,79 @@ public class PrometheusSink extends AbstractWebSink { // __time_spent_back_pressure_by_compid/container_1_exclaim1_1 // TODO convert to small classes for less string manipulation final String metricName; - final String metricInstanceId; if (componentIsStreamManger) { final boolean metricHasInstanceId = metric.contains("_by_"); final String[] metricParts = metric.split("/"); if (metricHasInstanceId && metricParts.length == 3) { - metricName = String.format("%s_%s", metricParts[0], metricParts[2]); - metricInstanceId = metricParts[1]; + metricName = splitTargetInstance(metricParts[0], metricParts[2], labelKV); + labelKV.put("metric_instance_id", metricParts[1]); } else if (metricHasInstanceId && metricParts.length == 2) { - metricName = metricParts[0]; - metricInstanceId = metricParts[1]; + metricName = splitTargetInstance(metricParts[0], null, labelKV); + labelKV.put("metric_instance_id", metricParts[1]); + } else if (metricParts.length == 2) { + metricName = splitTargetInstance(metricParts[0], metricParts[1], labelKV); } else { - metricName = metric; - metricInstanceId = null; + metricName = splitTargetInstance(metric, null, labelKV); } - } else { - metricName = metric; - metricInstanceId = null; - } - - String exportedMetricName = String.format("%s_%s", HERON_PREFIX, - metricName.replace("__", "").toLowerCase()); - sb.append(Prometheus.sanitizeMetricName(exportedMetricName)) - .append("{") - .append("topology=\"").append(topology).append("\",") - .append("component=\"").append(component).append("\",") - .append("instance_id=\"").append(instance).append("\""); - - if (clusterRoleEnv != null) { - sb.append(",cluster_role_env=\"").append(clusterRoleEnv).append("\""); - } - - if (componentType != null) { - sb.append(",component_type=\"").append(componentType).append("\""); - } - - if (metricInstanceId != null) { - sb.append(",metric_instance_id=\"").append(metricInstanceId).append("\""); + final AtomicReference<String> name = new AtomicReference<>(sanitizeMetricName(metric)); + rules.forEach(rule -> { + String ruleName = name.get(); + Matcher matcher = null; + if (rule.pattern != null) { + matcher = rule.pattern.matcher(metric); + if (!matcher.matches()) { + return; + } + } + + // If there's no name provided, use default export format. + if (rule.name == null || rule.name.isEmpty()) { + // nothing + } else { + // Matcher is set below here due to validation in the constructor. + ruleName = sanitizeMetricName(matcher.replaceAll(rule.name)); + if (ruleName.isEmpty()) { + return; + } + } + if (rule.attrNameSnakeCase) { + name.set(toSnakeAndLowerCase(ruleName)); + } else { + name.set(ruleName.toLowerCase()); + } + if (rule.labelNames != null) { + for (int i = 0; i < rule.labelNames.size(); i++) { + final String unsafeLabelName = rule.labelNames.get(i); + final String labelValReplacement = rule.labelValues.get(i); + String labelName = sanitizeMetricName(matcher.replaceAll(unsafeLabelName)); + String labelValue = matcher.replaceAll(labelValReplacement); + labelName = labelName.toLowerCase(); + if (!labelName.isEmpty() && !labelValue.isEmpty()) { + labelKV.put(labelName, labelValue); + } + } + } + }); + metricName = name.get(); } + // TODO Type, Help + String exportedMetricName = format("%s_%s", HERON_PREFIX, + metricName + .replace("__", "") + .toLowerCase()); + sb.append(sanitizeMetricName(exportedMetricName)) + .append("{"); + final AtomicBoolean isFirst = new AtomicBoolean(true); + labelKV.forEach((k, v) -> { + // Add labels + if (!isFirst.get()) { + sb.append(','); + } + sb.append(format("%s=\"%s\"", k, v)); + isFirst.set(false); + }); sb.append("} ") .append(Prometheus.doubleToGoString(value)) .append(" ").append(currentTimeMillis()) @@ -154,6 +291,45 @@ public class PrometheusSink extends AbstractWebSink { return sb.toString().getBytes(); } + private static final Pattern SPLIT_TARGET = Pattern.compile("__(?<name>\\w+)" + + "_(?<target>(?<instance>\\w+)-\\d+)"); + private static final Pattern DIGIT = Pattern.compile("[0-9]+"); + + private String splitTargetInstance(String part1, String part2, Map<String, String> labelKV) { + if (part2 != null) { + if (DIGIT.matcher(part2).matches()) { + labelKV.put("metric_instance_id", part2); + return part1; + } + final Matcher m = SPLIT_TARGET.matcher(part1); + if (m.matches()) { + labelKV.put("metric_instance_id", m.group("target")); + return String.format("%s_%s_%s", m.group("name"), m.group("instance"), part2); + } + return String.format("%s_%s", part1, part2); + } + return part1; + } + + static String toSnakeAndLowerCase(String attrName) { + if (attrName == null || attrName.isEmpty()) { + return attrName; + } + char firstChar = attrName.subSequence(0, 1).charAt(0); + boolean prevCharIsUpperCaseOrUnderscore = Character.isUpperCase(firstChar) || firstChar == '_'; + StringBuilder resultBuilder = new StringBuilder(attrName.length()) + .append(Character.toLowerCase(firstChar)); + for (char attrChar : attrName.substring(1).toCharArray()) { + boolean charIsUpperCase = Character.isUpperCase(attrChar); + if (!prevCharIsUpperCaseOrUnderscore && charIsUpperCase) { + resultBuilder.append("_"); + } + resultBuilder.append(Character.toLowerCase(attrChar)); + prevCharIsUpperCaseOrUnderscore = charIsUpperCase || attrChar == '_'; + } + return resultBuilder.toString(); + } + @Override public void processRecord(MetricsRecord record) { final String[] sources = MetricsUtil.splitRecordSource(record); diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java index 318d8ec..b73b1dd 100644 --- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java +++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -81,6 +84,45 @@ public class PrometheusSinkTests { Mockito.when(context.getTopologyName()).thenReturn("testTopology"); Mockito.when(context.getSinkId()).thenReturn("testId"); + /* + # example: metrics.yaml + rules: + - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count + name: kafka_$1_$2_$3_total + attrNameSnakeCase: true + type: COUNTER + labels: + "$4": "$5" + type: COUNTER + */ + /* + example: metrics + kafkaOffset/nginx-lfp-beacon/totalSpoutLag + kafkaOffset/lads_event_meta_backfill_data/partition_10/spoutLag + */ + List<Map<String, Object>> rules = Lists.newArrayList(); + defaultConf.put("rules", rules); + Map<String, Object> rule1 = Maps.newHashMap(); + Map<String, Object> labels1 = Maps.newHashMap(); + rules.add(rule1); + rule1.put("pattern", "kafkaOffset/(.+)/(.+)"); + rule1.put("name", "kafka_offset_$2"); + rule1.put("type", "COUNTER"); + rule1.put("attrNameSnakeCase", true); + rule1.put("labels", labels1); + labels1.put("topic", "$1"); + + Map<String, Object> rule2 = Maps.newHashMap(); + Map<String, Object> labels2 = Maps.newHashMap(); + rules.add(rule2); + rule2.put("pattern", "kafkaOffset/(.+)/partition_(\\d+)/(.+)"); + rule2.put("name", "kafka_offset_partition_$3"); + rule2.put("type", "COUNTER"); + rule2.put("labels", labels2); + rule2.put("attrNameSnakeCase", true); + labels2.put("topic", "$1"); + labels2.put("partition", "$2"); + Iterable<MetricsInfo> infos = Arrays.asList(new MetricsInfo("metric_1", "1.0"), new MetricsInfo("metric_2", "2.0")); @@ -133,7 +175,9 @@ public class PrometheusSinkTests { public void testResponseWhenMetricNamesHaveAnInstanceId() throws IOException { Iterable<MetricsInfo> infos = Arrays.asList( new MetricsInfo("__connection_buffer_by_instanceid/container_1_word_5/packets", "1.0"), - new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0") + new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0"), + new MetricsInfo("__client_stmgr-92/__ack_tuples_to_stmgrs", "1.0"), + new MetricsInfo("__instance_bytes_received/1", "1.0") ); records = Arrays.asList( @@ -154,7 +198,59 @@ public class PrometheusSinkTests { "container_1_word_5", "1.0"), createMetric(topology, "__stmgr__", "stmgr-1", "time_spent_back_pressure_by_compid", - "container_1_exclaim1_1", "1.0") + "container_1_exclaim1_1", "1.0"), + createMetric(topology, "__stmgr__", "stmgr-1", + "client_stmgr_ack_tuples_to_stmgrs", "stmgr-92", "1.0"), + createMetric(topology, "__stmgr__", "stmgr-1", + "instance_bytes_received", "1", "1.0") + ); + + final Set<String> generatedLines = + new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n"))); + + assertEquals(expectedLines.size(), generatedLines.size()); + + expectedLines.forEach((String line) -> { + assertTrue(generatedLines.contains(line)); + }); + } + + @Test + public void testApacheStormKafkaMetrics() throws IOException { + Iterable<MetricsInfo> infos = Arrays.asList( + new MetricsInfo("kafkaOffset/event_data/partition_0/spoutLag", "1.0"), + new MetricsInfo("kafkaOffset/event_data/partition_10/spoutLag", "1.0"), + new MetricsInfo("kafkaOffset/event_data/partition_0/earliestTimeOffset", "1.0"), + new MetricsInfo("kafkaOffset/event_data/totalRecordsInPartitions", "1.0"), + new MetricsInfo("kafkaOffset/event_data/totalSpoutLag", "1.0"), + new MetricsInfo("kafkaOffset/event_data/partition_2/spoutLag", "1.0") + ); + + records = Arrays.asList( + newRecord("shared-aurora-036:31/spout-release-1/container_1_spout-release-1_31", + infos, Collections.emptyList()) + ); + PrometheusTestSink sink = new PrometheusTestSink(); + sink.init(defaultConf, context); + for (MetricsRecord r : records) { + sink.processRecord(r); + } + + final String topology = "testTopology"; + + final List<String> expectedLines = Arrays.asList( + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_partition_spout_lag", "event_data", "0", "1.0"), + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_partition_spout_lag", "event_data", "10", "1.0"), + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_partition_earliest_time_offset", "event_data", "0", "1.0"), + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_total_records_in_partitions", "event_data", null, "1.0"), + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_total_spout_lag", "event_data", null, "1.0"), + createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31", + "kafka_offset_partition_spout_lag", "event_data", "2", "1.0") ); final Set<String> generatedLines = @@ -195,12 +291,28 @@ public class PrometheusSinkTests { if (metricNameInstanceId != null) { return String.format("heron_%s" - + "{topology=\"%s\",component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\"}" + + "{component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\",topology=\"%s\"}" + " %s %d", - metric, topology, component, instance, metricNameInstanceId, value, NOW); + metric, component, instance, metricNameInstanceId, topology, value, NOW); } else { - return String.format("heron_%s{topology=\"%s\",component=\"%s\",instance_id=\"%s\"} %s %d", - metric, topology, component, instance, value, NOW); + return String.format("heron_%s{component=\"%s\",instance_id=\"%s\",topology=\"%s\"} %s %d", + metric, component, instance, topology, value, NOW); + } + } + + private String createOffsetMetric(String topology, String component, String instance, + String metric, String topic, String partition, String value) { + + if (partition != null) { + return String.format("heron_%s" + + "{component=\"%s\",instance_id=\"%s\",partition=\"%s\"," + + "topic=\"%s\",topology=\"%s\"}" + + " %s %d", + metric, component, instance, partition, topic, topology, value, NOW); + } else { + return String.format("heron_%s" + + "{component=\"%s\",instance_id=\"%s\",topic=\"%s\",topology=\"%s\"} %s %d", + metric, component, instance, topic, topology, value, NOW); } }