This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 85391e9  fix opentsdb emitter always be running  and fail sending tags 
whose value contains colon (#6251)
85391e9 is described below

commit 85391e9fb3cef355dacbbf5c443773fb7aab445a
Author: QiuMM <csurj...@gmail.com>
AuthorDate: Sat Sep 15 03:14:15 2018 +0800

    fix opentsdb emitter always be running  and fail sending tags whose value 
contains colon (#6251)
    
    * fix opentsdb emitter always be running
    
    * check if emitter started
    
    * add more details about consumeDelay in doc
    
    * fix possible thread unsafe
    
    * fix fail sending tags whose value contain colon
---
 .../extensions-contrib/opentsdb-emitter.md         |   7 +-
 .../druid/emitter/opentsdb/EventConverter.java     |  12 ++-
 .../druid/emitter/opentsdb/OpentsdbEmitter.java    |  26 +++++-
 .../emitter/opentsdb/OpentsdbEmitterConfig.java    |  15 +++
 .../druid/emitter/opentsdb/OpentsdbSender.java     | 103 ++++++++++++++-------
 .../druid/emitter/opentsdb/EventConverterTest.java |  10 +-
 .../opentsdb/OpentsdbEmitterConfigTest.java        |   2 +-
 .../druid/emitter/opentsdb/OpentsdbSenderTest.java |   2 +-
 8 files changed, 129 insertions(+), 48 deletions(-)

diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md 
b/docs/content/development/extensions-contrib/opentsdb-emitter.md
index dafdfd0..f11a121 100644
--- a/docs/content/development/extensions-contrib/opentsdb-emitter.md
+++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md
@@ -8,7 +8,7 @@ To use this extension, make sure to 
[include](../../operations/including-extensi
 
 ## Introduction
 
-This extension emits druid metrics to 
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter 
only emits service metric events to OpenTSDB (See 
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
+This extension emits druid metrics to 
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP (Using `Jersey 
client`). And this emitter only emits service metric events to OpenTSDB (See 
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
 
 ## Configuration
 
@@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter 
are under `druid.emitt
 |--------|-----------|---------|-------|
 |`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none|
 |`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none|
-|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in 
milliseconds).|no|2000|
-|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000|  
+|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection 
timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in 
milliseconds).|no|2000|
 |`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will 
be sent as one batch)|no|100|
 |`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to 
buffer events.|no|1000|
+|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds). 
Actually, we use `ScheduledExecutorService` to schedule consuming events, so 
this `consumeDelay` means the delay between the termination of one execution 
and the commencement of the next. If your druid nodes produce metric events 
fast, then you should decrease this `consumeDelay` or increase the 
`maxQueueSize`.|no|10000|
 |`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics 
and dimensions for every Druid 
metric|no|./src/main/resources/defaultMetrics.json|
 
 ### Druid to OpenTSDB Event Converter
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
index 3745dd0..c290457 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
@@ -39,6 +39,8 @@ public class EventConverter
 {
   private static final Logger log = new Logger(EventConverter.class);
   private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+  private static final String COLON = ":";
+  private static final String DEFAULT_COLON_REPLACEMENT = "_";
 
   private final Map<String, Set<String>> metricMap;
 
@@ -72,15 +74,19 @@ public class EventConverter
     Number value = serviceMetricEvent.getValue();
 
     Map<String, Object> tags = new HashMap<>();
-    String service = serviceMetricEvent.getService();
-    String host = serviceMetricEvent.getHost();
+    String service = serviceMetricEvent.getService().replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
+    String host = serviceMetricEvent.getHost().replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
     tags.put("service", service);
     tags.put("host", host);
 
     Map<String, Object> userDims = serviceMetricEvent.getUserDims();
     for (String dim : metricMap.get(metric)) {
       if (userDims.containsKey(dim)) {
-        tags.put(dim, userDims.get(dim));
+        Object dimValue = userDims.get(dim);
+        if (dimValue instanceof String) {
+          dimValue = ((String) dimValue).replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
+        }
+        tags.put(dim, dimValue);
       }
     }
 
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
index 0fa5e60..515c297 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
@@ -20,17 +20,21 @@
 package org.apache.druid.emitter.opentsdb;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class OpentsdbEmitter implements Emitter
 {
   private static final Logger log = new Logger(OpentsdbEmitter.class);
 
   private final OpentsdbSender sender;
   private final EventConverter converter;
+  private final AtomicBoolean started = new AtomicBoolean(false);
 
   public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper)
   {
@@ -40,7 +44,8 @@ public class OpentsdbEmitter implements Emitter
         config.getConnectionTimeout(),
         config.getReadTimeout(),
         config.getFlushThreshold(),
-        config.getMaxQueueSize()
+        config.getMaxQueueSize(),
+        config.getConsumeDelay()
     );
     this.converter = new EventConverter(mapper, config.getMetricMapPath());
   }
@@ -48,11 +53,21 @@ public class OpentsdbEmitter implements Emitter
   @Override
   public void start()
   {
+    synchronized (started) {
+      if (!started.get()) {
+        log.info("Starting Opentsdb Emitter.");
+        sender.start();
+        started.set(true);
+      }
+    }
   }
 
   @Override
   public void emit(Event event)
   {
+    if (!started.get()) {
+      throw new ISE("WTF emit was called while service is not started yet");
+    }
     if (event instanceof ServiceMetricEvent) {
       OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) 
event);
       if (opentsdbEvent != null) {
@@ -69,12 +84,17 @@ public class OpentsdbEmitter implements Emitter
   @Override
   public void flush()
   {
-    sender.flush();
+    if (started.get()) {
+      sender.flush();
+    }
   }
 
   @Override
   public void close()
   {
-    sender.close();
+    if (started.get()) {
+      sender.close();
+      started.set(false);
+    }
   }
 }
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
index f953c48..fd230b2 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
@@ -27,6 +27,7 @@ public class OpentsdbEmitterConfig
 {
   private static final int DEFAULT_FLUSH_THRESHOLD = 100;
   private static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_CONSUME_DELAY_MILLIS = 10000;
   private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000;
   private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000;
 
@@ -49,6 +50,9 @@ public class OpentsdbEmitterConfig
   private final int maxQueueSize;
 
   @JsonProperty
+  private final long consumeDelay;
+
+  @JsonProperty
   private final String metricMapPath;
 
   @JsonCreator
@@ -59,6 +63,7 @@ public class OpentsdbEmitterConfig
       @JsonProperty("readTimeout") Integer readTimeout,
       @JsonProperty("flushThreshold") Integer flushThreshold,
       @JsonProperty("maxQueueSize") Integer maxQueueSize,
+      @JsonProperty("consumeDelay") Long consumeDelay,
       @JsonProperty("metricMapPath") String metricMapPath
   )
   {
@@ -71,6 +76,7 @@ public class OpentsdbEmitterConfig
         (readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS 
: readTimeout;
     this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? 
DEFAULT_FLUSH_THRESHOLD : flushThreshold;
     this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? 
DEFAULT_MAX_QUEUE_SIZE : maxQueueSize;
+    this.consumeDelay = (consumeDelay == null || consumeDelay < 0) ? 
DEFAULT_CONSUME_DELAY_MILLIS : consumeDelay;
     this.metricMapPath = metricMapPath;
   }
 
@@ -104,6 +110,9 @@ public class OpentsdbEmitterConfig
     if (maxQueueSize != that.maxQueueSize) {
       return false;
     }
+    if (consumeDelay != that.consumeDelay) {
+      return false;
+    }
     return metricMapPath != null ? metricMapPath.equals(that.metricMapPath)
                                  : that.metricMapPath == null;
   }
@@ -117,6 +126,7 @@ public class OpentsdbEmitterConfig
     result = 31 * result + readTimeout;
     result = 31 * result + flushThreshold;
     result = 31 * result + maxQueueSize;
+    result = 31 * result + (int) consumeDelay;
     result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 
0);
     return result;
   }
@@ -151,6 +161,11 @@ public class OpentsdbEmitterConfig
     return maxQueueSize;
   }
 
+  public long getConsumeDelay()
+  {
+    return consumeDelay;
+  }
+
   public String getMetricMapPath()
   {
     return metricMapPath;
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
index 2f93d7c..8f7e3f7 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
@@ -20,6 +20,7 @@
 package org.apache.druid.emitter.opentsdb;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -29,8 +30,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class OpentsdbSender
@@ -40,28 +43,40 @@ public class OpentsdbSender
    */
   private static final String PATH = "/api/put";
   private static final Logger log = new Logger(OpentsdbSender.class);
+  private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
 
   private final AtomicLong countLostEvents = new AtomicLong(0);
   private final int flushThreshold;
-  private final List<OpentsdbEvent> events;
   private final BlockingQueue<OpentsdbEvent> eventQueue;
+  private final ScheduledExecutorService scheduler;
+  private final EventConsumer eventConsumer;
+  private final long consumeDelay;
   private final Client client;
   private final WebResource webResource;
-  private final ExecutorService executor = Executors.newFixedThreadPool(1);
-  private volatile boolean running = true;
 
-  public OpentsdbSender(String host, int port, int connectionTimeout, int 
readTimeout, int flushThreshold, int maxQueueSize)
+  public OpentsdbSender(
+      String host,
+      int port,
+      int connectionTimeout,
+      int readTimeout,
+      int flushThreshold,
+      int maxQueueSize,
+      long consumeDelay
+  )
   {
     this.flushThreshold = flushThreshold;
-    events = new ArrayList<>(flushThreshold);
+    this.consumeDelay = consumeDelay;
     eventQueue = new ArrayBlockingQueue<>(maxQueueSize);
+    scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("OpentsdbEventSender-%s")
+        .build());
+    eventConsumer = new EventConsumer();
 
     client = Client.create();
     client.setConnectTimeout(connectionTimeout);
     client.setReadTimeout(readTimeout);
     webResource = client.resource("http://" + host + ":" + port + PATH);
-
-    executor.execute(new EventConsumer());
   }
 
   public void enqueue(OpentsdbEvent event)
@@ -76,46 +91,70 @@ public class OpentsdbSender
     }
   }
 
