[ https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855997#comment-17855997 ]
Ksolves commented on KAFKA-16943:
---------------------------------
In our current test cases
(`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and
`testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that
the Connect worker fails to start. However, our mechanism for verifying the
startup failure neither waits synchronously for the failure to occur nor
asserts it precisely.
Example test case (changes to the existing test case are marked in
{color:#00875a}*green*{color}):
{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
            .workerProps(workerProps)
            .numWorkers(numWorkers)
            .numBrokers(numBrokers)
            .brokerProps(brokerProps)
            .build();

    // Start the brokers and Connect, but Connect should fail to create the config and offset topics
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup (30-second timeout)
    boolean workerStillRunning = waitForWorkerStartupFailure(connect, 30_000);
    assertFalse(workerStillRunning, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

/**
 * Polls every 500 ms until no Connect workers are running or the timeout elapses.
 * Returns false as soon as no workers are running (startup failed), or true if
 * at least one worker is still running when the timeout expires.
 */
private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis)
        throws InterruptedException {
    // System.nanoTime() is monotonic, unlike System.currentTimeMillis()
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (System.nanoTime() - deadline < 0) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait 500 ms before checking again
    }
    // Check once more at the deadline so a failure in the last interval is not missed
    return connect.anyWorkersRunning();
}
{code}
What changes do you suggest to improve this synchronous verification mechanism?
I'll create the PR accordingly.
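One possible refinement, as a minimal sketch that assumes nothing beyond the JDK: the hypothetical helper `PollUntil.waitUntil` below generalizes `waitForWorkerStartupFailure` to any `BooleanSupplier`, measures its deadline with the monotonic `System.nanoTime()` clock, never sleeps past the deadline, and returns `true` when the condition is met. A test could then assert positively, e.g. `assertTrue(PollUntil.waitUntil(() -> !connect.anyWorkersRunning(), 30_000, 500))`. The `main` method only simulates a worker that stops after 200 ms; it is not Connect code.

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka's test utilities.
final class PollUntil {

    private PollUntil() { }

    /**
     * Polls {@code condition} every {@code pollMillis} until it returns true or
     * {@code timeoutMillis} elapses. Returns true if the condition was met in time,
     * false otherwise. The condition is always checked once more at the deadline.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        // nanoTime() is monotonic, so wall-clock adjustments cannot shorten or extend the wait
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for one poll interval, but never past the deadline
            TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(pollMillis)));
        }
    }

    // Usage demo: a simulated worker that stops running after 200 ms.
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean workerRunning = new AtomicBoolean(true);
        Thread simulatedWorker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            workerRunning.set(false); // simulated startup failure
        });
        simulatedWorker.start();

        boolean failureObserved = waitUntil(() -> !workerRunning.get(), 5_000, 50);
        System.out.println("Startup failure observed: " + failureObserved);
        simulatedWorker.join();
    }
}
{code}

Kafka's test utilities also provide `TestUtils.waitForCondition`, which fails the test on timeout instead of returning a boolean and could replace a bespoke polling helper entirely.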
> Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-16943
> URL: https://issues.apache.org/jira/browse/KAFKA-16943
> Project: Kafka
> Issue Type: Improvement
> Components: connect
> Reporter: Chris Egerton
> Priority: Minor
> Labels: newbie
>
> Created after PR discussion
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker
> cannot start under poor conditions (such as when its internal topics do not
> yet exist and it is configured to create them with a higher replication
> factor than the number of available brokers, or when its internal topics
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, which leaves a potential gap in test
> coverage, especially for the test cases
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and
> {{testFailToStartWhenInternalTopicsAreNotCompacted}}. It'd be nice if we
> could have some way of synchronously awaiting the completion or failure of
> worker startup in our integration tests, in order to verify that worker
> startup fails under sufficiently adverse conditions.
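The issue asks for a way to synchronously await the completion or failure of worker startup. One possible design, sketched here under explicit assumptions, is for each worker to expose a future that completes normally when startup succeeds and exceptionally when it fails, so a test blocks on it with a timeout instead of polling. `SimulatedWorker`, `startupFuture()` and `awaitStartupFailure` are hypothetical names, not existing `EmbeddedConnectCluster` or `WorkerHandle` APIs, and the failure is simulated with a replication-factor check against the broker count to mirror `testFailToCreateInternalTopicsWithMoreReplicasThanBrokers`.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StartupFutureSketch {

    /** Hypothetical stand-in for a Connect worker whose startup runs on a background thread. */
    static final class SimulatedWorker {
        private final CompletableFuture<Void> startup = new CompletableFuture<>();

        /** Starts asynchronously; startup fails if the replication factor exceeds the broker count. */
        void start(int replicationFactor, int numBrokers) {
            Thread startupThread = new Thread(() -> {
                if (replicationFactor > numBrokers) {
                    startup.completeExceptionally(new IllegalStateException(
                            "Replication factor " + replicationFactor
                                    + " exceeds the " + numBrokers + " available broker(s)"));
                } else {
                    startup.complete(null);
                }
            });
            startupThread.start();
        }

        /** Completes normally once startup succeeds, exceptionally if it fails. */
        CompletableFuture<Void> startupFuture() {
            return startup;
        }
    }

    /**
     * Blocks until startup finishes and returns the failure cause.
     * Throws AssertionError if startup succeeded or did not finish in time.
     */
    static Throwable awaitStartupFailure(SimulatedWorker worker, long timeoutMillis)
            throws InterruptedException {
        try {
            worker.startupFuture().get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            return e.getCause(); // expected path: startup failed
        } catch (TimeoutException e) {
            throw new AssertionError("Worker startup did not complete within " + timeoutMillis + " ms", e);
        }
        throw new AssertionError("Worker started successfully, but a startup failure was expected");
    }

    public static void main(String[] args) throws InterruptedException {
        SimulatedWorker worker = new SimulatedWorker();
        worker.start(3, 1); // e.g. config topic replication factor 3 with a single broker
        Throwable cause = awaitStartupFailure(worker, 30_000);
        System.out.println("Startup failed as expected: " + cause.getMessage());
    }
}
{code}

Compared with polling `anyWorkersRunning()`, a startup future can distinguish "still starting" from "failed", returns as soon as the outcome is known, and carries the failure cause, so a test can assert on why startup failed rather than only that no worker is running.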