This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d6c2b  Pulsar Proxy - Added Prometheus metrics for throttled connections and lookups. (#1472)
f4d6c2b is described below

commit f4d6c2bf450c90c4f2e291607dc3f887adc4a1a5
Author: Jai Asher <j...@ccs.neu.edu>
AuthorDate: Thu Mar 29 15:54:10 2018 -0700

    Pulsar Proxy - Added Prometheus metrics for throttled connections and lookups. (#1472)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    | 23 ++++++++++++++++++----
 .../pulsar/proxy/server/ProxyConnection.java       | 10 +++++-----
 .../server/ProxyConnectionThrottlingTest.java      |  2 ++
 .../proxy/server/ProxyLookupThrottlingTest.java    |  3 ++-
 4 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aad42df..bbf1c44 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -38,6 +38,7 @@ import io.prometheus.client.Counter;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
+    private final String throttlingErrorMessage = "Too many concurrent lookup 
and partitionsMetadata requests";
     private final ProxyService service;
     private final ProxyConnection proxyConnection;
     private final boolean connectWithTLS;
@@ -52,6 +53,14 @@ public class LookupProxyHandler {
             .build("pulsar_proxy_partitions_metadata_requests", "Counter of 
partitions metadata requests").create()
             .register();
 
+    static final Counter rejectedLookupRequests = 
Counter.build("pulsar_proxy_rejected_lookup_requests",
+            "Counter of topic lookup requests rejected due to 
throttling").create().register();
+
+    static final Counter rejectedPartitionsMetadataRequests = Counter
+            .build("pulsar_proxy_rejected_partitions_metadata_requests",
+                    "Counter of partitions metadata requests rejected due to 
throttling")
+            .create().register();
+
     public LookupProxyHandler(ProxyService proxy, ProxyConnection 
proxyConnection) {
         this.service = proxy;
         this.proxyConnection = proxyConnection;
@@ -89,11 +98,13 @@ public class LookupProxyHandler {
             performLookup(clientRequestId, topic, serviceUrl, false, 10);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedLookupRequests.inc();
             if (log.isDebugEnabled()) {
-                log.debug("Request ID {} from {} rejected - Too many 
concurrent lookup requests.", clientRequestId, clientAddress);
+                log.debug("Lookup Request ID {} from {} rejected - {}.", 
clientRequestId, clientAddress,
+                        throttlingErrorMessage);
             }
             
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
 
     }
@@ -137,7 +148,6 @@ public class LookupProxyHandler {
                     performLookup(clientRequestId, topic, brokerUrl, 
result.authoritative, numberOfRetries - 1);
                 } else {
                     // Reply the same address for both TLS non-TLS. The reason 
is that whether we use TLS
-                    // between proxy
                     // and broker is independent of whether the client itself 
uses TLS, but we need to force the
                     // client
                     // to use the appropriate target broker (and port) when it 
will connect back.
@@ -168,8 +178,13 @@ public class LookupProxyHandler {
             handlePartitionMetadataResponse(partitionMetadata, 
clientRequestId);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedPartitionsMetadataRequests.inc();
+            if (log.isDebugEnabled()) {
+                log.debug("PartitionMetaData Request ID {} from {} rejected - 
{}.", clientRequestId, clientAddress,
+                        throttlingErrorMessage);
+            }
             
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e65461a..d2c7f97 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,8 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -87,6 +85,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             .build("pulsar_proxy_new_connections", "Counter of connections 
being opened in the proxy").create()
             .register();
 
+    static final Counter rejectedConnections = Counter
+            .build("pulsar_proxy_rejected_connections", "Counter for 
connections rejected due to throttling").create()
+            .register();
+    
     public ProxyConnection(ProxyService proxyService) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
@@ -98,10 +100,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         super.channelRegistered(ctx);
         activeConnections.inc();
         if (activeConnections.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Too many connection opened {}", remoteAddress, 
activeConnections.get());
-            }
             ctx.close();
+            rejectedConnections.inc();
             return;
         }
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 008e751..21c88c6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -74,6 +74,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         PulsarClient client2 = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
                 .build();
         Producer<byte[]> producer2;
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
         try {
             producer2 = 
client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
@@ -81,6 +82,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         } catch (Exception ex) {
             // OK
         }
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
     }
     
     private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index c661cae..4411f80 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -78,7 +78,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         } catch (Exception ex) {
             // Ignore
         }
-
+        
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
 1.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
             Producer<byte[]> producer3 = 
client.newProducer().topic("persistent://sample/test/local/producer-topic")
@@ -86,6 +86,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         } catch (Exception ex) {
             Assert.fail("Should not have failed since can acquire 
LookupRequestSemaphore");
         }
+        
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
 1.0d);
         client.close();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to