rdhabalia closed pull request #1241: Add REST api to check host-status for adding/removing from vip
URL: https://github.com/apache/incubator-pulsar/pull/1241
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/proxy.conf b/conf/proxy.conf index d7c5afc4b..09394525e 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -38,6 +38,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + ### --- Authentication --- ### # Enable authentication diff --git a/conf/websocket.conf b/conf/websocket.conf index 404bdeff5..399efedb9 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -36,6 +36,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + # Hostname or IP address the service binds on, default is 0.0.0.0. 
bindAddress=0.0.0.0 diff --git a/pom.xml b/pom.xml index 192769fcf..eb97e0bae 100644 --- a/pom.xml +++ b/pom.xml @@ -374,6 +374,12 @@ flexible messaging model and an intuitive client API.</description> <artifactId>jersey-container-servlet</artifactId> <version>2.23.2</version> </dependency> + + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + <version>2.0.1</version> + </dependency> <dependency> <groupId>org.glassfish.jersey.media</groupId> diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 666fec85a..75edadbe8 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -48,5 +48,11 @@ <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> </dependency> + + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + </dependency> + </dependencies> </project> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java similarity index 64% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java rename to pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 1e2499c5b..5dfba9468 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.broker.web; +package org.apache.pulsar.common.configuration; import java.io.File; +import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.WebApplicationException; @@ -27,22 +28,28 @@ import javax.ws.rs.core.Response.Status; /** - * Web resource used by the VIP service to check to availability of the Pulsar broker instance. + * Web resource used by the VIP service to check to availability of the service instance. */ @Path("/status.html") -@NoSwaggerDocumentation -public class VipStatus extends PulsarWebResource { +public class VipStatus { + + public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + + @Context + protected ServletContext servletContext; @GET @Context public String checkStatus() { - String statusFilePath = pulsar().getStatusFilePath(); - File statusFile = new File(statusFilePath); - if (statusFile.exists()) { - return "OK"; - } else { - throw new WebApplicationException(Status.NOT_FOUND); + String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + if (statusFilePath != null) { + File statusFile = new File(statusFilePath); + if (statusFile.exists() && statusFile.isFile()) { + return "OK"; + } } + throw new WebApplicationException(Status.NOT_FOUND); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 9a2b44a74..10f19dbc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.URL; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -54,6 +55,7 @@ import org.apache.pulsar.broker.web.WebService; import 
org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -76,6 +78,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; @@ -274,13 +277,19 @@ public void start() throws PulsarServerException { brokerService.start(); this.webService = new WebService(this); - this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false); - this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true); - this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true); - this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true); + Map<String, Object> attributeMap = Maps.newHashMap(); + attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); + Map<String, Object> vipAttributeMap = Maps.newHashMap(); + vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); + this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap); + this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap); + this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap); + this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap); + this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap); this.webService.addServlet("/metrics", - new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus())), false); + new ServletHolder(new PrometheusMetricsServlet(this, 
config.exposeTopicLevelMetricsInPrometheus())), + false, attributeMap); if (config.isWebSocketServiceEnabled()) { // Use local broker address to avoid different IP address when using a VIP for service discovery @@ -289,11 +298,11 @@ public void start() throws PulsarServerException { config); this.webSocketService.start(); this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH, - new ServletHolder(new WebSocketProducerServlet(webSocketService)), true); + new ServletHolder(new WebSocketProducerServlet(webSocketService)), true, attributeMap); this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH, - new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true); + new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true, attributeMap); this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH, - new ServletHolder(new WebSocketReaderServlet(webSocketService)), true); + new ServletHolder(new WebSocketReaderServlet(webSocketService)), true, attributeMap); } if (LOG.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index cb5584fb2..20ff44b92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -113,7 +114,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException { server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } - public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication) { + public void addRestResources(String basePath, String javaPackages, boolean 
requiresAuthentication, Map<String,Object> attributeMap) { JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); provider.setMapper(ObjectMapperFactory.create()); ResourceConfig config = new ResourceConfig(); @@ -121,14 +122,18 @@ public void addRestResources(String basePath, String javaPackages, boolean requi config.register(provider); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); - addServlet(basePath, servletHolder, requiresAuthentication); + addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); } - public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) { + public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map<String,Object> attributeMap) { ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(path); context.addServlet(servletHolder, MATCH_ALL); - context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar); + if (attributeMap != null) { + attributeMap.forEach((key, value) -> { + context.setAttribute(key, value); + }); + } if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) { FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar)); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 71022ea81..f94730571 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -49,6 +49,10 @@ private int webServicePort = 8080; // Port to use to server HTTPS request private int webServicePortTls = 8443; + + // Path for the file used to determine the rotation status for the broker + // when responding to service discovery 
health checks + private String statusFilePath; // Role names that are treated as "super-user", meaning they will be able to // do all admin operations and publish/consume from all topics @@ -176,6 +180,14 @@ public void setWebServicePortTls(int webServicePortTls) { this.webServicePortTls = webServicePortTls; } + public String getStatusFilePath() { + return statusFilePath; + } + + public void setStatusFilePath(String statusFilePath) { + this.statusFilePath = statusFilePath; + } + public boolean isTlsEnabledInProxy() { return tlsEnabledInProxy; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index d9fc4af55..93067f8ac 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -35,6 +35,7 @@ import io.prometheus.client.exporter.MetricsServlet; import io.prometheus.client.hotspot.DefaultExports; +import org.apache.pulsar.common.configuration.VipStatus; /** * Starts an instance of the Pulsar ProxyService @@ -96,8 +97,8 @@ public ProxyServiceStarter(String[] args) throws Exception { java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); - // create broker service - ProxyService discoveryService = new ProxyService(config); + // create proxy service + ProxyService proxyService = new ProxyService(config); // create a web-service final WebServer server = new WebServer(config); @@ -105,7 +106,7 @@ public ProxyServiceStarter(String[] args) throws Exception { @Override public void run() { try { - discoveryService.close(); + proxyService.close(); server.stop(); } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); @@ -113,11 +114,13 @@ public void run() { } }); - discoveryService.start(); + proxyService.start(); // Setup metrics 
DefaultExports.initialize(); server.addServlet("/metrics", new ServletHolder(MetricsServlet.class)); + server.addRestResources("/", VipStatus.class.getPackage().getName(), + VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); // start web-service server.start(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index a809a3dd0..edc718828 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -27,6 +27,7 @@ import javax.net.ssl.SSLContext; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -40,9 +41,12 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; import com.google.common.collect.Lists; import io.netty.util.concurrent.DefaultThreadFactory; @@ -99,6 +103,21 @@ public void addServlet(String path, ServletHolder servletHolder) { handlers.add(context); } + public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) { + JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); + provider.setMapper(ObjectMapperFactory.create()); + ResourceConfig config = new ResourceConfig(); + config.packages("jersey.config.server.provider.packages", javaPackages); + config.register(provider); + ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); + servletHolder.setAsyncSupported(true); + 
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath(basePath); + context.addServlet(servletHolder, "/*"); + context.setAttribute(attribute, attributeValue); + handlers.add(context); + } + public int getExternalServicePort() { return externalServicePort; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java index ff1bfe5eb..bf8846ec4 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.websocket.service; -import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME; - import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -113,7 +111,7 @@ public void addWebSocketServlet(String basePath, Servlet socketServlet) handlers.add(context); } - public void addRestResources(String basePath, String javaPackages, WebSocketService service) { + public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) { JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); provider.setMapper(ObjectMapperFactory.create()); ResourceConfig config = new ResourceConfig(); @@ -124,7 +122,7 @@ public void addRestResources(String basePath, String javaPackages, WebSocketServ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); context.addServlet(servletHolder, "/*"); - context.setAttribute(ATTRIBUTE_PROXY_SERVICE_NAME, service); + context.setAttribute(attribute, attributeValue); handlers.add(context); } diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 5cea3df73..8e266b8a5 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -45,6 +45,10 @@ private String serviceUrlTls; private String brokerServiceUrl; private String brokerServiceUrlTls; + + // Path for the file used to determine the rotation status for the broker + // when responding to service discovery health checks + private String statusFilePath; // Global Zookeeper quorum connection string private String globalZookeeperServers; @@ -143,6 +147,14 @@ public void setBrokerServiceUrlTls(String brokerServiceUrlTls) { this.brokerServiceUrlTls = brokerServiceUrlTls; } + public String getStatusFilePath() { + return statusFilePath; + } + + public void setStatusFilePath(String statusFilePath) { + this.statusFilePath = statusFilePath; + } + public String getGlobalZookeeperServers() { return globalZookeeperServers; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index 690ad6a17..46720443f 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ADMIN_PATH; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import 
org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; @@ -29,6 +30,7 @@ import org.apache.pulsar.websocket.admin.WebSocketProxyStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME; public class WebSocketServiceStarter { @@ -53,7 +55,9 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service)); - proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), service); + proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service); + proxyServer.addRestResources("/", VipStatus.class.getPackage().getName(), + VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath()); proxyServer.start(); service.start(); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services