This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new a569a81a KAFKA-8304: Fix registration of Connect REST extensions (#6651) a569a81a is described below commit a569a81a6c6cce47261200fe1b0e75085b4e409d Author: Chris Egerton <chr...@confluent.io> AuthorDate: Tue May 7 15:20:51 2019 -0700 KAFKA-8304: Fix registration of Connect REST extensions (#6651) Fix registration of Connect REST extensions to prevent deadlocks when extensions get the list of connectors before the herder is available. Added integration test to check the behavior. Author: Chris Egerton <ceger...@oberlin.edu> Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rha...@gmail.com> --- checkstyle/import-control.xml | 1 + .../kafka/connect/cli/ConnectDistributed.java | 6 +- .../kafka/connect/cli/ConnectStandalone.java | 6 +- .../org/apache/kafka/connect/runtime/Connect.java | 1 + .../kafka/connect/runtime/HerderProvider.java | 68 ------------ .../runtime/health/ConnectClusterStateImpl.java | 12 +-- .../kafka/connect/runtime/rest/RestServer.java | 66 +++++++----- .../rest/resources/ConnectorPluginsResource.java | 12 +-- .../runtime/rest/resources/ConnectorsResource.java | 39 +++---- .../runtime/rest/resources/RootResource.java | 8 +- .../integration/RestExtensionIntegrationTest.java | 119 +++++++++++++++++++++ .../health/ConnectClusterStateImplTest.java | 5 +- .../kafka/connect/runtime/rest/RestServerTest.java | 10 +- .../resources/ConnectorPluginsResourceTest.java | 3 +- .../rest/resources/ConnectorsResourceTest.java | 3 +- .../runtime/rest/resources/RootResourceTest.java | 3 +- .../util/clusters/EmbeddedConnectCluster.java | 2 +- 17 files changed, 206 insertions(+), 158 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 072d706..f4955ce 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -381,6 +381,7 @@ <allow pkg="org.apache.kafka.connect.util.clusters" /> <allow pkg="org.apache.kafka.connect" /> <allow pkg="org.apache.kafka.tools" /> + <allow pkg="javax.ws.rs" /> </subpackage> <subpackage name="json"> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index a6c6d98..17d65ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerInfo; @@ -95,8 +94,7 @@ public class ConnectDistributed { log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); - HerderProvider provider = new HerderProvider(); - rest.start(provider, plugins); + rest.initializeServer(); URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); @@ -124,8 +122,6 @@ public class ConnectDistributed { log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); try { connect.start(); - // herder has initialized now, and ready to be used by the RestServer. - provider.setHerder(herder); } catch (Exception e) { log.error("Failed to start Connect", e); connect.stop(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index dd1cf0f..499e6df 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerInfo; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -83,8 +82,7 @@ public class ConnectStandalone { log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); - HerderProvider provider = new HerderProvider(); - rest.start(provider, plugins); + rest.initializeServer(); URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); @@ -97,8 +95,6 @@ public class ConnectStandalone { try { connect.start(); - // herder has initialized now, and ready to be used by the RestServer. - provider.setHerder(herder); for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 965046c..4a0bcab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -51,6 +51,7 @@ public class Connect { Runtime.getRuntime().addShutdownHook(shutdownHook); herder.start(); + rest.initializeResources(herder); log.info("Kafka Connect started"); } finally { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java deleted file mode 100644 index 42c0925..0000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.runtime; - -import org.apache.kafka.connect.errors.ConnectException; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * A supplier for {@link Herder}s. - */ -public class HerderProvider { - - private final CountDownLatch initialized = new CountDownLatch(1); - volatile Herder herder = null; - - public HerderProvider() { - } - - /** - * Create a herder provider with a herder. - * @param herder the herder that will be supplied to threads waiting on this provider - */ - public HerderProvider(Herder herder) { - this.herder = herder; - initialized.countDown(); - } - - /** - * @return the contained herder. - * @throws ConnectException if a herder was not available within a duration of calling this method - */ - public Herder get() { - try { - // wait for herder to be initialized - if (!initialized.await(1, TimeUnit.MINUTES)) { - throw new ConnectException("Timed out waiting for herder to be initialized."); - } - } catch (InterruptedException e) { - throw new ConnectException("Interrupted while waiting for herder to be initialized.", e); - } - return herder; - } - - /** - * @param herder set a herder, and signal to all threads waiting on get(). - */ - public void setHerder(Herder herder) { - this.herder = herder; - initialized.countDown(); - } - -} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index 32f7add..e3a4833 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -23,7 +23,7 @@ import org.apache.kafka.connect.health.ConnectorHealth; import org.apache.kafka.connect.health.ConnectorState; import org.apache.kafka.connect.health.ConnectorType; import org.apache.kafka.connect.health.TaskState; -import org.apache.kafka.connect.runtime.HerderProvider; +import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.util.FutureCallback; @@ -38,17 +38,17 @@ import java.util.concurrent.TimeoutException; public class ConnectClusterStateImpl implements ConnectClusterState { private final long herderRequestTimeoutMs; - private final HerderProvider herderProvider; + private final Herder herder; - public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) { + public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) { this.herderRequestTimeoutMs = connectorsTimeoutMs; - this.herderProvider = herderProvider; + this.herder = herder; } @Override public Collection<String> connectors() { FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>(); - herderProvider.get().connectors(connectorsCallback); + herder.connectors(connectorsCallback); try { return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { @@ -59,7 +59,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState { @Override public ConnectorHealth connectorHealth(String connName) { - ConnectorStateInfo state = herderProvider.get().connectorStatus(connName); + ConnectorStateInfo state = herder.connectorStatus(connName); ConnectorState connectorState = new ConnectorState( state.connector().state(), state.connector().workerId(), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 1840b24..5d60771 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -21,10 +21,9 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; -import org.apache.kafka.connect.runtime.HerderProvider; +import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; -import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; @@ -35,8 +34,8 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.Slf4jRequestLog; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; import org.eclipse.jetty.servlet.FilterHolder; @@ -75,6 +74,7 @@ public class RestServer { private static final String PROTOCOL_HTTPS = "https"; private final WorkerConfig config; + private ContextHandlerCollection handlers; private Server jettyServer; private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList(); @@ -88,6 +88,7 @@ public class RestServer { List<String> listeners = parseListeners(); jettyServer = new Server(); + handlers = new ContextHandlerCollection(); createConnectors(listeners); } @@ -160,20 +161,40 @@ public class RestServer { return connector; } - public void start(HerderProvider herderProvider, Plugins plugins) { - log.info("Starting REST server"); + public void initializeServer() { + log.info("Initializing REST server"); + + /* Needed for graceful shutdown as per `setStopTimeout` documentation */ + StatisticsHandler statsHandler = new StatisticsHandler(); + statsHandler.setHandler(handlers); + jettyServer.setHandler(statsHandler); + jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); + jettyServer.setStopAtShutdown(true); + + try { + jettyServer.start(); + } catch (Exception e) { + throw new ConnectException("Unable to initialize REST server", e); + } + + log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); + } + + @SuppressWarnings("deprecation") + public void initializeResources(Herder herder) { + log.info("Initializing REST resources"); ResourceConfig resourceConfig = new ResourceConfig(); resourceConfig.register(new JacksonJsonProvider()); - resourceConfig.register(new RootResource(herderProvider)); - resourceConfig.register(new ConnectorsResource(herderProvider, config)); - resourceConfig.register(new ConnectorPluginsResource(herderProvider)); + resourceConfig.register(new RootResource(herder)); + resourceConfig.register(new ConnectorsResource(herder, config)); + resourceConfig.register(new ConnectorPluginsResource(herder)); resourceConfig.register(ConnectExceptionMapper.class); resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true); - registerRestExtensions(herderProvider, plugins, resourceConfig); + registerRestExtensions(herder, resourceConfig); ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletHolder servletHolder = new ServletHolder(servletContainer); @@ -200,23 +221,14 @@ public class RestServer { requestLog.setLogLatency(true); requestLogHandler.setRequestLog(requestLog); - HandlerCollection handlers = new HandlerCollection(); handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); - - /* Needed for graceful shutdown as per `setStopTimeout` documentation */ - StatisticsHandler statsHandler = new StatisticsHandler(); - statsHandler.setHandler(handlers); - jettyServer.setHandler(statsHandler); - jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); - jettyServer.setStopAtShutdown(true); - try { - jettyServer.start(); + context.start(); } catch (Exception e) { - throw new ConnectException("Unable to start REST server", e); + throw new ConnectException("Unable to initialize REST resources", e); } - log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); + log.info("REST resources initialized; server is started and ready to handle requests"); } public URI serverUrl() { @@ -237,9 +249,8 @@ public class RestServer { jettyServer.stop(); jettyServer.join(); } catch (Exception e) { - throw new ConnectException("Unable to stop REST server", e); - } finally { jettyServer.destroy(); + throw new ConnectException("Unable to stop REST server", e); } log.info("REST server stopped"); @@ -247,7 +258,8 @@ public class RestServer { /** * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty - * server, unless overrides for advertised hostname and/or port are provided via configs. + * server, unless overrides for advertised hostname and/or port are provided via configs. {@link #initializeServer()} + * must be invoked successfully before calling this method. */ public URI advertisedUrl() { UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI()); @@ -303,8 +315,8 @@ public class RestServer { return null; } - void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) { - connectRestExtensions = plugins.newPlugins( + void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) { + connectRestExtensions = herder.plugins().newPlugins( config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG), config, ConnectRestExtension.class); @@ -319,7 +331,7 @@ public class RestServer { ConnectRestExtensionContext connectRestExtensionContext = new ConnectRestExtensionContextImpl( new ConnectRestConfigurable(resourceConfig), - new ConnectClusterStateImpl(herderRequestTimeoutMs, provider) + new ConnectClusterStateImpl(herderRequestTimeoutMs, herder) ); for (ConnectRestExtension connectRestExtension : connectRestExtensions) { connectRestExtension.register(connectRestExtensionContext); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 87f25b2..24eb93b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -18,7 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.HerderProvider; +import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; @@ -49,7 +49,7 @@ import java.util.Map; public class ConnectorPluginsResource { private static final String ALIAS_SUFFIX = "Connector"; - private final HerderProvider herderProvider; + private final Herder herder; private final List<ConnectorPluginInfo> connectorPlugins; private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList( @@ -58,8 +58,8 @@ public class ConnectorPluginsResource { SchemaSourceConnector.class ); - public ConnectorPluginsResource(HerderProvider herderProvider) { - this.herderProvider = herderProvider; + public ConnectorPluginsResource(Herder herder) { + this.herder = herder; this.connectorPlugins = new ArrayList<>(); } @@ -78,7 +78,7 @@ public class ConnectorPluginsResource { ); } - return herderProvider.get().validateConnectorConfig(connectorConfig); + return herder.validateConnectorConfig(connectorConfig); } @GET @@ -90,7 +90,7 @@ public class ConnectorPluginsResource { // TODO: improve once plugins are allowed to be added/removed during runtime. private synchronized List<ConnectorPluginInfo> getConnectorPlugins() { if (connectorPlugins.isEmpty()) { - for (PluginDesc<Connector> plugin : herderProvider.get().plugins().connectors()) { + for (PluginDesc<Connector> plugin : herder.plugins().connectors()) { if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { connectorPlugins.add(new ConnectorPluginInfo(plugin)); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index d4d84f1..4a04512 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.distributed.RequestTargetException; @@ -68,25 +67,21 @@ public class ConnectorsResource { // but currently a worker simply leaving the group can take this long as well. public static final long REQUEST_TIMEOUT_MS = 90 * 1000; - private final HerderProvider herderProvider; + private final Herder herder; private final WorkerConfig config; @javax.ws.rs.core.Context private ServletContext context; - public ConnectorsResource(HerderProvider herder, WorkerConfig config) { - this.herderProvider = herder; + public ConnectorsResource(Herder herder, WorkerConfig config) { + this.herder = herder; this.config = config; } - private Herder herder() { - return herderProvider.get(); - } - @GET @Path("/") public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Collection<String>> cb = new FutureCallback<>(); - herder().connectors(cb); + herder.connectors(cb); return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() { }, forward); } @@ -104,7 +99,7 @@ public class ConnectorsResource { checkAndPutConnectorConfigName(name, configs); FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); - herder().putConnectorConfig(name, configs, false, cb); + herder.putConnectorConfig(name, configs, false, cb); Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); @@ -117,7 +112,7 @@ public class ConnectorsResource { public ConnectorInfo getConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<ConnectorInfo> cb = new FutureCallback<>(); - herder().connectorInfo(connector, cb); + herder.connectorInfo(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward); } @@ -126,14 +121,14 @@ public class ConnectorsResource { public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Map<String, String>> cb = new FutureCallback<>(); - herder().connectorConfig(connector, cb); + herder.connectorConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward); } @GET @Path("/{connector}/status") public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable { - return herder().connectorStatus(connector); + return herder.connectorStatus(connector); } @PUT @@ -144,7 +139,7 @@ public class ConnectorsResource { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); checkAndPutConnectorConfigName(connector, connectorConfig); - herder().putConnectorConfig(connector, connectorConfig, true, cb); + herder.putConnectorConfig(connector, connectorConfig, true, cb); Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); Response.ResponseBuilder response; @@ -162,21 +157,21 @@ public class ConnectorsResource { public void restartConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Void> cb = new FutureCallback<>(); - herder().restartConnector(connector, cb); + herder.restartConnector(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward); } @PUT @Path("/{connector}/pause") public Response pauseConnector(@PathParam("connector") String connector) { - herder().pauseConnector(connector); + herder.pauseConnector(connector); return Response.accepted().build(); } @PUT @Path("/{connector}/resume") public Response resumeConnector(@PathParam("connector") String connector) { - herder().resumeConnector(connector); + herder.resumeConnector(connector); return Response.accepted().build(); } @@ -185,7 +180,7 @@ public class ConnectorsResource { public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<List<TaskInfo>> cb = new FutureCallback<>(); - herder().taskConfigs(connector, cb); + herder.taskConfigs(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() { }, forward); } @@ -196,7 +191,7 @@ public class ConnectorsResource { final @QueryParam("forward") Boolean forward, final List<Map<String, String>> taskConfigs) throws Throwable { FutureCallback<Void> cb = new FutureCallback<>(); - herder().putTaskConfigs(connector, taskConfigs, cb); + herder.putTaskConfigs(connector, taskConfigs, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward); } @@ -204,7 +199,7 @@ public class ConnectorsResource { @Path("/{connector}/tasks/{task}/status") public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, final @PathParam("task") Integer task) throws Throwable { - return herder().taskStatus(new ConnectorTaskId(connector, task)); + return herder.taskStatus(new ConnectorTaskId(connector, task)); } @POST @@ -214,7 +209,7 @@ public class ConnectorsResource { final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Void> cb = new FutureCallback<>(); ConnectorTaskId taskId = new ConnectorTaskId(connector, task); - herder().restartTask(taskId, cb); + herder.restartTask(taskId, cb); completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward); } @@ -223,7 +218,7 @@ public class ConnectorsResource { public void destroyConnector(final @PathParam("connector") String connector, final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); - herder().deleteConnectorConfig(connector, cb); + herder.deleteConnectorConfig(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 56516cd..9666bf1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import org.apache.kafka.connect.runtime.HerderProvider; +import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import javax.ws.rs.GET; @@ -28,15 +28,15 @@ import javax.ws.rs.core.MediaType; @Produces(MediaType.APPLICATION_JSON) public class RootResource { - private final HerderProvider herder; + private final Herder herder; - public RootResource(HerderProvider herder) { + public RootResource(Herder herder) { this.herder = herder; } @GET @Path("/") public ServerInfo serverInfo() { - return new ServerInfo(herder.get().kafkaClusterId()); + return new ServerInfo(herder.kafkaClusterId()); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java new file mode 100644 index 0000000..d4cac39 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.rest.ConnectRestExtensionContext; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +/** + * A simple integration test to ensure that REST extensions are registered correctly. + */ +@Category(IntegrationTest.class) +public class RestExtensionIntegrationTest { + + private static final int NUM_WORKERS = 3; + private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + + private EmbeddedConnectCluster connect; + + @Test + public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException { + // setup Connect worker properties + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); + + // build a Connect cluster backed by Kafka and Zk + connect = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .numWorkers(NUM_WORKERS) + .numBrokers(1) + .workerProps(workerProps) + .build(); + + // start the clusters + connect.start(); + + waitForCondition( + this::extensionIsRegistered, + REST_EXTENSION_REGISTRATION_TIMEOUT_MS, + "REST extension was never registered" + ); + } + + @After + public void close() { + // stop all Connect, Kafka and Zk threads. + connect.stop(); + } + + private boolean extensionIsRegistered() { + try { + String extensionUrl = connect.endpointForResource("integration-test-rest-extension/registered"); + return "true".equals(connect.executeGet(extensionUrl)); + } catch (ConnectRestException | IOException e) { + return false; + } + } + + public static class IntegrationTestRestExtension implements ConnectRestExtension { + + @Override + public void register(ConnectRestExtensionContext restPluginContext) { + restPluginContext.clusterState().connectors(); + restPluginContext.configurable().register(new IntegrationTestRestExtensionResource()); + } + + @Override + public void close() { + } + + @Override + public void configure(Map<String, ?> configs) { + } + + @Override + public String version() { + return "test"; + } + + @Path("integration-test-rest-extension") + public static class IntegrationTestRestExtensionResource { + + @GET + @Path("/registered") + public boolean isRegistered() { + return true; + } + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java index b232a4d..78780f3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.health; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.util.Callback; import org.easymock.Capture; import org.easymock.EasyMock; @@ -41,15 +40,13 @@ public class ConnectClusterStateImplTest { @Mock protected Herder herder; - protected HerderProvider herderProvider; protected ConnectClusterStateImpl connectClusterState; protected Collection<String> expectedConnectors; protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10); @Before public void setUp() { - herderProvider = new HerderProvider(herder); - connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider); + connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder); expectedConnectors = Arrays.asList("sink1", "source1", "source2"); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 128532f..3d297b7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -26,7 +26,6 @@ import org.apache.http.impl.client.HttpClients; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -175,7 +174,8 @@ public class RestServerTest { PowerMock.replayAll(); server = new RestServer(workerConfig); - server.start(new HerderProvider(herder), herder.plugins()); + server.initializeServer(); + server.initializeResources(herder); HttpOptions request = new HttpOptions("/connectors"); request.addHeader("Content-Type", MediaType.WILDCARD); @@ -218,7 +218,8 @@ public class RestServerTest { PowerMock.replayAll(); server = new RestServer(workerConfig); - server.start(new HerderProvider(herder), herder.plugins()); + server.initializeServer(); + server.initializeResources(herder); HttpRequest request = new HttpGet("/connectors"); request.addHeader("Referer", origin + "/page"); request.addHeader("Origin", origin); @@ -275,7 +276,8 @@ public class RestServerTest { PowerMock.replayAll(); server = new RestServer(workerConfig); - server.start(new HerderProvider(herder), herder.plugins()); + server.initializeServer(); + server.initializeResources(herder); HttpRequest request = new HttpGet("/connectors"); CloseableHttpClient httpClient = HttpClients.createMinimal(); HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 684064d..a3aee6a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.TestSinkConnector; import org.apache.kafka.connect.runtime.TestSourceConnector; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -187,7 +186,7 @@ public class ConnectorPluginsResourceTest { plugins = PowerMock.createMock(Plugins.class); herder = PowerMock.createMock(AbstractHerder.class); - connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder)); + connectorPluginsResource = new ConnectorPluginsResource(herder); } private void expectPlugins() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 5a52074..f84cd25 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.NotAssignedException; import org.apache.kafka.connect.runtime.distributed.NotLeaderException; @@ -127,7 +126,7 @@ public class ConnectorsResourceTest { public void setUp() throws NoSuchMethodException { PowerMock.mockStatic(RestClient.class, RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class)); - connectorsResource = new ConnectorsResource(new HerderProvider(herder), null); + connectorsResource = new ConnectorsResource(herder, null); } private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java index be80e28..4e928a3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -40,7 +39,7 @@ public class RootResourceTest extends EasyMockSupport { @Before public void setUp() { - rootResource = new RootResource(new HerderProvider(herder)); + rootResource = new RootResource(herder); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 590649b..e610812 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -333,7 +333,7 @@ public class EmbeddedConnectCluster { } } - private String endpointForResource(String resource) throws IOException { + public String endpointForResource(String resource) throws IOException { String url = connectCluster.stream() .map(WorkerHandle::url) .filter(Objects::nonNull)