gharris1727 commented on code in PR #15389:
URL: https://github.com/apache/kafka/pull/15389#discussion_r1517018424
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception {
Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
// Create
- connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Connector connectorMock = mock(SourceConnector.class);
- expectConfigValidation(connectorMock, true, connConfig);
+ expectConfigValidation(connectorMock, connConfig);
// Should get first config
doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
// Update config, which requires stopping and restarting
doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
final ArgumentCaptor<Map<String, String>> capturedConfig =
ArgumentCaptor.forClass(Map.class);
- final ArgumentCaptor<Callback<TargetState>> onStart =
ArgumentCaptor.forClass(Callback.class);
- doAnswer(invocation -> {
- onStart.getValue().onCompletion(null, TargetState.STARTED);
- return true;
- }).when(worker).startConnector(eq(CONNECTOR_NAME),
capturedConfig.capture(), any(),
- eq(herder), eq(TargetState.STARTED), onStart.capture());
+ mockStartConnector(null, capturedConfig, TargetState.STARTED,
TargetState.STARTED, null);
Review Comment:
Instead of capturing the config, pass in `newConnConfig` to use the equality
assertion. Then the capturedConfig argument is always null and we can eliminate
some of the code paths in mockStartConnector.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1044,14 +988,14 @@ public void
testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
// Set new config on the connector and tasks
FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback =
new FutureCallback<>();
- expectConfigValidation(connectorMock, false, newConfig);
+ expectConfigValidation(connectorMock, newConfig);
Review Comment:
See the testPutConnectorConfig comment about the `false` argument.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -773,40 +730,34 @@ public void testPutConnectorConfig() throws Exception {
Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
// Create
- connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Connector connectorMock = mock(SourceConnector.class);
- expectConfigValidation(connectorMock, true, connConfig);
+ expectConfigValidation(connectorMock, connConfig);
// Should get first config
doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
// Update config, which requires stopping and restarting
doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
final ArgumentCaptor<Map<String, String>> capturedConfig =
ArgumentCaptor.forClass(Map.class);
- final ArgumentCaptor<Callback<TargetState>> onStart =
ArgumentCaptor.forClass(Callback.class);
- doAnswer(invocation -> {
- onStart.getValue().onCompletion(null, TargetState.STARTED);
- return true;
- }).when(worker).startConnector(eq(CONNECTOR_NAME),
capturedConfig.capture(), any(),
- eq(herder), eq(TargetState.STARTED), onStart.capture());
+ mockStartConnector(null, capturedConfig, TargetState.STARTED,
TargetState.STARTED, null);
// Generate same task config, which should result in no additional
action to restart tasks
when(worker.connectorTaskConfigs(CONNECTOR_NAME, new
SourceConnectorConfig(plugins, newConnConfig, true)))
- .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+ .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
- expectConfigValidation(connectorMock, false, newConnConfig);
+ expectConfigValidation(connectorMock, newConnConfig);
Review Comment:
Here and in testPutConnectorConfig where the second argument was `false`,
the config argument should be moved up into the first `expectConfigValidation`
call in the method.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -114,64 +114,62 @@ public class StandaloneHerderTest {
private static final String TOPICS_LIST_STR = "topic1,topic2";
private static final String WORKER_ID = "localhost:8083";
private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+ private static final Long WAIT_TIME = 1000L;
private enum SourceSink {
SOURCE, SINK
}
private StandaloneHerder herder;
- private Connector connector;
@Mock
protected Worker worker;
- @Mock protected WorkerConfigTransformer transformer;
- @Mock private Plugins plugins;
+ @Mock
+ protected WorkerConfigTransformer transformer;
+ @Mock
+ private Plugins plugins;
@Mock
private PluginClassLoader pluginLoader;
@Mock
private LoaderSwap loaderSwap;
protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
- @Mock protected StatusBackingStore statusBackingStore;
+ @Mock
+ protected StatusBackingStore statusBackingStore;
private final SampleConnectorClientConfigOverridePolicy
- noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
+ noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
public void setup() throws ExecutionException, InterruptedException {
- worker = mock(Worker.class);
herder = mock(StandaloneHerder.class, withSettings()
- .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
- .defaultAnswer(CALLS_REAL_METHODS));
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
Review Comment:
My auto-formatter agrees with these whitespace changes, so I'm fine with
changing this. :+1:
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -726,21 +683,21 @@ public void testAccessors() throws Exception {
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class),
isNull());
// Create connector
- connector = mock(BogusSourceConnector.class);
+ Connector connector = mock(BogusSourceConnector.class);
Review Comment:
This is also correct, and is more consistent with the other test cases.
```suggestion
Connector connector = mock(SourceConnector.class);
```
After you do this, you can change expectConfigValidation to take a
SourceSink instead of a Connector, and call the mock(SinkConnector.class) or
mock(SourceConnector.class) internally. The calling test doesn't ever need to
see the connector variable, because that object is only ever needed for config
validation in the main code.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -114,64 +114,62 @@ public class StandaloneHerderTest {
private static final String TOPICS_LIST_STR = "topic1,topic2";
private static final String WORKER_ID = "localhost:8083";
private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+ private static final Long WAIT_TIME = 1000L;
private enum SourceSink {
SOURCE, SINK
}
private StandaloneHerder herder;
- private Connector connector;
@Mock
protected Worker worker;
- @Mock protected WorkerConfigTransformer transformer;
- @Mock private Plugins plugins;
+ @Mock
+ protected WorkerConfigTransformer transformer;
+ @Mock
+ private Plugins plugins;
@Mock
private PluginClassLoader pluginLoader;
@Mock
private LoaderSwap loaderSwap;
protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
- @Mock protected StatusBackingStore statusBackingStore;
+ @Mock
+ protected StatusBackingStore statusBackingStore;
private final SampleConnectorClientConfigOverridePolicy
- noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
+ noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
public void setup() throws ExecutionException, InterruptedException {
- worker = mock(Worker.class);
herder = mock(StandaloneHerder.class, withSettings()
- .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
- .defaultAnswer(CALLS_REAL_METHODS));
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
createCallback = new FutureCallback<>();
- plugins = mock(Plugins.class);
- pluginLoader = mock(PluginClassLoader.class);
- loaderSwap = mock(LoaderSwap.class);
final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
when(transformer.transform(eq(CONNECTOR_NAME),
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
}
@After
public void tearDown() {
verifyNoMoreInteractions(worker, statusBackingStore);
+ herder.stop();
Review Comment:
Interesting, were we leaking the herder background threads? That's a nice
thing to fix, thanks!
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -226,59 +223,51 @@ public void testCreateConnectorAlreadyExists() throws
Throwable {
@Test
public void testCreateSinkConnector() throws Exception {
- connector = mock(BogusSinkConnector.class);
expectAdd(SourceSink.SINK);
Map<String, String> config = connectorConfig(SourceSink.SINK);
Connector connectorMock = mock(SinkConnector.class);
- expectConfigValidation(connectorMock, true, config);
+ expectConfigValidation(connectorMock, config);
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
- Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
+ Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(WAIT_TIME, TimeUnit.MILLISECONDS);
assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
}
@Test
public void testCreateConnectorWithStoppedInitialState() throws Exception {
- connector = mock(BogusSinkConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SINK);
Connector connectorMock = mock(SinkConnector.class);
- expectConfigValidation(connectorMock, false, config);
+ expectConfigValidation(connectorMock, config);
when(plugins.newConnector(anyString())).thenReturn(connectorMock);
Review Comment:
This `when(plugins.newConnector(...))` stub is unnecessary now that the
`false` typo in the earlier line is fixed, and expectConfigValidation is
mocking newConnector.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]