This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 07d4226 [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster (#2560) 07d4226 is described below commit 07d42261eb2d685de6f9a5ab214f4d08944ffb6e Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Fri Sep 14 01:10:35 2018 -0700 [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster (#2560) *Motivation* Function workers can be deployed as a separate cluster. If so, the proxy is not able to forward functions-related REST calls to the correct server. *Changes* Add two settings to the proxy configuration that allow the proxy to forward functions-related REST calls to the function worker cluster. *Tests* Verified with changes in integration tests (manually). It is hard to add integration tests for this based on the current integration tests. Will add them in a separate PR. --- conf/proxy.conf | 5 +++++ .../apache/pulsar/proxy/server/AdminProxyHandler.java | 16 ++++++++++++++-- .../apache/pulsar/proxy/server/ProxyConfiguration.java | 12 ++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/conf/proxy.conf b/conf/proxy.conf index 9b307cc..b95c4d3 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -23,6 +23,11 @@ zookeeperServers= # Configuration store connection string (as a comma-separated list) configurationStoreServers= +# If function workers are setup in a separate cluster, configure the following 2 settings +# to point to the function workers cluster +functionWorkerWebServiceURL= +functionWorkerWebServiceURLTLS= + # ZooKeeper session timeout (in milliseconds) zookeeperSessionTimeoutMs=30000 diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 49c789c..d6c32df 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java 
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -49,12 +49,15 @@ class AdminProxyHandler extends AsyncProxyServlet { private final ProxyConfiguration config; private final BrokerDiscoveryProvider discoveryProvider; private final String brokerWebServiceUrl; + private final String functionWorkerWebServiceUrl; AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { this.config = config; this.discoveryProvider = discoveryProvider; this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS() : config.getBrokerWebServiceURL(); + this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS() + : config.getFunctionWorkerWebServiceURL(); } @Override @@ -122,7 +125,16 @@ class AdminProxyHandler extends AsyncProxyServlet { protected String rewriteTarget(HttpServletRequest request) { StringBuilder url = new StringBuilder(); - if (isBlank(brokerWebServiceUrl)) { + boolean isFunctionsRestRequest = false; + String requestUri = request.getRequestURI(); + if (requestUri.startsWith("/admin/v2/functions") + || requestUri.startsWith("/admin/functions")) { + isFunctionsRestRequest = true; + } + + if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) { + url.append(functionWorkerWebServiceUrl); + } else if (isBlank(brokerWebServiceUrl)) { try { ServiceLookupData availableBroker = discoveryProvider.nextBroker(); @@ -148,7 +160,7 @@ class AdminProxyHandler extends AsyncProxyServlet { if (url.lastIndexOf("/") == url.length() - 1) { url.deleteCharAt(url.lastIndexOf("/")); } - url.append(request.getRequestURI()); + url.append(requestUri); String query = request.getQueryString(); if (query != null) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index b4e8afb..0155baa 100644 --- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -48,6 +48,10 @@ public class ProxyConfiguration implements PulsarConfiguration { private String brokerWebServiceURL; private String brokerWebServiceURLTLS; + // function worker web services + private String functionWorkerWebServiceURL; + private String functionWorkerWebServiceURLTLS; + // Port to use to server binary-proto request private int servicePort = 6650; // Port to use to server binary-proto-tls request @@ -158,6 +162,14 @@ public class ProxyConfiguration implements PulsarConfiguration { this.brokerWebServiceURLTLS = brokerWebServiceURLTLS; } + public String getFunctionWorkerWebServiceURL() { + return functionWorkerWebServiceURL; + } + + public String getFunctionWorkerWebServiceURLTLS() { + return functionWorkerWebServiceURLTLS; + } + public String getZookeeperServers() { return zookeeperServers; }