This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new e7298f4 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer e7298f4 is described below commit e7298f4fc53f27f91564f60c3818fa392287ff33 Author: Robert Yokota <rayok...@gmail.com> AuthorDate: Tue Nov 27 22:01:21 2018 -0800 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able. Accessing the toggle through the `Herder` causes the same code to be called recursively. This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`. Author: Robert Yokota <rayok...@gmail.com> Reviewers: Magesh Nandakumar <magesh.n.ku...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #5914 from rayokota/KAFKA-7620 (cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- .../kafka/connect/runtime/ConnectorConfig.java | 5 ++- .../org/apache/kafka/connect/runtime/Herder.java | 6 --- .../connect/runtime/WorkerConfigTransformer.java | 44 ++++++++++++++-------- .../runtime/distributed/DistributedHerder.java | 8 ---- .../runtime/standalone/StandaloneHerder.java | 8 ---- .../runtime/WorkerConfigTransformerTest.java | 13 ++++--- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 9d1a50d..d030fed 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; @@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig { "indicates that a configuration value will expire in the future."; private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action"; - public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); - public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); + public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT); + public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT); public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout"; public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 5c7cc14..c572e20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -149,12 +149,6 @@ public interface Herder { void restartTask(ConnectorTaskId id, Callback<Void> cb); /** - * Get the configuration reload action. - * @param connName name of the connector - */ - ConfigReloadAction connectorConfigReloadAction(final String connName); - - /** * Restart the connector. * @param connName name of the connector * @param cb callback to invoke upon completion diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 1b715c7..3373d5c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; +import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,6 +34,8 @@ import java.util.concurrent.ConcurrentMap; * retrieved TTL values. */ public class WorkerConfigTransformer { + private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class); + private final Worker worker; private final ConfigTransformer configTransformer; private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>(); @@ -46,7 +53,16 @@ public class WorkerConfigTransformer { if (configs == null) return null; ConfigTransformerResult result = configTransformer.transform(configs); if (connectorName != null) { - scheduleReload(connectorName, result.ttls()); + String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG; + String action = (String) ConfigDef.parseType(key, configs.get(key), ConfigDef.Type.STRING); + if (action == null) { + // The default action is "restart". + action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART; + } + ConfigReloadAction reloadAction = ConfigReloadAction.valueOf(action.toUpperCase(Locale.ROOT)); + if (reloadAction == ConfigReloadAction.RESTART) { + scheduleReload(connectorName, result.ttls()); + } } return result.data(); } @@ -58,21 +74,19 @@ public class WorkerConfigTransformer { } private void scheduleReload(String connectorName, String path, long ttl) { - Herder herder = worker.herder(); - if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) { - Map<String, HerderRequest> connectorRequests = requests.get(connectorName); - if (connectorRequests == null) { - connectorRequests = new ConcurrentHashMap<>(); - requests.put(connectorName, connectorRequests); - } else { - HerderRequest previousRequest = connectorRequests.get(path); - if (previousRequest != null) { - // Delete previous request for ttl which is now stale - previousRequest.cancel(); - } + Map<String, HerderRequest> connectorRequests = requests.get(connectorName); + if (connectorRequests == null) { + connectorRequests = new ConcurrentHashMap<>(); + requests.put(connectorName, connectorRequests); + } else { + HerderRequest previousRequest = connectorRequests.get(path); + if (previousRequest != null) { + // Delete previous request for ttl which is now stale + previousRequest.cancel(); } - HerderRequest request = herder.restartConnector(ttl, connectorName, null); - connectorRequests.put(path, request); } + log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl); + HerderRequest request = worker.herder().restartConnector(ttl, connectorName, null); + connectorRequests.put(path, request); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index f2009db..dc91f35 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -61,7 +61,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.NavigableSet; import java.util.NoSuchElementException; @@ -643,13 +642,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public ConfigReloadAction connectorConfigReloadAction(final String connName) { - return ConfigReloadAction.valueOf( - configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG) - .toUpperCase(Locale.ROOT)); - } - - @Override public void restartConnector(final String connName, final Callback<Void> callback) { restartConnector(0, connName, callback); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 40ad980..fe31c28 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executors; @@ -261,13 +260,6 @@ public class StandaloneHerder extends AbstractHerder { } @Override - public ConfigReloadAction connectorConfigReloadAction(final String connName) { - return ConfigReloadAction.valueOf( - configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG) - .toUpperCase(Locale.ROOT)); - } - - @Override public synchronized void restartConnector(String connName, Callback<Void> cb) { if (!configState.contains(connName)) cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java index 300022d..034bd51 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java @@ -28,9 +28,12 @@ import org.powermock.api.easymock.annotation.Mock; import org.powermock.modules.junit4.PowerMockRunner; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.powermock.api.easymock.PowerMock.replayAll; @@ -69,18 +72,18 @@ public class WorkerConfigTransformerTest { @Test public void testReplaceVariableWithTTL() throws Exception { EasyMock.expect(worker.herder()).andReturn(herder); - EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE); replayAll(); - Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); - assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); + Map<String, String> props = new HashMap<>(); + props.put(MY_KEY, "${test:testPath:testKeyWithTTL}"); + props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE); + Map<String, String> result = configTransformer.transform(MY_CONNECTOR, props); } @Test public void testReplaceVariableWithTTLAndScheduleRestart() throws Exception { EasyMock.expect(worker.herder()).andReturn(herder); - EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId); replayAll(); @@ -92,11 +95,9 @@ public class WorkerConfigTransformerTest { @Test public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() throws Exception { EasyMock.expect(worker.herder()).andReturn(herder); - EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId); EasyMock.expect(worker.herder()).andReturn(herder); - EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); EasyMock.expectLastCall(); requestId.cancel(); EasyMock.expectLastCall();