gharris1727 commented on code in PR #15389: URL: https://github.com/apache/kafka/pull/15389#discussion_r1517018424
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception { Callback<Map<String, String>> connectorConfigCb = mock(Callback.class); // Create - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connConfig); + expectConfigValidation(connectorMock, connConfig); // Should get first config doNothing().when(connectorConfigCb).onCompletion(null, connConfig); // Update config, which requires stopping and restarting doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); final ArgumentCaptor<Map<String, String>> capturedConfig = ArgumentCaptor.forClass(Map.class); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(null, capturedConfig, TargetState.STARTED, TargetState.STARTED, null); Review Comment: Instead of capturing the config, pass in `newConnConfig` to use the equality assertion. Then the capturedConfig argument is always null and we can eliminate some of the code paths in mockStartConnector. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -1044,14 +988,14 @@ public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception { // Set new config on the connector and tasks FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>(); - expectConfigValidation(connectorMock, false, newConfig); + expectConfigValidation(connectorMock, newConfig); Review Comment: See testPutConnectorConfig comment about `false`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception { Callback<Map<String, String>> connectorConfigCb = mock(Callback.class); // Create - connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Connector connectorMock = mock(SourceConnector.class); - expectConfigValidation(connectorMock, true, connConfig); + expectConfigValidation(connectorMock, connConfig); // Should get first config doNothing().when(connectorConfigCb).onCompletion(null, connConfig); // Update config, which requires stopping and restarting doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); final ArgumentCaptor<Map<String, String>> capturedConfig = ArgumentCaptor.forClass(Map.class); - final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); - doAnswer(invocation -> { - onStart.getValue().onCompletion(null, TargetState.STARTED); - return true; - }).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), - eq(herder), eq(TargetState.STARTED), onStart.capture()); + mockStartConnector(null, capturedConfig, TargetState.STARTED, TargetState.STARTED, null); // Generate same task config, which should result in no additional action to restart tasks when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, 
newConnConfig, true))) - .thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); + .thenReturn(singletonList(taskConfig(SourceSink.SOURCE))); - expectConfigValidation(connectorMock, false, newConnConfig); + expectConfigValidation(connectorMock, newConnConfig); Review Comment: Here and in testPutConnectorConfig where the second argument was `false`, the config argument should be moved up to the first call in the method. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -114,64 +114,62 @@ public class StandaloneHerderTest { private static final String TOPICS_LIST_STR = "topic1,topic2"; private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; + private static final Long WAIT_TIME = 1000L; private enum SourceSink { SOURCE, SINK } private StandaloneHerder herder; - private Connector connector; @Mock protected Worker worker; - @Mock protected WorkerConfigTransformer transformer; - @Mock private Plugins plugins; + @Mock + protected WorkerConfigTransformer transformer; + @Mock + private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @Mock private LoaderSwap loaderSwap; protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback; - @Mock protected StatusBackingStore statusBackingStore; + @Mock + protected StatusBackingStore statusBackingStore; private final SampleConnectorClientConfigOverridePolicy - noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); + noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); @Before public void setup() throws ExecutionException, InterruptedException { - worker = mock(Worker.class); herder = mock(StandaloneHerder.class, withSettings() - .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime()) - .defaultAnswer(CALLS_REAL_METHODS)); + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) + .defaultAnswer(CALLS_REAL_METHODS)); Review Comment: My auto-formatter agrees with these whitespace changes, so I'm fine with changing this. :+1: ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -726,21 +683,21 @@ public void testAccessors() throws Exception { doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); // Create connector - connector = mock(BogusSourceConnector.class); + Connector connector = mock(BogusSourceConnector.class); Review Comment: This is also correct, and is more consistent with the other test cases. ```suggestion Connector connector = mock(SourceConnector.class); ``` After you do this, you can change expectConfigValidation to take a SourceSink instead of a Connector, and call the mock(SinkConnector.class) or mock(SourceConnector.class) internally. The calling test doesn't ever need to see the connector variable, because that object is only ever needed for config validation in the main code. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -114,64 +114,62 @@ public class StandaloneHerderTest { private static final String TOPICS_LIST_STR = "topic1,topic2"; private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; + private static final Long WAIT_TIME = 1000L; private enum SourceSink { SOURCE, SINK } private StandaloneHerder herder; - private Connector connector; @Mock protected Worker worker; - @Mock protected WorkerConfigTransformer transformer; - @Mock private Plugins plugins; + @Mock + protected WorkerConfigTransformer transformer; + @Mock + private Plugins plugins; @Mock private PluginClassLoader pluginLoader; @Mock private LoaderSwap loaderSwap; protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback; - @Mock protected StatusBackingStore statusBackingStore; + @Mock + protected StatusBackingStore statusBackingStore; private final SampleConnectorClientConfigOverridePolicy - noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); + noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); @Before public void setup() throws ExecutionException, InterruptedException { - worker = mock(Worker.class); herder = mock(StandaloneHerder.class, withSettings() - .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) - .defaultAnswer(CALLS_REAL_METHODS)); + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, new MockTime()) + .defaultAnswer(CALLS_REAL_METHODS)); createCallback = new FutureCallback<>(); - plugins = mock(Plugins.class); - pluginLoader = mock(PluginClassLoader.class); - loaderSwap = 
mock(LoaderSwap.class); final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); when(transformer.transform(eq(CONNECTOR_NAME), configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); } @After public void tearDown() { verifyNoMoreInteractions(worker, statusBackingStore); + herder.stop(); Review Comment: Interesting, were we leaking the herder background threads? That's a nice thing to fix, thanks! ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -226,59 +223,51 @@ public void testCreateConnectorAlreadyExists() throws Throwable { @Test public void testCreateSinkConnector() throws Exception { - connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> config = connectorConfig(SourceSink.SINK); Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, true, config); + expectConfigValidation(connectorMock, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); - Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); + Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); } @Test public void testCreateConnectorWithStoppedInitialState() throws Exception { - connector = mock(BogusSinkConnector.class); Map<String, String> config = connectorConfig(SourceSink.SINK); Connector connectorMock = mock(SinkConnector.class); - expectConfigValidation(connectorMock, false, config); + expectConfigValidation(connectorMock, config); when(plugins.newConnector(anyString())).thenReturn(connectorMock); Review Comment: This is unnecessary now that the `false` typo in the earlier line is fixed, and expectConfigValidation is mocking newConnector. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org