erdody commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r649771532



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> 
connectorProps, Callback
         });
     }
 
+    /**
+     * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+     * and the current status of the connector and task instances.
+     *
+     * @param request the restart request; may not be null
+     * @return the restart plan, or empty this worker has no status for the 
connector named in the request and therefore the

Review comment:
       Nit: or empty **if** this worker ....

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##########
@@ -255,12 +257,29 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 
     @POST
     @Path("/{connector}/restart")
-    public void restartConnector(final @PathParam("connector") String 
connector,
+    public Response restartConnector(final @PathParam("connector") String 
connector,
                                  final @Context HttpHeaders headers,
+                                 final @DefaultValue ("false") 
@QueryParam("includeTasks") Boolean includeTasks,
+                                 final @DefaultValue ("false") 
@QueryParam("onlyFailed") Boolean onlyFailed,
                                  final @QueryParam("forward") Boolean forward) 
throws Throwable {
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.restartConnector(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", 
"POST", headers, null, forward);
+        RestartRequest restartRequest = new RestartRequest(connector, 
onlyFailed, includeTasks);
+        if (restartRequest.forciblyRestartConnectorOnly()) {
+            // For backward compatibility, just restart the connector instance 
and return OK with no body
+            FutureCallback<Void> cb = new FutureCallback<>();
+            herder.restartConnector(connector, cb);
+            completeOrForwardRequest(cb, "/connectors/" + connector + 
"/restart", "POST", headers, null, forward);
+            return Response.ok().build();
+        }
+
+        FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>();
+        herder.restartConnectorAndTasks(restartRequest, cb);
+        Map<String, String> queryParameters = new HashMap<>();
+        queryParameters.put("includeTasks", String.valueOf(includeTasks));
+        queryParameters.put("onlyFailed", String.valueOf(onlyFailed));
+        String forwardingPath = "/connectors/" + connector + "/restart";

Review comment:
       Nit: Move this up so it can be shared with line 270?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -186,6 +192,10 @@
     private short currentProtocolVersion;
     private short backoffRetries;
 
+    // visible for testing
+    // The pending restart requests for the connectors;
+    final NavigableSet<RestartRequest> pendingRestartRequests = new 
TreeSet<>();

Review comment:
       There are a few comments in different places explaining the special 
equality implementation in RestartRequest.
   Have we considered making this a `Map<String, RestartRequest>` to make it 
explicit that we keep the latest per connector, have a more typical 
`equals`/`hashCode`, and avoid all the warnings?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -186,6 +192,10 @@
     private short currentProtocolVersion;
     private short backoffRetries;
 
+    // visible for testing
+    // The pending restart requests for the connectors;
+    final NavigableSet<RestartRequest> pendingRestartRequests = new 
TreeSet<>();

Review comment:
       Just out of curiosity, any particular reason why we want to process 
these in connectorName order? (instead of FIFO)

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if 
(!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);

Review comment:
       Nit: Use a message similar to the one you corrected in line 1030?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * An immutable restart plan.
+ */
+public class RestartPlan {
+
+    private final RestartRequest request;
+    private final ConnectorStateInfo stateInfo;
+    private final Collection<ConnectorTaskId> idsToRestart;
+
+    /**
+     * Create a new request to restart a connector and optionally its tasks.
+     *
+     * @param request          the restart request; may not be null
+     * @param restartStateInfo the current state info for the connector; may 
not be null
+     */
+    public RestartPlan(RestartRequest request, ConnectorStateInfo 
restartStateInfo) {
+        this.request = Objects.requireNonNull(request, "RestartRequest name 
may not be null");
+        this.stateInfo = Objects.requireNonNull(restartStateInfo, 
"ConnectorStateInfo name may not be null");
+        idsToRestart = Collections.unmodifiableList(
+                stateInfo.tasks()
+                        .stream()
+                        .filter(taskState -> isRestarting(taskState))

Review comment:
       Nit: can use a method reference

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> 
connectorProps, Callback
         });
     }
 
+    /**
+     * Build the {@link RestartPlan} that describes what should and should not 
be restarted given the restart request
+     * and the current status of the connector and task instances.
+     *
+     * @param request the restart request; may not be null
+     * @return the restart plan, or empty this worker has no status for the 
connector named in the request and therefore the
+     *         connector cannot be restarted
+     */
+    public Optional<RestartPlan> buildRestartPlanFor(RestartRequest request) {
+        String connectorName = request.connectorName();
+        ConnectorStatus connectorStatus = 
statusBackingStore.get(connectorName);
+        if (connectorStatus == null) {
+            return Optional.empty();
+        }
+
+        // If requested, mark the connector as restarting
+        AbstractStatus.State connectorState;
+        if (request.includeConnector(connectorStatus)) {
+            connectorState = AbstractStatus.State.RESTARTING;
+        } else {
+            connectorState = connectorStatus.state();
+        }
+        ConnectorStateInfo.ConnectorState connectorInfoState = new 
ConnectorStateInfo.ConnectorState(
+                connectorState.toString(),
+                connectorStatus.workerId(),
+                connectorStatus.trace()
+        );
+
+        // Collect the task IDs to stop and restart (may be none)

Review comment:
       Nit: This actually collects the state of all tasks, so it's only empty 
if there are no tasks, right?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if 
(!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request 
per connector, because of how
+     * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+     *
+     * <p>This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+     * are processed at once before any additional requests are added.
+     */
+    private synchronized void processRestartRequests() {
+        RestartRequest request;
+        while ((request = pendingRestartRequests.pollFirst()) != null) {
+            doRestartConnectorAndTasks(request);
+        }
+    }
+
+    protected synchronized boolean doRestartConnectorAndTasks(RestartRequest 
request) {
+        final String connectorName = request.connectorName();
+        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+        if (!maybePlan.isPresent()) {
+            log.debug("Skipping restart of connector '{}' since no status is 
available: {}", connectorName, request);
+            return false;
+        }
+        RestartPlan plan = maybePlan.get();
+        log.info("Executing {}", plan);
+
+
+        // If requested, stop the connector and any tasks, marking each as 
restarting
+        final ExtendedAssignment currentAssignments = assignment;
+        final Collection<ConnectorTaskId> assignedIdsToRestart = 
plan.taskIdsToRestart()
+                                                                     .stream()
+                                                                     
.filter(taskId -> currentAssignments.tasks().contains(taskId))
+                                                                     
.collect(Collectors.toList());
+        final boolean restartConnector = plan.restartConnector() && 
currentAssignments.connectors().contains(connectorName);
+        final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+        if (restartConnector) {
+            worker.stopAndAwaitConnector(connectorName);
+            recordRestarting(connectorName);
+        }
+        if (restartTasks) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(assignedIdsToRestart);
+            assignedIdsToRestart.forEach(this::recordRestarting);
+        }
+
+        // Now restart the connector and tasks
+        if (restartConnector) {
+            startConnector(connectorName, (error, targetState) -> {
+                if (error == null) {
+                    log.info("Connector {} successfully restarted", 
connectorName);
+                } else {
+                    log.error("Failed to restart connector '" + connectorName 
+ "'", error);
+                }
+            });
+        }
+        if (restartTasks) {
+            log.debug("Restarting {} of {} tasks for {}", 
plan.restartTaskCount(), plan.totalTaskCount(), request);
+            plan.taskIdsToRestart().forEach(this::startTask);
+            log.debug("Restarted {} of {} tasks for {} as requested", 
plan.restartTaskCount(), plan.totalTaskCount(), request);
+        }
+        log.info("Completed {}", plan);
+        return restartConnector || restartTasks;

Review comment:
       The return value is only used by tests. Can we assert based on other 
method calls instead?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if 
(!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request 
per connector, because of how
+     * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+     *
+     * <p>This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+     * are processed at once before any additional requests are added.
+     */
+    private synchronized void processRestartRequests() {

Review comment:
       Just wondering: could blocking the addition of new entries be a problem, 
considering that this method can take some time? Would it be worth creating a 
copy of the collection to minimize the synchronized time?

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
##########
@@ -58,6 +58,9 @@ public void start(Map<String, String> props) {
         commonConfigs = props;
         log.info("Started {} connector {}", this.getClass().getSimpleName(), 
connectorName);
         connectorHandle.recordConnectorStart();
+        if 
("true".equalsIgnoreCase(props.getOrDefault("connector.start.inject.error", 
"false"))) {

Review comment:
       Nit: `Boolean.parseBoolean()`

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if 
(!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown 
connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing 
store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = 
buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, 
plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Cannot 
process restart request since it is not assigned to this member", leaderUrl()), 
null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request 
per connector, because of how
+     * {@link RestartRequest#equals(Object)} and {@link 
RestartRequest#hashCode()} are based only on the connector name.
+     *
+     * <p>This method is called from within the {@link #tick()} method. It is 
synchronized so that all pending restart requests
+     * are processed at once before any additional requests are added.
+     */
+    private synchronized void processRestartRequests() {
+        RestartRequest request;
+        while ((request = pendingRestartRequests.pollFirst()) != null) {
+            doRestartConnectorAndTasks(request);
+        }
+    }
+
+    protected synchronized boolean doRestartConnectorAndTasks(RestartRequest 
request) {
+        final String connectorName = request.connectorName();
+        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+        if (!maybePlan.isPresent()) {
+            log.debug("Skipping restart of connector '{}' since no status is 
available: {}", connectorName, request);
+            return false;
+        }
+        RestartPlan plan = maybePlan.get();
+        log.info("Executing {}", plan);
+
+
+        // If requested, stop the connector and any tasks, marking each as 
restarting
+        final ExtendedAssignment currentAssignments = assignment;
+        final Collection<ConnectorTaskId> assignedIdsToRestart = 
plan.taskIdsToRestart()
+                                                                     .stream()
+                                                                     
.filter(taskId -> currentAssignments.tasks().contains(taskId))
+                                                                     
.collect(Collectors.toList());
+        final boolean restartConnector = plan.restartConnector() && 
currentAssignments.connectors().contains(connectorName);
+        final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+        if (restartConnector) {
+            worker.stopAndAwaitConnector(connectorName);
+            recordRestarting(connectorName);
+        }
+        if (restartTasks) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(assignedIdsToRestart);
+            assignedIdsToRestart.forEach(this::recordRestarting);
+        }
+
+        // Now restart the connector and tasks
+        if (restartConnector) {
+            startConnector(connectorName, (error, targetState) -> {
+                if (error == null) {
+                    log.info("Connector {} successfully restarted", 
connectorName);

Review comment:
       Nit: quotes around connectorName, like in the line below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to