rondagostino opened a new pull request #10093:
URL: https://github.com/apache/kafka/pull/10093
We need to be able to run system tests with Raft-based metadata quorums --
both co-located brokers and controllers as well as remote controllers -- in
addition to the ZooKepeer-based mode we run today. This PR adds this
capability to `KafkaService` in a backwards-compatible manner as follows.
If no changes are made to existing system tests then they function as they
always do -- they instantiate ZooKeeper, and Kafka will use ZooKeeper. A good
test of this PR is therefore to run a full system test suite with no actual
test changes and make sure everything runs as expected.
If we want to use a Raft-based metadata quorum we can do so by introducing a
`metadata_quorum` argument to the test method and using `@matrix` to set it to
the quorums we want to use for the various runs of the test. We then also have
to skip creating a `ZooKeeperService` when the quorum is Raft-based.
For example, we would do the following:
```
from ducktape.mark import matrix
from kafkatest.services.kafka import KafkaService, quorum
```
```
def __init__(self, test_context):
super(TestVerifiableProducer, self).__init__(test_context)
self.zk = ZookeeperService(test_context, num_nodes=1) if
quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={"topic": {"partitions": 1,
"replication-factor": 1}})
```
```
def setUp(self):
if self.zk:
self.zk.start()
self.kafka.start()
```
```
@cluster(num_nodes=3)
@matrix(producer_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all)
def test_simple_run(self, producer_version=DEV_BRANCH,
metadata_quorum=quorum.zk):
# the rest of the test logic remains unchanged
```
The above will end up running 3 separate tests: one with ZooKeeper, one with
a co-located Raft-based controller, and once with a remote Raft-based
controller.
If we want to set security protocols we could do this:
```
def setUp(self):
if self.zk:
self.zk.start()
# don't start Kafka here because we haven't configured security at
this point
```
```
@cluster(num_nodes=3)
@matrix(producer_version=[str(DEV_BRANCH)],
security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@cluster(num_nodes=4)
@matrix(producer_version=[str(DEV_BRANCH)],
security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
metadata_quorum=quorum.all)
def test_simple_run(self, producer_version, security_protocol =
'PLAINTEXT', sasl_mechanism='PLAIN',
metadata_quorum=quorum.zk):
self.kafka.security_protocol = security_protocol
self.kafka.client_sasl_mechanism = sasl_mechanism
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.interbroker_sasl_mechanism = sasl_mechanism
if self.kafka.quorum_info.using_raft:
controller_quorum = self.kafka.controller_quorum
controller_quorum.controller_security_protocol =
security_protocol
controller_quorum.controller_sasl_mechanism = sasl_mechanism
controller_quorum.intercontroller_security_protocol =
security_protocol
controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
# now we can start Kafka
self.kafka.start()
# the rest of the test logic remains unchanged
```
This PR does not update any tests -- those will come later after all the
KIP-500 code is merged.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]