C0urante commented on code in PR #14293: URL: https://github.com/apache/kafka/pull/14293#discussion_r1414541177
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES; + +public class MirrorHerder extends DistributedHerder { + + private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class); + + private final MirrorMakerConfig config; + private final SourceAndTarget sourceAndTarget; + private boolean 
wasLeader; + + public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> restNamespace, AutoCloseable... uponShutdown) { + super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown); + this.config = mirrorConfig; + this.sourceAndTarget = sourceAndTarget; + } + + @Override + protected boolean handleRebalanceCompleted() { + if (!super.handleRebalanceCompleted()) { + return false; + } + if (isLeader()) { + if (!wasLeader) { + log.info("This node {} is now a leader for {}. Configuring connectors...", this, sourceAndTarget); + configureConnectors(); + } + wasLeader = true; + } else { + wasLeader = false; + } + return true; + } + + private void configureConnectors() { + CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector); + } + + private void maybeConfigureConnector(Class<?> connectorClass) { + Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass); + connectorConfig(connectorClass.getSimpleName(), (e, existingConfig) -> { Review Comment: Do we need/want this to execute asynchronously? At first glance it seems like it'd be cleaner to read the connector config from the [superclass's configState field](https://github.com/apache/kafka/blob/a83bc2d977d2af85d4edfc8096854137481001e9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L193), which is already `protected` (though for somewhat dubious reasons...). 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES; + +public class MirrorHerder extends DistributedHerder { + + private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class); + + private final MirrorMakerConfig config; + private final SourceAndTarget sourceAndTarget; + private boolean 
wasLeader; + + public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> restNamespace, AutoCloseable... uponShutdown) { + super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown); + this.config = mirrorConfig; + this.sourceAndTarget = sourceAndTarget; + } + + @Override + protected boolean handleRebalanceCompleted() { + if (!super.handleRebalanceCompleted()) { Review Comment: Small thought: if there's no reason not to invoke the super method (here or in other theoretical subclasses), we may want to introduce a separate hook method in the `DistributedHerder` class that it invokes after successful calls to `handleRebalanceCompleted`, instead of allowing subclasses to potentially opt out of executing the logic in `DistributedHerder::handleRebalanceCompleted`. Not a blocker, though. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES; + +public class MirrorHerder extends DistributedHerder { + + private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class); + + private final MirrorMakerConfig config; + private final SourceAndTarget sourceAndTarget; + private boolean wasLeader; + + public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> restNamespace, AutoCloseable... 
uponShutdown) { + super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown); + this.config = mirrorConfig; + this.sourceAndTarget = sourceAndTarget; + } + + @Override + protected boolean handleRebalanceCompleted() { + if (!super.handleRebalanceCompleted()) { + return false; + } + if (isLeader()) { + if (!wasLeader) { + log.info("This node {} is now a leader for {}. Configuring connectors...", this, sourceAndTarget); + configureConnectors(); + } + wasLeader = true; + } else { + wasLeader = false; + } + return true; + } + + private void configureConnectors() { + CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector); + } + + private void maybeConfigureConnector(Class<?> connectorClass) { + Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass); + connectorConfig(connectorClass.getSimpleName(), (e, existingConfig) -> { + if (existingConfig == null || !existingConfig.equals(connectorProps)) { + configureConnector(connectorClass.getSimpleName(), connectorProps); + } else { + log.info("This node is a leader for {} and configuration for {} is already up to date.", sourceAndTarget, connectorClass.getSimpleName()); + } + }); + } + private void configureConnector(String connectorName, Map<String, String> connectorProps) { Review Comment: Nit: ```suggestion private void configureConnector(String connectorName, Map<String, String> connectorProps) { ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java: ########## @@ -332,11 +303,6 @@ public void run() { } } - public ConnectorStateInfo connectorStatus(SourceAndTarget sourceAndTarget, String connector) { - checkHerder(sourceAndTarget); - return herders.get(sourceAndTarget).connectorStatus(connector); - } Review Comment: It's a pretty big change in the internal API to make the herders map public. 
I think we should stick to methods like this one that restrict access to herders. We could retain this method and add a new `taskConfigs` method, which I think should cover all current needs. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ########## @@ -314,6 +356,23 @@ private <T extends SourceConnector> void awaitConnectorTasksStart(final MirrorMa }, MM_START_UP_TIMEOUT_MS, "Tasks for connector " + clazz.getSimpleName() + " for MirrorMaker instances did not transition to running in time"); } + private <T extends SourceConnector> void awaitConnectorConfiguration(MirrorMaker mm, Class<T> clazz, SourceAndTarget sourceAndTarget, Predicate<Map<String, String>> predicate) throws InterruptedException { + String connName = clazz.getSimpleName(); + waitForCondition(() -> { + try { + FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>(); + herder(mm, sourceAndTarget).tasksConfig(connName, cb); Review Comment: Interesting call checking on task configs instead of the connector config. Just out of curiosity, why one instead of the other? (Not a blocker) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
