This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fac674  Add maxIdleTime option to EventReceiverFirehose (#5997)
2fac674 is described below

commit 2fac6743d49574a6243f823cfddfe8add1124fea
Author: Hongze Zhang <mailto...@126.com>
AuthorDate: Tue Sep 18 04:50:56 2018 +0800

    Add maxIdleTime option to EventReceiverFirehose (#5997)
---
 .../firehose/EventReceiverFirehoseFactory.java     |  35 ++++++
 .../firehose/EventReceiverFirehostIdleTest.java    | 124 +++++++++++++++++++++
 .../firehose/EventReceiverFirehoseTest.java        |   3 +
 3 files changed, 162 insertions(+)

diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index fc58d9d..14b7a19 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -74,6 +75,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -88,9 +90,11 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
 
   private static final EmittingLogger log = new 
EmittingLogger(EventReceiverFirehoseFactory.class);
   private static final int DEFAULT_BUFFER_SIZE = 100_000;
+  private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
 
   private final String serviceName;
   private final int bufferSize;
+  private final long maxIdleTime;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
   private final ObjectMapper jsonMapper;
   private final ObjectMapper smileMapper;
@@ -101,6 +105,7 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
   public EventReceiverFirehoseFactory(
       @JsonProperty("serviceName") String serviceName,
       @JsonProperty("bufferSize") Integer bufferSize,
+      @JsonProperty("maxIdleTime") Long maxIdleTime,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject @Json ObjectMapper jsonMapper,
       @JacksonInject @Smile ObjectMapper smileMapper,
@@ -112,6 +117,8 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
 
     this.serviceName = serviceName;
     this.bufferSize = bufferSize == null || bufferSize <= 0 ? 
DEFAULT_BUFFER_SIZE : bufferSize;
+    this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
+                       DEFAULT_MAX_IDLE_TIME : maxIdleTime;
     this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
     this.jsonMapper = jsonMapper;
     this.smileMapper = smileMapper;
@@ -155,9 +162,16 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
     return bufferSize;
   }
 
+  @JsonProperty
+  public long getMaxIdleTime()
+  {
+    return maxIdleTime;
+  }
+
   public class EventReceiverFirehose implements ChatHandler, Firehose, 
EventReceiverFirehoseMetric
   {
     private final ScheduledExecutorService exec;
+    private final ExecutorService idleDetector;
     private final BlockingQueue<InputRow> buffer;
     private final InputRowParser<Map<String, Object>> parser;
 
@@ -168,12 +182,29 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
     private final AtomicLong bytesReceived = new AtomicLong(0);
     private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
     private final ConcurrentMap<String, Long> producerSequences = new 
ConcurrentHashMap<>();
+    private final Stopwatch idleWatch = Stopwatch.createUnstarted();
 
     public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
     {
       this.buffer = new ArrayBlockingQueue<>(bufferSize);
       this.parser = parser;
       exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
+      idleDetector = 
Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
+      idleDetector.submit(() -> {
+        long idled;
+        try {
+          while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < 
maxIdleTime) {
+            Thread.sleep(maxIdleTime - idled);
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+        log.info("Firehose has been idle for %d ms, closing.", idled);
+        close();
+      });
+      idleWatch.start();
     }
 
     @POST
@@ -185,6 +216,8 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
         @Context final HttpServletRequest req
     )
     {
+      idleWatch.reset();
+      idleWatch.start();
       Access accessResult = AuthorizationUtils.authorizeResourceAction(
           req,
           new ResourceAction(
@@ -328,6 +361,8 @@ public class EventReceiverFirehoseFactory implements 
FirehoseFactory<InputRowPar
           chatHandlerProvider.get().unregister(serviceName);
         }
         exec.shutdown();
+        idleDetector.shutdown();
+        idleWatch.stop();
       }
     }
 
diff --git 
a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
 
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
new file mode 100644
index 0000000..18c289e
--- /dev/null
+++ 
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
+import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
+import org.apache.druid.server.security.AllowAllAuthenticator;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Locale;
+
+public class EventReceiverFirehostIdleTest
+{
+  private static final int CAPACITY = 300;
+  private static final long MAX_IDLE_TIME = 5_000L;
+  private static final String SERVICE_NAME = "test_firehose";
+
+  private final String inputRow = "[{\n"
+                                  + "  \"timestamp\":123,\n"
+                                  + "  \"d1\":\"v1\"\n"
+                                  + "}]";
+
+  private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
+  private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
+  private EventReceiverFirehoseRegister register = new 
EventReceiverFirehoseRegister();
+  private HttpServletRequest req;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    req = EasyMock.createMock(HttpServletRequest.class);
+    eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
+        SERVICE_NAME,
+        CAPACITY,
+        MAX_IDLE_TIME,
+        null,
+        new DefaultObjectMapper(),
+        new DefaultObjectMapper(),
+        register,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER
+    );
+    firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) 
eventReceiverFirehoseFactory.connect(
+        new MapInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec(
+                    "timestamp",
+                    "auto",
+                    null
+                ), new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, 
null),
+                null,
+                null
+            )
+        ),
+        null
+    );
+  }
+
+  @Test(timeout = 40_000L)
+  public void testIdle() throws Exception
+  {
+    Thread.sleep(8_000L);
+    Assert.assertTrue(firehose.isClosed());
+  }
+
+  @Test(timeout = 40_000L)
+  public void testNotIdle() throws Exception
+  {
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
+            .andReturn(null)
+            .anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH))
+            .andReturn(null)
+            .anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT)
+            .anyTimes();
+    
EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).anyTimes();
+    
EasyMock.expect(req.getContentType()).andReturn("application/json").anyTimes();
+    req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(req);
+
+    final int checks = 5;
+    for (int i = 0; i < checks; i++) {
+      Assert.assertFalse(firehose.isClosed());
+      System.out.printf(Locale.ENGLISH, "Check %d/%d passed\n", i + 1, checks);
+      firehose.addAll(IOUtils.toInputStream(inputRow), req);
+      Thread.sleep(3_000L);
+    }
+
+    Thread.sleep(5_000L);
+    Assert.assertTrue(firehose.isClosed());
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
index 62414e1..683c8ce 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
@@ -57,6 +57,7 @@ public class EventReceiverFirehoseTest
 {
   private static final int CAPACITY = 300;
   private static final int NUM_EVENTS = 100;
+  private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
   private static final String SERVICE_NAME = "test_firehose";
 
   private final String inputRow = "[{\n"
@@ -76,6 +77,7 @@ public class EventReceiverFirehoseTest
     eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
         SERVICE_NAME,
         CAPACITY,
+        MAX_IDLE_TIME,
         null,
         new DefaultObjectMapper(),
         new DefaultObjectMapper(),
@@ -217,6 +219,7 @@ public class EventReceiverFirehoseTest
     EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new 
EventReceiverFirehoseFactory(
         SERVICE_NAME,
         CAPACITY,
+        MAX_IDLE_TIME,
         null,
         new DefaultObjectMapper(),
         new DefaultObjectMapper(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to