+  public void start()
+  {
+    scheduler.scheduleWithFixedDelay(
+        eventConsumer,
+        consumeDelay,
+        consumeDelay,
+        TimeUnit.MILLISECONDS
+    );
+  }
+
   public void flush()
   {
-    sendEvents();
+    try {
+      EventConsumer flushConsumer = new EventConsumer();
+      Future future = scheduler.schedule(flushConsumer, 0, 
TimeUnit.MILLISECONDS);
+      future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+      // send remaining events which size may less than flushThreshold
+      eventConsumer.sendEvents();
+      flushConsumer.sendEvents();
+    }
+    catch (Exception e) {
+      log.warn(e, e.getMessage());
+    }
   }
 
   public void close()
   {
     flush();
     client.destroy();
-    running = false;
-    executor.shutdown();
+    scheduler.shutdown();
   }
 
-  private void sendEvents()
+  private class EventConsumer implements Runnable
   {
-    if (!events.isEmpty()) {
-      try {
-        webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
-      }
-      catch (Exception e) {
-        log.error(e, "send to opentsdb server failed");
-      }
-      finally {
-        events.clear();
-      }
+    private final List<OpentsdbEvent> events;
+
+    public EventConsumer()
+    {
+      events = new ArrayList<>(flushThreshold);
     }
-  }
 
-  private class EventConsumer implements Runnable
-  {
     @Override
     public void run()
     {
-      while (running) {
-        if (!eventQueue.isEmpty()) {
-          OpentsdbEvent event = eventQueue.poll();
-          events.add(event);
-          if (events.size() >= flushThreshold) {
-            sendEvents();
-          }
+      while (!eventQueue.isEmpty() && !scheduler.isShutdown()) {
+        OpentsdbEvent event = eventQueue.poll();
+        events.add(event);
+        if (events.size() >= flushThreshold) {
+          sendEvents();
+        }
+      }
+    }
+
+    public void sendEvents()
+    {
+      if (!events.isEmpty()) {
+        try {
+          webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
+        }
+        catch (Exception e) {
+          log.error(e, "error occurred when sending metrics to opentsdb 
server.");
+        }
+        finally {
+          events.clear();
         }
       }
     }
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
index da3b37d..7b3f205 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
@@ -54,15 +54,15 @@ public class EventConverterTest
   {
     DateTime dateTime = DateTimes.nowUtc();
     ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder()
-        .setDimension("dataSource", "data-source")
+        .setDimension("dataSource", "foo:bar")
         .setDimension("type", "groupBy")
         .build(dateTime, "query/time", 10)
-        .build("broker", "brokerHost1");
+        .build("druid:broker", "127.0.0.1:8080");
 
     Map<String, Object> expectedTags = new HashMap<>();
-    expectedTags.put("service", "broker");
-    expectedTags.put("host", "brokerHost1");
-    expectedTags.put("dataSource", "data-source");
+    expectedTags.put("service", "druid_broker");
+    expectedTags.put("host", "127.0.0.1_8080");
+    expectedTags.put("dataSource", "foo_bar");
     expectedTags.put("type", "groupBy");
 
     OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent);
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
index b263316..11ccb9d 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
@@ -39,7 +39,7 @@ public class OpentsdbEmitterConfigTest
   @Test
   public void testSerDeserOpentsdbEmitterConfig() throws Exception
   {
-    OpentsdbEmitterConfig opentsdbEmitterConfig = new 
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null);
+    OpentsdbEmitterConfig opentsdbEmitterConfig = new 
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, 10000L, null);
     String opentsdbEmitterConfigString = 
mapper.writeValueAsString(opentsdbEmitterConfig);
     OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = 
mapper.reader(OpentsdbEmitterConfig.class)
                                                                 
.readValue(opentsdbEmitterConfigString);
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
index d28f04d..79c8ae5 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
@@ -27,7 +27,7 @@ public class OpentsdbSenderTest
   @Test
   public void testUrl()
   {
-    OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 
100, 1000);
+    OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 
100, 1000, 10000L);
     String expectedUrl = "http://localhost:9999/api/put";
     Assert.assertEquals(expectedUrl, 
sender.getWebResource().getURI().toString());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to