This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 2fac674 Add maxIdleTime option to EventReceiverFirehose (#5997) 2fac674 is described below commit 2fac6743d49574a6243f823cfddfe8add1124fea Author: Hongze Zhang <mailto...@126.com> AuthorDate: Tue Sep 18 04:50:56 2018 +0800 Add maxIdleTime option to EventReceiverFirehose (#5997) --- .../firehose/EventReceiverFirehoseFactory.java | 35 ++++++ .../firehose/EventReceiverFirehostIdleTest.java | 124 +++++++++++++++++++++ .../firehose/EventReceiverFirehoseTest.java | 3 + 3 files changed, 162 insertions(+) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index fc58d9d..14b7a19 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -74,6 +75,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -88,9 +90,11 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar private static final EmittingLogger log = new 
EmittingLogger(EventReceiverFirehoseFactory.class); private static final int DEFAULT_BUFFER_SIZE = 100_000; + private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE; private final String serviceName; private final int bufferSize; + private final long maxIdleTime; private final Optional<ChatHandlerProvider> chatHandlerProvider; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; @@ -101,6 +105,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar public EventReceiverFirehoseFactory( @JsonProperty("serviceName") String serviceName, @JsonProperty("bufferSize") Integer bufferSize, + @JsonProperty("maxIdleTime") Long maxIdleTime, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject @Json ObjectMapper jsonMapper, @JacksonInject @Smile ObjectMapper smileMapper, @@ -112,6 +117,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar this.serviceName = serviceName; this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; + this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ? 
+ DEFAULT_MAX_IDLE_TIME : maxIdleTime; this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider); this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; @@ -155,9 +162,16 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar return bufferSize; } + @JsonProperty + public long getMaxIdleTime() + { + return maxIdleTime; + } + public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric { private final ScheduledExecutorService exec; + private final ExecutorService idleDetector; private final BlockingQueue<InputRow> buffer; private final InputRowParser<Map<String, Object>> parser; @@ -168,12 +182,29 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar private final AtomicLong bytesReceived = new AtomicLong(0); private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0); private final ConcurrentMap<String, Long> producerSequences = new ConcurrentHashMap<>(); + private final Stopwatch idleWatch = Stopwatch.createUnstarted(); public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) { this.buffer = new ArrayBlockingQueue<>(bufferSize); this.parser = parser; exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d"); + idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d"); + idleDetector.submit(() -> { + long idled; + try { + while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) { + Thread.sleep(maxIdleTime - idled); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + log.info("Firehose has been idle for %d ms, closing.", idled); + close(); + }); + idleWatch.start(); } @POST @@ -185,6 +216,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar @Context final HttpServletRequest req ) { + idleWatch.reset(); + idleWatch.start(); Access accessResult = AuthorizationUtils.authorizeResourceAction( req, new 
ResourceAction( @@ -328,6 +361,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar chatHandlerProvider.get().unregister(serviceName); } exec.shutdown(); + idleDetector.shutdown(); + idleWatch.stop(); } } diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java new file mode 100644 index 0000000..18c289e --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.firehose; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import org.apache.druid.server.metrics.EventReceiverFirehoseRegister; +import org.apache.druid.server.security.AllowAllAuthenticator; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthTestUtils; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import java.util.Locale; + +public class EventReceiverFirehostIdleTest +{ + private static final int CAPACITY = 300; + private static final long MAX_IDLE_TIME = 5_000L; + private static final String SERVICE_NAME = "test_firehose"; + + private final String inputRow = "[{\n" + + " \"timestamp\":123,\n" + + " \"d1\":\"v1\"\n" + + "}]"; + + private EventReceiverFirehoseFactory eventReceiverFirehoseFactory; + private EventReceiverFirehoseFactory.EventReceiverFirehose firehose; + private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister(); + private HttpServletRequest req; + + @Before + public void setUp() throws Exception + { + req = EasyMock.createMock(HttpServletRequest.class); + eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory( + SERVICE_NAME, + CAPACITY, + MAX_IDLE_TIME, + null, + new DefaultObjectMapper(), + new DefaultObjectMapper(), + register, + AuthTestUtils.TEST_AUTHORIZER_MAPPER + ); + firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect( + new MapInputRowParser( + new JSONParseSpec( 
+ new TimestampSpec( + "timestamp", + "auto", + null + ), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null), + null, + null + ) + ), + null + ); + } + + @Test(timeout = 40_000L) + public void testIdle() throws Exception + { + Thread.sleep(8_000L); + Assert.assertTrue(firehose.isClosed()); + } + + @Test(timeout = 40_000L) + public void testNotIdle() throws Exception + { + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)) + .andReturn(null) + .anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)) + .andReturn(null) + .anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT) + .anyTimes(); + EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).anyTimes(); + EasyMock.expect(req.getContentType()).andReturn("application/json").anyTimes(); + req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(req); + + final int checks = 5; + for (int i = 0; i < checks; i++) { + Assert.assertFalse(firehose.isClosed()); + System.out.printf(Locale.ENGLISH, "Check %d/%d passed\n", i + 1, checks); + firehose.addAll(IOUtils.toInputStream(inputRow), req); + Thread.sleep(3_000L); + } + + Thread.sleep(5_000L); + Assert.assertTrue(firehose.isClosed()); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java index 62414e1..683c8ce 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -57,6 +57,7 @@ public class EventReceiverFirehoseTest { private static final int CAPACITY = 300; private static final int NUM_EVENTS = 100; + 
private static final long MAX_IDLE_TIME = Long.MAX_VALUE; private static final String SERVICE_NAME = "test_firehose"; private final String inputRow = "[{\n" @@ -76,6 +77,7 @@ public class EventReceiverFirehoseTest eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory( SERVICE_NAME, CAPACITY, + MAX_IDLE_TIME, null, new DefaultObjectMapper(), new DefaultObjectMapper(), @@ -217,6 +219,7 @@ public class EventReceiverFirehoseTest EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory( SERVICE_NAME, CAPACITY, + MAX_IDLE_TIME, null, new DefaultObjectMapper(), new DefaultObjectMapper(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org