This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new adbf818  Change HTTP status code which WebSocket proxy returns to producer when backlog exceeds threshold (#1242)
adbf818 is described below

commit adbf818626988f4cd3cf008462d173b440d4c1ba
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Sat Feb 24 07:54:07 2018 +0900

    Change HTTP status code which WebSocket proxy returns to producer when backlog exceeds threshold (#1242)
    
    * Change HTTP status code which WebSocket proxy returns to producer when backlog exceeds threshold
    
    * Fix WebSocket proxy to return exception message to client
    
    * Add some tests for WebSocket
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 206 +++++++++++++++++----
 .../pulsar/websocket/AbstractWebSocketHandler.java |   5 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |  39 ++--
 .../apache/pulsar/websocket/ProducerHandler.java   |   6 +-
 .../org/apache/pulsar/websocket/ReaderHandler.java |   5 +-
 5 files changed, 207 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index c25c947..3c40173 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
@@ -39,6 +40,7 @@ import javax.ws.rs.core.Response;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.apache.pulsar.websocket.service.ProxyServer;
@@ -70,8 +72,12 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
     private ProxyServer proxyServer;
     private WebSocketService service;
 
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
     @BeforeMethod
     public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
         super.internalSetup();
         super.producerBaseSetup();
 
@@ -89,6 +95,7 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
 
     @AfterMethod
     protected void cleanup() throws Exception {
+        super.resetConfig();
         super.internalCleanup();
         service.close();
         proxyServer.stop();
@@ -97,7 +104,7 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 10000)
     public void socketTest() throws Exception {
-        String consumerUri = "ws://localhost:" + port
+        final String consumerUri = "ws://localhost:" + port
                 + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
         String readerUri = "ws://localhost:" + port + 
"/ws/reader/persistent/my-property/use/my-ns/my-topic1";
         String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/my-property/use/my-ns/my-topic1/";
@@ -167,32 +174,16 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
             }
             Assert.assertEquals(produceSocket.getBuffer(), 
readSocket.getBuffer());
         } finally {
-            ExecutorService executor = newFixedThreadPool(1);
-            try {
-                executor.submit(() -> {
-                    try {
-                        consumeClient1.stop();
-                        consumeClient2.stop();
-                        readClient.stop();
-                        produceClient.stop();
-                        log.info("proxy clients are stopped successfully");
-                    } catch (Exception e) {
-                        log.error(e.getMessage());
-                    }
-                }).get(2, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("failed to close clients ", e);
-            }
-            executor.shutdownNow();
+            stopWebSocketClient(consumeClient1, consumeClient2, readClient, 
produceClient);
         }
     }
 
     @Test(timeOut = 10000)
-    public void badConsumerTest() throws Exception {
+    public void emptySubcriptionConsumerTest() throws Exception {
 
         // Empty subcription name
-        String consumerUri = "ws://localhost:" + port
-                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive";
+        final String consumerUri = "ws://localhost:" + port
+                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive";
         URI consumeUri = URI.create(consumerUri);
 
         WebSocketClient consumeClient1 = new WebSocketClient();
@@ -207,21 +198,148 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
         } catch (Exception e) {
             // Expected
             Assert.assertTrue(e.getCause() instanceof UpgradeException);
+            Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                    HttpServletResponse.SC_BAD_REQUEST);
         } finally {
-            ExecutorService executor = newFixedThreadPool(1);
+            stopWebSocketClient(consumeClient1);
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void conflictingConsumerTest() throws Exception {
+        final String consumerUri = "ws://localhost:" + port
+                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
+        URI consumeUri = URI.create(consumerUri);
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+
+        try {
+            consumeClient1.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = 
consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+            consumerFuture1.get();
+
             try {
-                executor.submit(() -> {
-                    try {
-                        consumeClient1.stop();
-                        log.info("proxy clients are stopped successfully");
-                    } catch (Exception e) {
-                        log.error(e.getMessage());
-                    }
-                }).get(2, TimeUnit.SECONDS);
+                consumeClient2.start();
+                ClientUpgradeRequest consumeRequest2 = new 
ClientUpgradeRequest();
+                Future<Session> consumerFuture2 = 
consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+                consumerFuture2.get();
+                Assert.fail("should fail: conflicting subscription name");
             } catch (Exception e) {
-                log.error("failed to close clients ", e);
+                // Expected
+                Assert.assertTrue(e.getCause() instanceof UpgradeException);
+                Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                        HttpServletResponse.SC_CONFLICT);
+            } finally {
+                stopWebSocketClient(consumeClient2);
             }
-            executor.shutdownNow();
+        } finally {
+            stopWebSocketClient(consumeClient1);
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void conflictingProducerTest() throws Exception {
+        final String producerUri = "ws://localhost:" + port
+                + 
"/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer";
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient produceClient1 = new WebSocketClient();
+        WebSocketClient produceClient2 = new WebSocketClient();
+        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+        try {
+            produceClient1.start();
+            ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest();
+            Future<Session> producerFuture1 = 
produceClient1.connect(produceSocket1, produceUri, produceRequest1);
+            producerFuture1.get();
+
+            try {
+                produceClient2.start();
+                ClientUpgradeRequest produceRequest2 = new 
ClientUpgradeRequest();
+                Future<Session> producerFuture2 = 
produceClient2.connect(produceSocket2, produceUri, produceRequest2);
+                producerFuture2.get();
+                Assert.fail("should fail: conflicting producer name");
+            } catch (Exception e) {
+                // Expected
+                Assert.assertTrue(e.getCause() instanceof UpgradeException);
+                Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                        HttpServletResponse.SC_CONFLICT);
+            } finally {
+                stopWebSocketClient(produceClient2);
+            }
+        } finally {
+            stopWebSocketClient(produceClient1);
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void producerBacklogQuotaExceededTest() throws Exception {
+        admin.namespaces().createNamespace("my-property/use/ns-ws-quota");
+        admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota",
+                new BacklogQuota(10, 
BacklogQuota.RetentionPolicy.producer_request_hold));
+
+        final String topic = "my-property/use/ns-ws-quota/my-topic5";
+        final String subscription = "my-sub";
+        final String consumerUri = "ws://localhost:" + port + 
"/ws/consumer/persistent/" + topic + "/" + subscription;
+        final String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient = new WebSocketClient();
+        WebSocketClient produceClient1 = new WebSocketClient();
+        WebSocketClient produceClient2 = new WebSocketClient();
+
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+        // Create subscription
+        try {
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get();
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+
+        // Fill the backlog
+        try {
+            produceClient1.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient1.connect(produceSocket1, produceUri, produceRequest);
+            producerFuture.get();
+            produceSocket1.sendMessage(100);
+        } finally {
+            stopWebSocketClient(produceClient1);
+        }
+
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
+        // New producer fails to connect
+        try {
+            produceClient2.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient2.connect(produceSocket2, produceUri, produceRequest);
+            producerFuture.get();
+            Assert.fail("should fail: backlog quota exceeded");
+        } catch (Exception e) {
+            // Expected
+            Assert.assertTrue(e.getCause() instanceof UpgradeException);
+            Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                    HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+        } finally {
+            stopWebSocketClient(produceClient2);
+            admin.persistentTopics().skipAllMessages("persistent://" + topic, 
subscription);
+            admin.persistentTopics().delete("persistent://" + topic);
+            
admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota");
+            admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota");
         }
     }
 
@@ -232,7 +350,7 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
      */
     @Test(timeOut = 10000)
     public void testProxyStats() throws Exception {
-        final String topic = "my-property/use/my-ns/my-topic2";
+        final String topic = "my-property/use/my-ns/my-topic6";
         final String consumerUri = "ws://localhost:" + port + 
"/ws/consumer/persistent/" + topic
                 + "/my-sub?subscriptionType=Failover";
         final String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/" + topic + "/";
@@ -299,9 +417,7 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
             verifyTopicStat(client, baseUrl, topic);
 
         } finally {
-            consumeClient1.stop();
-            produceClient.stop();
-            log.info("proxy clients are stopped successfully");
+            stopWebSocketClient(consumeClient1, produceClient);
         }
     }
 
@@ -361,5 +477,23 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
         Assert.assertNotNull(producerStats.remoteConnection);
     }
 
+    private void stopWebSocketClient(WebSocketClient... clients) {
+        ExecutorService executor = newFixedThreadPool(1);
+        try {
+            executor.submit(() -> {
+                try {
+                    for (WebSocketClient client : clients) {
+                        client.stop();
+                    }
+                    log.info("proxy clients are stopped successfully");
+                } catch (Exception e) {
+                    log.error(e.getMessage());
+                }
+            }).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close proxy clients", e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
 }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 91e62b7..ad6f910 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -48,7 +48,6 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
 
     protected final String topic;
     protected final Map<String, String> queryParams;
-    protected final boolean authResult;
 
     public AbstractWebSocketHandler(WebSocketService service, 
HttpServletRequest request, ServletUpgradeResponse response) {
         this.service = service;
@@ -59,11 +58,9 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
         request.getParameterMap().forEach((key, values) -> {
             queryParams.put(key, values[0]);
         });
-
-        authResult = checkAuth(response);
     }
 
-    private boolean checkAuth(ServletUpgradeResponse response) {
+    protected boolean checkAuth(ServletUpgradeResponse response) {
         String authRole = "<none>";
         AuthenticationDataSource authenticationData = new 
AuthenticationDataHttps(request);
         if (service.isAuthenticationEnabled()) {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index b392e0c..6edcf2a 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,6 +35,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
+import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -64,7 +65,7 @@ import com.google.common.base.Splitter;
  */
 public class ConsumerHandler extends AbstractWebSocketHandler {
 
-    private final String subscription;
+    private String subscription = null;
     private final ConsumerConfiguration conf;
     private Consumer consumer;
 
@@ -80,18 +81,19 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
     public ConsumerHandler(WebSocketService service, HttpServletRequest 
request, ServletUpgradeResponse response) {
         super(service, request, response);
-        this.subscription = extractSubscription(request);
         this.conf = getConsumerConfiguration();
         this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : 
conf.getReceiverQueueSize();
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
 
-        if (!authResult) {
-            return;
-        }
-
         try {
+            // checkAuth() should be called after assigning a value to 
this.subscription
+            this.subscription = extractSubscription(request);
+            if (!checkAuth(response)) {
+                return;
+            }
+
             this.consumer = service.getPulsarClient().subscribe(topic, 
subscription, conf);
             if (!this.service.addConsumer(this)) {
                 log.warn("[{}:{}] Failed to add consumer handler for topic 
{}", request.getRemoteAddr(),
@@ -100,12 +102,9 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
         } catch (Exception e) {
             log.warn("[{}:{}] Failed in creating subscription {} on topic {}", 
request.getRemoteAddr(),
                     request.getRemotePort(), subscription, topic, e);
-            boolean configError = e instanceof IllegalArgumentException;
-            int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
-                    : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-            String errorMsg = configError ? "Invalid query-param " + 
e.getMessage() : "Failed to subscribe";
+
             try {
-                response.sendError(errorCode, errorMsg);
+                response.sendError(getErrorCode(e), getErrorMessage(e));
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", 
request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);
@@ -113,6 +112,24 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
         }
     }
 
+    private static int getErrorCode(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return HttpServletResponse.SC_BAD_REQUEST;
+        } else if (e instanceof ConsumerBusyException) {
+            return HttpServletResponse.SC_CONFLICT;
+        } else {
+            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    private static String getErrorMessage(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return "Invalid query params: " + e.getMessage();
+        } else {
+            return "Failed to subscribe: " + e.getMessage();
+        }
+    }
+
     private void receiveMessage() {
         if (log.isDebugEnabled()) {
             log.debug("[{}:{}] [{}] [{}] Receive next message", 
request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index cb2e288..7df14a5 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -85,7 +87,7 @@ public class ProducerHandler extends AbstractWebSocketHandler 
{
         this.numMsgsFailed = new LongAdder();
         this.publishLatencyStatsUSec = new 
StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
-        if (!authResult) {
+        if (!checkAuth(response)) {
             return;
         }
 
@@ -114,6 +116,8 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
             return HttpServletResponse.SC_BAD_REQUEST;
         } else if (e instanceof ProducerBusyException) {
             return HttpServletResponse.SC_CONFLICT;
+        } else if (e instanceof ProducerBlockedQuotaExceededError || e 
instanceof ProducerBlockedQuotaExceededException) {
+            return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
         } else {
             return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
         }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 4d6c271..d643df2 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -82,7 +82,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
 
-        if (!authResult) {
+        if (!checkAuth(response)) {
             return;
         }
 
@@ -97,7 +97,8 @@ public class ReaderHandler extends AbstractWebSocketHandler {
             log.warn("[{}:{}] Failed in creating reader {} on topic {}", 
request.getRemoteAddr(),
                     request.getRemotePort(), subscription, topic, e);
             try {
-                
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to 
create reader");
+                
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                        "Failed to create reader: " + e.getMessage());
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", 
request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to