This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f4d6c2b Pulsar Proxy - Added Prometheus metrics for throttled connections and lookups. (#1472) f4d6c2b is described below commit f4d6c2bf450c90c4f2e291607dc3f887adc4a1a5 Author: Jai Asher <j...@ccs.neu.edu> AuthorDate: Thu Mar 29 15:54:10 2018 -0700 Pulsar Proxy - Added Prometheus metrics for throttled connections and lookups. (#1472) --- .../pulsar/proxy/server/LookupProxyHandler.java | 23 ++++++++++++++++++---- .../pulsar/proxy/server/ProxyConnection.java | 10 +++++----- .../server/ProxyConnectionThrottlingTest.java | 2 ++ .../proxy/server/ProxyLookupThrottlingTest.java | 3 ++- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index aad42df..bbf1c44 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -38,6 +38,7 @@ import io.prometheus.client.Counter; import static org.apache.commons.lang3.StringUtils.isBlank; public class LookupProxyHandler { + private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests"; private final ProxyService service; private final ProxyConnection proxyConnection; private final boolean connectWithTLS; @@ -52,6 +53,14 @@ public class LookupProxyHandler { .build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create() .register(); + static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests", + "Counter of topic lookup requests rejected due to throttling").create().register(); + + static final Counter rejectedPartitionsMetadataRequests = Counter + .build("pulsar_proxy_rejected_partitions_metadata_requests", + "Counter of partitions metadata requests rejected due to throttling") + .create().register(); + public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) { this.service = proxy; this.proxyConnection = proxyConnection; @@ -89,11 +98,13 @@ public class LookupProxyHandler { performLookup(clientRequestId, topic, serviceUrl, false, 10); this.service.getLookupRequestSemaphore().release(); } else { + rejectedLookupRequests.inc(); if (log.isDebugEnabled()) { - log.debug("Request ID {} from {} rejected - Too many concurrent lookup requests.", clientRequestId, clientAddress); + log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress, + throttlingErrorMessage); } proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, - "Too many concurrent lookup requests", clientRequestId)); + throttlingErrorMessage, clientRequestId)); } } @@ -137,7 +148,6 @@ public class LookupProxyHandler { performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1); } else { // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS - // between proxy // and broker is independent of whether the client itself uses TLS, but we need to force the // client // to use the appropriate target broker (and port) when it will connect back. @@ -168,8 +178,13 @@ public class LookupProxyHandler { handlePartitionMetadataResponse(partitionMetadata, clientRequestId); this.service.getLookupRequestSemaphore().release(); } else { + rejectedPartitionsMetadataRequests.inc(); + if (log.isDebugEnabled()) { + log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", clientRequestId, clientAddress, + throttlingErrorMessage); + } proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - "Too many concurrent lookup requests", clientRequestId)); + throttlingErrorMessage, clientRequestId)); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index e65461a..d2c7f97 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -40,8 +40,6 @@ import org.slf4j.LoggerFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandler; -import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -87,6 +85,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create() .register(); + static final Counter rejectedConnections = Counter + .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create() + .register(); + public ProxyConnection(ProxyService proxyService) { super(30, TimeUnit.SECONDS); this.service = proxyService; @@ -98,10 +100,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi super.channelRegistered(ctx); activeConnections.inc(); if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Too many connection opened {}", remoteAddress, activeConnections.get()); - } ctx.close(); + rejectedConnections.inc(); return; } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 008e751..21c88c6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -74,6 +74,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) .build(); Producer<byte[]> producer2; + Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d); try { producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create(); producer2.send("Message 1".getBytes()); @@ -81,6 +82,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { } catch (Exception ex) { // OK } + Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d); } private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index c661cae..4411f80 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -78,7 +78,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { } catch (Exception ex) { // Ignore } - + Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d); proxyService.getLookupRequestSemaphore().release(); try { Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic") @@ -86,6 +86,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { } catch (Exception ex) { Assert.fail("Should not have failed since can acquire LookupRequestSemaphore"); } + Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d); client.close(); } } -- To stop receiving notification emails like this one, please contact mme...@apache.org.