This is an automated email from the ASF dual-hosted git repository. himanshug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 6964ac2 Adding influxdb emitter as a contrib extension (#7717) 6964ac2 is described below commit 6964ac23a2a3b95f29a2d4aa443ef9fcbc84dfdb Author: awelsh93 <32643586+awels...@users.noreply.github.com> AuthorDate: Thu May 23 19:11:48 2019 +0100 Adding influxdb emitter as a contrib extension (#7717) * Adding influxdb emitter as a contrib extension * addressing code review comments --- distribution/pom.xml | 2 + .../extensions-contrib/influxdb-emitter.md | 75 ++++++++ docs/content/development/extensions.md | 1 + extensions-contrib/influxdb-emitter/pom.xml | 74 +++++++ .../druid/emitter/influxdb/InfluxdbEmitter.java | 214 +++++++++++++++++++++ .../emitter/influxdb/InfluxdbEmitterConfig.java | 196 +++++++++++++++++++ .../emitter/influxdb/InfluxdbEmitterModule.java | 61 ++++++ .../org.apache.druid.initialization.DruidModule | 16 ++ .../influxdb/InfluxdbEmitterConfigTest.java | 212 ++++++++++++++++++++ .../emitter/influxdb/InfluxdbEmitterTest.java | 208 ++++++++++++++++++++ pom.xml | 1 + 11 files changed, 1060 insertions(+) diff --git a/distribution/pom.xml b/distribution/pom.xml index e25b3c5..4ec9b26 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -324,6 +324,8 @@ <argument>-c</argument> <argument>org.apache.druid.extensions.contrib:druid-influx-extensions</argument> <argument>-c</argument> + <argument>org.apache.druid.extensions.contrib:druid-influxdb-emitter</argument> + <argument>-c</argument> <argument>org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer</argument> <argument>-c</argument> <argument>org.apache.druid.extensions.contrib:kafka-emitter</argument> diff --git a/docs/content/development/extensions-contrib/influxdb-emitter.md b/docs/content/development/extensions-contrib/influxdb-emitter.md new file mode 100644 index 0000000..138a0bb --- /dev/null +++ b/docs/content/development/extensions-contrib/influxdb-emitter.md @@ -0,0 +1,75 @@ +--- +layout: doc_page +title: "InfluxDB Emitter" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +# InfluxDB Emitter + +To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-influxdb-emitter` extension. + +## Introduction + +This extension emits druid metrics to [InfluxDB](https://www.influxdata.com/time-series-platform/influxdb/) over HTTP. Currently this emitter only emits service metric events to InfluxDB (See [Druid metrics](../../operations/metrics.html) for a list of metrics). +When a metric event is fired it is added to a queue of events. After a configurable amount of time, the events on the queue are transformed to InfluxDB's line protocol +and POSTed to the InfluxDB HTTP API. The entire queue is flushed at this point. The queue is also flushed as the emitter is shutdown. + +Note that authentication and authorization must be [enabled](https://docs.influxdata.com/influxdb/v1.7/administration/authentication_and_authorization/) on the InfluxDB server. + +## Configuration + +All the configuration parameters for the influxdb emitter are under `druid.emitter.influxdb`. + +|Property|Description|Required?|Default| +|--------|-----------|---------|-------| +|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A| +|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086| +|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A| +|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.Max_Value(=2^31-1)| +|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000| +|`druid.emitter.influxdb.flushDelay`|How long (in milliseconds) the scheduled method will wait until it first runs.|No|60000| +|`druid.emitter.influxdb.influxdbUserName`|The username for authenticating with the InfluxDB database.|Yes|N/A| +|`druid.emitter.influxdb.influxdbPassword`|The password of the database authorized user|Yes|N/A| +|`druid.emitter.influxdb.dimensionWhitelist`|A whitelist of metric dimensions to include as tags|No|`["dataSource","type","numMetrics","numDimensions","threshold","dimension","taskType","taskStatus","tier"]`| + +## InfluxDB Line Protocol + +An example of how this emitter parses a Druid metric event into InfluxDB's [line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/) is given here: + +The syntax of the line protocol is : + +`<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]` + +where timestamp is in nano-seconds since epoch. + +A typical service metric event as recorded by Druid's logging emitter is: `Event [{"feed":"metrics","timestamp":"2017-10-31T09:09:06.857Z","service":"druid/historical","host":"historical001:8083","version":"0.11.0-SNAPSHOT","metric":"query/cache/total/hits","value":34787256}]`. + +This event is parsed into line protocol according to these rules: + +* The measurement becomes druid_query since query is the first part of the metric. +* The tags are service=druid/historical, hostname=historical001, metric=druid_cache_total. (The metric tag is the middle part of the druid metric separated with _ and preceded by druid_. Another example would be if an event has metric=query/time then there is no middle part and hence no metric tag) +* The field is druid_hits since this is the last part of the metric. + +This gives the following String which can be POSTed to InfluxDB: `"druid_query,service=druid/historical,hostname=historical001,metric=druid_cache_total druid_hits=34787256 1509440946857000000"` + +The InfluxDB emitter has a white list of dimensions +which will be added as a tag to the line protocol string if the metric has a dimension from the white list. +The value of the dimension is sanitized such that every occurence of a dot or whitespace is replaced with a `_` . diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 5112ee9..c58044c 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -96,6 +96,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| |druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)| |druid-moving-average-query|Support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.|[link](../development/extensions-contrib/moving-average-query.html)| +|druid-influxdb-emitter|InfluxDB metrics emitter|[link](../development/extensions-contrib/influxdb-emitter.html)| ## Promoting Community Extension to Core Extension diff --git a/extensions-contrib/influxdb-emitter/pom.xml b/extensions-contrib/influxdb-emitter/pom.xml new file mode 100644 index 0000000..3605fd0 --- /dev/null +++ b/extensions-contrib/influxdb-emitter/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <groupId>org.apache.druid.extensions.contrib</groupId> + <artifactId>druid-influxdb-emitter</artifactId> + <name>druid-influxdb-emitter</name> + <description>influxdb-emitter</description> + + <parent> + <groupId>org.apache.druid</groupId> + <artifactId>druid</artifactId> + <version>0.15.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <dependencies> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-core</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>pl.pragmatists</groupId> + <artifactId>JUnitParams</artifactId> + <version>1.0.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.6</version> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + +</project> diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java new file mode 100644 index 0000000..93f7bc2 --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.influxdb; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + + +public class InfluxdbEmitter implements Emitter +{ + + private static final Logger log = new Logger(InfluxdbEmitter.class); + private final HttpClient influxdbClient; + private final InfluxdbEmitterConfig influxdbEmitterConfig; + private final AtomicBoolean started = new AtomicBoolean(false); + private final ScheduledExecutorService exec = ScheduledExecutors.fixed(1, "InfluxdbEmitter-%s"); + private final ImmutableSet dimensionWhiteList; + private final LinkedBlockingQueue<ServiceMetricEvent> eventsQueue; + private static final Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+"); + + public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig) + { + this.influxdbEmitterConfig = influxdbEmitterConfig; + this.influxdbClient = HttpClientBuilder.create().build(); + this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize()); + this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist(); + log.info("constructed influxdb emitter"); + } + + @Override + public void start() + { + synchronized (started) { + if (!started.get()) { + exec.scheduleAtFixedRate( + () -> transformAndSendToInfluxdb(eventsQueue), + influxdbEmitterConfig.getFlushDelay(), + influxdbEmitterConfig.getFlushPeriod(), + TimeUnit.MILLISECONDS + ); + started.set(true); + } + } + } + + @Override + public void emit(Event event) + { + if (event instanceof ServiceMetricEvent) { + ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; + try { + eventsQueue.put(metricEvent); + } + catch (InterruptedException exception) { + log.error(exception, "Failed to add metricEvent to events queue."); + Thread.currentThread().interrupt(); + } + } + } + + public void postToInflux(String payload) + { + HttpPost post = new HttpPost( + "http://" + influxdbEmitterConfig.getHostname() + + ":" + influxdbEmitterConfig.getPort() + + "/write?db=" + influxdbEmitterConfig.getDatabaseName() + + "&u=" + influxdbEmitterConfig.getInfluxdbUserName() + + "&p=" + influxdbEmitterConfig.getInfluxdbPassword() + ); + + post.setEntity(new StringEntity(payload, ContentType.DEFAULT_TEXT)); + post.setHeader("Content-Type", "application/x-www-form-urlencoded"); + + try { + influxdbClient.execute(post); + } + catch (IOException ex) { + log.info(ex, "Failed to post events to InfluxDB."); + } + finally { + post.releaseConnection(); + } + } + + public String transformForInfluxSystems(ServiceMetricEvent event) + { + // split Druid metric on slashes and join middle parts (if any) with "_" + String[] parts = getValue("metric", event).split("/"); + String metric = String.join( + "_", + Arrays.asList( + Arrays.copyOfRange( + parts, + 1, + parts.length - 1 + ) + ) + ); + + // measurement + StringBuilder payload = new StringBuilder("druid_"); + payload.append(parts[0]); + + // tags + StringBuilder tag = new StringBuilder(",service="); + tag.append(getValue("service", event)); + String metricTag = parts.length == 2 ? "" : ",metric=druid_" + metric; + tag.append(metricTag); + tag.append(StringUtils.format(",hostname=%s", getValue("host", event).split(":")[0])); + ImmutableSet<String> dimNames = ImmutableSet.copyOf(event.getUserDims().keySet()); + for (String dimName : dimNames) { + if (this.dimensionWhiteList.contains(dimName)) { + tag.append(StringUtils.format(",%1$s=%2$s", dimName, sanitize(String.valueOf(event.getUserDims().get(dimName))))); + } + } + payload.append(tag); + + // fields + payload.append(StringUtils.format(" druid_%1$s=%2$s", parts[parts.length - 1], getValue("value", event))); + + // timestamp + payload.append(StringUtils.format(" %d\n", event.getCreatedTime().getMillis() * 1000000)); + + return payload.toString(); + } + + private static String sanitize(String namespace) + { + return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_"); + } + + public String getValue(String key, ServiceMetricEvent event) + { + switch (key) { + case "service": + return event.getService(); + case "eventType": + return event.getClass().getSimpleName(); + case "metric": + return event.getMetric(); + case "feed": + return event.getFeed(); + case "host": + return event.getHost(); + case "value": + return event.getValue().toString(); + default: + return key; + } + } + + @Override + public void flush() throws IOException + { + if (started.get()) { + transformAndSendToInfluxdb(eventsQueue); + } + } + + @Override + public void close() throws IOException + { + flush(); + log.info("Closing [%s]", this.getClass().getName()); + started.set(false); + exec.shutdownNow(); + } + + public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> eventsQueue) + { + StringBuilder payload = new StringBuilder(); + int initialQueueSize = eventsQueue.size(); + for (int i = 0; i < initialQueueSize; i++) { + payload.append(transformForInfluxSystems(eventsQueue.poll())); + } + postToInflux(payload.toString()); + } + +} diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java new file mode 100644 index 0000000..d96b070 --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.influxdb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.logger.Logger; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +public class InfluxdbEmitterConfig +{ + + private static final int DEFAULT_PORT = 8086; + private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE; + private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds + private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"); + + @JsonProperty + private final String hostname; + @JsonProperty + private final Integer port; + @JsonProperty + private final String databaseName; + @JsonProperty + private final Integer maxQueueSize; + @JsonProperty + private final Integer flushPeriod; + @JsonProperty + private final Integer flushDelay; + @JsonProperty + private final String influxdbUserName; + @JsonProperty + private final String influxdbPassword; + @JsonProperty + private final ImmutableSet<String> dimensionWhitelist; + + private static Logger log = new Logger(InfluxdbEmitterConfig.class); + + @JsonCreator + public InfluxdbEmitterConfig( + @JsonProperty("hostname") String hostname, + @JsonProperty("port") Integer port, + @JsonProperty("databaseName") String databaseName, + @JsonProperty("maxQueueSize") Integer maxQueueSize, + @JsonProperty("flushPeriod") Integer flushPeriod, + @JsonProperty("flushDelay") Integer flushDelay, + @JsonProperty("influxdbUserName") String influxdbUserName, + @JsonProperty("influxdbPassword") String influxdbPassword, + @JsonProperty("dimensionWhitelist") Set<String> dimensionWhitelist + ) + { + this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null"); + this.port = port == null ? DEFAULT_PORT : port; + this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null"); + this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize; + this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod; + this.flushDelay = flushDelay == null ? DEFAULT_FLUSH_PERIOD : flushDelay; + this.influxdbUserName = Preconditions.checkNotNull(influxdbUserName, "influxdbUserName can not be null"); + this.influxdbPassword = Preconditions.checkNotNull(influxdbPassword, "influxdbPassword can not be null"); + this.dimensionWhitelist = dimensionWhitelist == null ? ImmutableSet.copyOf(DEFAULT_DIMENSION_WHITELIST) : ImmutableSet.copyOf(dimensionWhitelist); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof InfluxdbEmitterConfig)) { + return false; + } + + InfluxdbEmitterConfig that = (InfluxdbEmitterConfig) o; + + if (getPort() != that.getPort()) { + return false; + } + if (!getHostname().equals(that.getHostname())) { + return false; + } + if (!getDatabaseName().equals(that.getDatabaseName())) { + return false; + } + if (getFlushPeriod() != that.getFlushPeriod()) { + return false; + } + if (getMaxQueueSize() != that.getMaxQueueSize()) { + return false; + } + if (getFlushDelay() != that.getFlushDelay()) { + return false; + } + if (!getInfluxdbUserName().equals(that.getInfluxdbUserName())) { + return false; + } + if (!getInfluxdbPassword().equals(that.getInfluxdbPassword())) { + return false; + } + if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) { + return false; + } + return true; + + } + + @Override + public int hashCode() + { + int result = getHostname().hashCode(); + result = 31 * result + getPort(); + result = 31 * result + getDatabaseName().hashCode(); + result = 31 * result + getFlushPeriod(); + result = 31 * result + getMaxQueueSize(); + result = 31 * result + getFlushDelay(); + result = 31 * result + getInfluxdbUserName().hashCode(); + result = 31 * result + getInfluxdbPassword().hashCode(); + result = 31 * result + getDimensionWhitelist().hashCode(); + return result; + } + + @JsonProperty + public String getHostname() + { + return hostname; + } + + @JsonProperty + public int getPort() + { + return port; + } + + @JsonProperty + public String getDatabaseName() + { + return databaseName; + } + + @JsonProperty + public int getFlushPeriod() + { + return flushPeriod; + } + + @JsonProperty + public int getMaxQueueSize() + { + return maxQueueSize; + } + + @JsonProperty + public int getFlushDelay() + { + return flushDelay; + } + + @JsonProperty + public String getInfluxdbUserName() + { + return influxdbUserName; + } + + @JsonProperty + public String getInfluxdbPassword() + { + return influxdbPassword; + } + + @JsonProperty + public ImmutableSet<String> getDimensionWhitelist() + { + return dimensionWhitelist; + } +} diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java new file mode 100644 index 0000000..f6e6fa4 --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.influxdb; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.Emitter; + +import java.util.Collections; +import java.util.List; + +public class InfluxdbEmitterModule implements DruidModule +{ + + private static final String EMITTER_TYPE = "influxdb"; + private static final Logger log = new Logger(InfluxdbEmitterModule.class); + + @Override + public List<? extends Module> getJacksonModules() + { + return Collections.EMPTY_LIST; + } + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, InfluxdbEmitterConfig.class); + } + + @Provides + @ManageLifecycle + @Named(EMITTER_TYPE) + public Emitter getEmitter(InfluxdbEmitterConfig influxdbEmitterConfig, ObjectMapper mapper) + { + return new InfluxdbEmitter(influxdbEmitterConfig); + } +} diff --git a/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 0000000..fafe8ee --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.emitter.influxdb.InfluxdbEmitterModule diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java new file mode 100644 index 0000000..aa0d061 --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.influxdb; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class InfluxdbEmitterConfigTest +{ + private ObjectMapper mapper = new DefaultObjectMapper(); + private InfluxdbEmitterConfig influxdbEmitterConfig; + + @Before + public void setUp() + { + mapper.setInjectableValues(new InjectableValues.Std().addValue( + ObjectMapper.class, + new DefaultObjectMapper() + )); + + influxdbEmitterConfig = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + } + + @Test + public void testInfluxdbEmitterConfigObjectsAreDifferent() throws IOException + { + InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( + "localhost", + 8080, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + Assert.assertNotEquals(influxdbEmitterConfig, influxdbEmitterConfigComparison); + } + + @Test(expected = NullPointerException.class) + public void testConfigWithNullHostname() throws IOException + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( + null, + 8080, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + } + + @Test + public void testConfigWithNullPort() throws IOException + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig( + "localhost", + null, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + int expectedPort = 8086; + Assert.assertEquals(expectedPort, influxdbEmitterConfig.getPort()); + } + + @Test + public void testEqualsMethod() + { + InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + Assert.assertTrue(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison)); + } + + @Test + public void testEqualsMethodWithNotEqualConfigs() + { + InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 10000, + "adam", + "password", + null + ); + Assert.assertFalse(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison)); + } + + @Test(expected = NullPointerException.class) + public void testConfigWithNullInfluxdbUserName() throws IOException + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + null, + "password", + null + ); + } + + @Test(expected = NullPointerException.class) + public void testConfigWithNullInfluxdbPassword() throws IOException + { + InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + null, + null + ); + } + + @Test + public void testConfigWithNullDimensionWhitelist() + { + InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier")); + Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist()); + } + + @Test + public void testConfigWithDimensionWhitelist() + { + InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + ImmutableSet.of("dataSource", "taskType") + ); + ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "taskType")); + Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist()); + } + +} diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java new file mode 100644 index 0000000..2095a2f --- /dev/null +++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.influxdb; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class InfluxdbEmitterTest +{ + + private ServiceMetricEvent event; + + @Before + public void setUp() + { + DateTime date = new DateTime(2017, + 10, + 30, + 10, + 00, + DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds + String metric = "metric/te/st/value"; + Number value = 1234; + ImmutableMap<String, String> serviceDims = ImmutableMap.of( + "service", + "druid/historical", + "host", + "localhost", + "version", + "0.10.0" + ); + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + builder.setDimension("nonWhiteListedDim", "test"); + builder.setDimension("dataSource", "test_datasource"); + ServiceEventBuilder eventBuilder = builder.build(date, metric, value); + event = (ServiceMetricEvent) eventBuilder.build(serviceDims); + } + + @Test + public void testTransformForInfluxWithLongMetric() + { + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + String expected = + "druid_metric,service=druid/historical,metric=druid_te_st,hostname=localhost,dataSource=test_datasource druid_value=1234 1509357600000000000" + + "\n"; + String actual = influxdbEmitter.transformForInfluxSystems(event); + Assert.assertEquals(expected, actual); + } + + @Test + public void testTransformForInfluxWithShortMetric() + { + DateTime date = new DateTime(2017, + 10, + 30, + 10, + 00, + DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds + String metric = "metric/time"; + Number value = 1234; + ImmutableMap<String, String> serviceDims = ImmutableMap.of( + "service", + "druid/historical", + "host", + "localhost", + "version", + "0.10.0" + ); + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + ServiceEventBuilder eventBuilder = builder.build(date, metric, value); + ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims); + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + String expected = "druid_metric,service=druid/historical,hostname=localhost druid_time=1234 1509357600000000000" + + "\n"; + String actual = influxdbEmitter.transformForInfluxSystems(event); + Assert.assertEquals(expected, actual); + } + + @Test + public void testMetricIsInDimensionWhitelist() + { + DateTime date = new DateTime(2017, + 10, + 30, + 10, + 00, + DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds + String metric = "metric/time"; + Number value = 1234; + ImmutableMap<String, String> serviceDims = ImmutableMap.of( + "service", + "druid/historical", + "host", + "localhost", + "version", + "0.10.0" + ); + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + ServiceEventBuilder eventBuilder = builder.build(date, metric, value); + builder.setDimension("dataSource", "wikipedia"); + builder.setDimension("taskType", "index"); + ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims); + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + ImmutableSet.of("dataSource") + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia druid_time=1234 1509357600000000000" + + "\n"; + String actual = influxdbEmitter.transformForInfluxSystems(event); + Assert.assertEquals(expected, actual); + } + + @Test + public void testMetricIsInDefaultDimensionWhitelist() + { + DateTime date = new DateTime(2017, + 10, + 30, + 10, + 00, + DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds + String metric = "metric/time"; + Number value = 1234; + ImmutableMap<String, String> serviceDims = ImmutableMap.of( + "service", + "druid/historical", + "host", + "localhost", + "version", + "0.10.0" + ); + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + ServiceEventBuilder eventBuilder = builder.build(date, metric, value); + builder.setDimension("dataSource", "wikipedia"); + builder.setDimension("taskType", "index"); + ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims); + InfluxdbEmitterConfig config = new InfluxdbEmitterConfig( + "localhost", + 8086, + "dbname", + 10000, + 15000, + 30000, + "adam", + "password", + null + ); + InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config); + String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia,taskType=index druid_time=1234 1509357600000000000" + + "\n"; + String actual = influxdbEmitter.transformForInfluxSystems(event); + Assert.assertEquals(expected, actual); + } +} diff --git a/pom.xml b/pom.xml index df24ca8..0c5c5ca 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,7 @@ <module>extensions-contrib/momentsketch</module> <module>extensions-contrib/moving-average-query</module> <module>extensions-contrib/tdigestsketch</module> + <module>extensions-contrib/influxdb-emitter</module> <!-- distribution packaging --> <module>distribution</module> </modules> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org