This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07d4226  [proxy][functions] Issue #2154: proxy should be able to 
forward rest requests to function workers cluster (#2560)
07d4226 is described below

commit 07d42261eb2d685de6f9a5ab214f4d08944ffb6e
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Sep 14 01:10:35 2018 -0700

    [proxy][functions] Issue #2154: proxy should be able to forward rest 
requests to function workers cluster (#2560)
    
    *Motivation*
    
    Function workers can be deployed as a separate cluster. If so, proxy is not 
able to forward the functions
    related rest calls to the correct server.
    
    *Changes*
    
    Add two settings in proxy configuration to allow proxy configuring 
forwarding functions related rest calls
    to function worker cluster.
    
    *Tests*
    
    Verified with changes in integration tests (manually). It is hard to add 
the integration tests based on
    current integration tests. will add them in a separate PR.
---
 conf/proxy.conf                                          |  5 +++++
 .../apache/pulsar/proxy/server/AdminProxyHandler.java    | 16 ++++++++++++++--
 .../apache/pulsar/proxy/server/ProxyConfiguration.java   | 12 ++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 9b307cc..b95c4d3 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -23,6 +23,11 @@ zookeeperServers=
 # Configuration store connection string (as a comma-separated list)
 configurationStoreServers=
 
+# If function workers are setup in a separate cluster, configure the following 
2 settings
+# to point to the function workers cluster
+functionWorkerWebServiceURL=
+functionWorkerWebServiceURLTLS=
+
 # ZooKeeper session timeout (in milliseconds)
 zookeeperSessionTimeoutMs=30000
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 49c789c..d6c32df 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -49,12 +49,15 @@ class AdminProxyHandler extends AsyncProxyServlet {
     private final ProxyConfiguration config;
     private final BrokerDiscoveryProvider discoveryProvider;
     private final String brokerWebServiceUrl;
+    private final String functionWorkerWebServiceUrl;
 
     AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider) {
         this.config = config;
         this.discoveryProvider = discoveryProvider;
         this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getBrokerWebServiceURLTLS()
                 : config.getBrokerWebServiceURL();
+        this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getFunctionWorkerWebServiceURLTLS()
+                : config.getFunctionWorkerWebServiceURL();
     }
 
     @Override
@@ -122,7 +125,16 @@ class AdminProxyHandler extends AsyncProxyServlet {
     protected String rewriteTarget(HttpServletRequest request) {
         StringBuilder url = new StringBuilder();
 
-        if (isBlank(brokerWebServiceUrl)) {
+        boolean isFunctionsRestRequest = false;
+        String requestUri = request.getRequestURI();
+        if (requestUri.startsWith("/admin/v2/functions")
+            || requestUri.startsWith("/admin/functions")) {
+            isFunctionsRestRequest = true;
+        }
+
+        if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
+            url.append(functionWorkerWebServiceUrl);
+        } else if (isBlank(brokerWebServiceUrl)) {
             try {
                 ServiceLookupData availableBroker = 
discoveryProvider.nextBroker();
 
@@ -148,7 +160,7 @@ class AdminProxyHandler extends AsyncProxyServlet {
         if (url.lastIndexOf("/") == url.length() - 1) {
             url.deleteCharAt(url.lastIndexOf("/"));
         }
-        url.append(request.getRequestURI());
+        url.append(requestUri);
 
         String query = request.getQueryString();
         if (query != null) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index b4e8afb..0155baa 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -48,6 +48,10 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     private String brokerWebServiceURL;
     private String brokerWebServiceURLTLS;
 
+    // function worker web services
+    private String functionWorkerWebServiceURL;
+    private String functionWorkerWebServiceURLTLS;
+
     // Port to use to server binary-proto request
     private int servicePort = 6650;
     // Port to use to server binary-proto-tls request
@@ -158,6 +162,14 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
         this.brokerWebServiceURLTLS = brokerWebServiceURLTLS;
     }
 
+    public String getFunctionWorkerWebServiceURL() {
+        return functionWorkerWebServiceURL;
+    }
+
+    public String getFunctionWorkerWebServiceURLTLS() {
+        return functionWorkerWebServiceURLTLS;
+    }
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }

Reply via email to