gharris1727 commented on code in PR #15389:
URL: https://github.com/apache/kafka/pull/15389#discussion_r1517018424


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception {
         Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
 
         // Create
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
         Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connConfig);
+        expectConfigValidation(connectorMock, connConfig);
 
         // Should get first config
         doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
         // Update config, which requires stopping and restarting
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
         final ArgumentCaptor<Map<String, String>> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(null, capturedConfig, TargetState.STARTED, 
TargetState.STARTED, null);

Review Comment:
   Instead of capturing the config, pass in `newConnConfig` to use the equality 
assertion. Then the `capturedConfig` argument is always null, and we can eliminate 
some of the code paths in `mockStartConnector`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1044,14 +988,14 @@ public void 
testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
 
         // Set new config on the connector and tasks
         FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = 
new FutureCallback<>();
-        expectConfigValidation(connectorMock, false, newConfig);
+        expectConfigValidation(connectorMock, newConfig);

Review Comment:
   See testPutConnectorConfig comment about `false`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception {
         Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
 
         // Create
-        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
         Connector connectorMock = mock(SourceConnector.class);
-        expectConfigValidation(connectorMock, true, connConfig);
+        expectConfigValidation(connectorMock, connConfig);
 
         // Should get first config
         doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
         // Update config, which requires stopping and restarting
         doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
         final ArgumentCaptor<Map<String, String>> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
-        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
-        doAnswer(invocation -> {
-            onStart.getValue().onCompletion(null, TargetState.STARTED);
-            return true;
-        }).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
-            eq(herder), eq(TargetState.STARTED), onStart.capture());
+        mockStartConnector(null, capturedConfig, TargetState.STARTED, 
TargetState.STARTED, null);
         // Generate same task config, which should result in no additional 
action to restart tasks
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-            .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+                .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
-        expectConfigValidation(connectorMock, false, newConnConfig);
+        expectConfigValidation(connectorMock, newConnConfig);

Review Comment:
   Here and in testPutConnectorConfig where the second argument was `false`, 
the config argument should be moved up to the first call in the method.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -114,64 +114,62 @@ public class StandaloneHerderTest {
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final String WORKER_ID = "localhost:8083";
     private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+    private static final Long WAIT_TIME = 1000L;
 
     private enum SourceSink {
         SOURCE, SINK
     }
 
     private StandaloneHerder herder;
 
-    private Connector connector;
     @Mock
     protected Worker worker;
-    @Mock protected WorkerConfigTransformer transformer;
-    @Mock private Plugins plugins;
+    @Mock
+    protected WorkerConfigTransformer transformer;
+    @Mock
+    private Plugins plugins;
     @Mock
     private PluginClassLoader pluginLoader;
     @Mock
     private LoaderSwap loaderSwap;
     protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
-    @Mock protected StatusBackingStore statusBackingStore;
+    @Mock
+    protected StatusBackingStore statusBackingStore;
     private final SampleConnectorClientConfigOverridePolicy
-        noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
+            noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
     public void setup() throws ExecutionException, InterruptedException {
-        worker = mock(Worker.class);
         herder = mock(StandaloneHerder.class, withSettings()
-            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
-            .defaultAnswer(CALLS_REAL_METHODS));
+                .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+                .defaultAnswer(CALLS_REAL_METHODS));

Review Comment:
   My auto-formatter agrees with these whitespace changes, so I'm fine with 
changing this. :+1:



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -726,21 +683,21 @@ public void testAccessors() throws Exception {
         
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
         // Create connector
-        connector = mock(BogusSourceConnector.class);
+        Connector connector = mock(BogusSourceConnector.class);

Review Comment:
   This is also correct, and is more consistent with the other test cases.
   ```suggestion
           Connector connector = mock(SourceConnector.class);
   ```
   After you do this, you can change `expectConfigValidation` to take a 
`SourceSink` instead of a `Connector`, and have it call `mock(SinkConnector.class)` 
or `mock(SourceConnector.class)` internally. The calling test doesn't ever need to 
see the connector variable, because that object is only ever needed for config 
validation in the main code.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -114,64 +114,62 @@ public class StandaloneHerderTest {
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final String WORKER_ID = "localhost:8083";
     private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+    private static final Long WAIT_TIME = 1000L;
 
     private enum SourceSink {
         SOURCE, SINK
     }
 
     private StandaloneHerder herder;
 
-    private Connector connector;
     @Mock
     protected Worker worker;
-    @Mock protected WorkerConfigTransformer transformer;
-    @Mock private Plugins plugins;
+    @Mock
+    protected WorkerConfigTransformer transformer;
+    @Mock
+    private Plugins plugins;
     @Mock
     private PluginClassLoader pluginLoader;
     @Mock
     private LoaderSwap loaderSwap;
     protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
-    @Mock protected StatusBackingStore statusBackingStore;
+    @Mock
+    protected StatusBackingStore statusBackingStore;
     private final SampleConnectorClientConfigOverridePolicy
-        noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
+            noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
     public void setup() throws ExecutionException, InterruptedException {
-        worker = mock(Worker.class);
         herder = mock(StandaloneHerder.class, withSettings()
-            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
-            .defaultAnswer(CALLS_REAL_METHODS));
+                .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+                .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = mock(Plugins.class);
-        pluginLoader = mock(PluginClassLoader.class);
-        loaderSwap = mock(LoaderSwap.class);
         final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
         when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
     }
 
     @After
     public void tearDown() {
         verifyNoMoreInteractions(worker, statusBackingStore);
+        herder.stop();

Review Comment:
   Interesting, were we leaking the herder background threads? That's a nice 
thing to fix, thanks!



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -226,59 +223,51 @@ public void testCreateConnectorAlreadyExists() throws 
Throwable {
 
     @Test
     public void testCreateSinkConnector() throws Exception {
-        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
         Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, true, config);
+        expectConfigValidation(connectorMock, config);
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(WAIT_TIME, TimeUnit.MILLISECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
     }
 
     @Test
     public void testCreateConnectorWithStoppedInitialState() throws Exception {
-        connector = mock(BogusSinkConnector.class);
         Map<String, String> config = connectorConfig(SourceSink.SINK);
         Connector connectorMock = mock(SinkConnector.class);
-        expectConfigValidation(connectorMock, false, config);
+        expectConfigValidation(connectorMock, config);
         when(plugins.newConnector(anyString())).thenReturn(connectorMock);

Review Comment:
   This `newConnector` stubbing is unnecessary now that the `false` typo in the 
earlier line is fixed, and `expectConfigValidation` is mocking `newConnector`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to