jsancio commented on code in PR #19589: URL: https://github.com/apache/kafka/pull/19589#discussion_r2075811292
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3321,34 +3321,35 @@ private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && - quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) { - var voters = partitionState.lastVoterSet(); + quorumConfig.autoJoin() && state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + /* `followersAlwaysFlush` is true when the config `process.roles` contains "controller". + * We require both `followersAlwaysFlush` and `autoJoin` to be true because + * brokers should not be able to add themselves as a KRaft voter. + */ var localReplicaKey = quorum.localReplicaKeyOrThrow(); - final boolean resetAddRemoveVoterTimer; + final boolean resetUpdateVoterTimer; final long backoffMs; - - Optional<ReplicaKey> oldVoter = voters.getOldVoterForReplicaKey(localReplicaKey); - if (oldVoter.isPresent()) { - var sendResult = maybeSendRemoveVoterRequest(state, currentTimeMs, oldVoter.get()); - resetAddRemoveVoterTimer = sendResult.requestSent(); - backoffMs = sendResult.timeToWaitMs(); - } else if (voters.doesNotContainReplicaId(localReplicaKey)) { - var sendResult = maybeSendAddVoterRequest(state, currentTimeMs); - resetAddRemoveVoterTimer = sendResult.requestSent(); - backoffMs = sendResult.timeToWaitMs(); + var voters = partitionState.lastVoterSet(); + RequestSendResult sendResult; Review Comment: Let's make this final: ```java final RequestSendResult sendResult; ``` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2323,6 +2342,25 @@ private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest( ); } + private boolean handleRemoveVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + 
RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data(); + + Errors error = Errors.forCode(data.errorCode()); + + if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION || error == Errors.VOTER_NOT_FOUND) { + return true; + } else if (error == Errors.REQUEST_TIMED_OUT) { + FollowerState follower = quorum.followerStateOrThrow(); + follower.resetUpdateVoterPeriod(currentTimeMs); + return true; Review Comment: Same comment here. `UNSUPPORTED_VERSION` should be an unexpected error, no? It is not clear to me why the handling differs between `NONE`, `VOTER_NOT_FOUND` and `REQUEST_TIMED_OUT`. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3274,6 +3320,38 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); + } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && Review Comment: I have been thinking about the name of the variable `followersAlwaysFlush`. Let's rename the configuration `followersAlwaysFlush` to `canBecomeVoter`. I think that better matches the original intent and the use here. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2280,6 +2280,25 @@ private boolean handleApiVersionsResponse( ); } + private boolean handleAddVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data(); Review Comment: This casts to `RemoveRaftVoterResponseData` yet the method is for `handleAddVoterResponse`. Do the tests pass for you? How is that possible with this cast?
########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -1248,6 +1256,25 @@ RaftRequest.Outbound assertSentApiVersionsRequest() { return sentRequests.get(0); } + RaftRequest.Outbound assertSentAddVoterRequest( + ReplicaKey replicaKey, + Endpoints endpoints + ) { + List<RaftRequest.Outbound> sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER)); + assertEquals(1, sentRequests.size()); + + var request = sentRequests.get(0); + assertInstanceOf(AddRaftVoterRequestData.class, request.data()); + + var addRaftVoterRequestData = (AddRaftVoterRequestData) request.data(); + assertEquals(clusterId, addRaftVoterRequestData.clusterId()); + assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId()); + assertEquals(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID), addRaftVoterRequestData.voterDirectoryId()); Review Comment: I think this is safer and more correct: ```java assertEquals(replicaKey.directoryId().get(), addRaftVoterRequestData.voterDirectoryId()); ``` ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws 
Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); + int epoch = 1; + + var context = new RaftClientTestContext.Builder(observer.id(), Set.of(leader.id(), follower.id())) + .withKip853Rpc(false) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When using kraft.version == 0, the controller should not send add voter, even if the config is set + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + // Used to prevent fetch timer expiration when advancing time + private void completeFetch(RaftClientTestContext context, int epoch, int fetchingId) throws Exception { Review Comment: Should we move this to `RaftClientTestContext` if possible?
If you move this there, do you mind fixing the other places where I use a similar pattern? Maybe this method should be called `advanceTimeToUpdateVoterSetTimer` or something like that. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2280,6 +2280,25 @@ private boolean handleApiVersionsResponse( ); } + private boolean handleAddVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data(); + + Errors error = Errors.forCode(data.errorCode()); + + if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION || error == Errors.DUPLICATE_VOTER) { + return true; + } else if (error == Errors.REQUEST_TIMED_OUT) { + FollowerState follower = quorum.followerStateOrThrow(); + follower.resetUpdateVoterPeriod(currentTimeMs); + return true; Review Comment: It is not obvious to me why this handling is different from the `NONE` and `DUPLICATE_VOTER` errors. Please write a comment explaining why this resets the timer while `DUPLICATE_VOTER` and `NONE` don't reset the timer. Unless there is a specific reason for doing this, ideally we would keep the handling consistent between all of these errors. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next 
request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); Review Comment: After this, I think we should check that add voter is sent after another `completeFetch`? 
########## raft/src/main/java/org/apache/kafka/raft/RaftUtil.java: ########## @@ -63,6 +63,8 @@ public static ApiMessage errorResponse(ApiKeys apiKey, Errors error) { case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code()); case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code()); case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code()); + case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code()); + case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code()); Review Comment: Please update the test in `RaftUtilTest#testErrorResponse`. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3274,6 +3320,38 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); + } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && + quorumConfig.autoJoin() && state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + /* `followersAlwaysFlush` is true when the config `process.roles` contains "controller". + * We require both `followersAlwaysFlush` and `autoJoin` to be true because + * brokers should not be able to add themselves as a KRaft voter. + */ Review Comment: Let's remove any references to "process.roles", "controller" and "broker". We don't always succeed but we should try to make KRaft agnostic of cluster metadata's use case. How about: ```java /* Only replicas that are always flushing and are configured to auto join should * attempt to automatically join the voter set for the configured topic partition.
*/ ``` ########## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ########## @@ -40,10 +40,8 @@ public class FollowerState implements EpochState { private final Set<Integer> voters; // Used for tracking the expiration of both the Fetch and FetchSnapshot requests private final Timer fetchTimer; - // Used to track when to send another update voter request + // Used to track when to send another add, remove, or update voter request private final Timer updateVoterPeriodTimer; Review Comment: Subtle but maybe `updateVoterSetPeriodTimer` is a better name now since it is used to update a voter, remove a voter or add a voter. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2280,6 +2280,25 @@ private boolean handleApiVersionsResponse( ); } + private boolean handleAddVoterResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data(); + + Errors error = Errors.forCode(data.errorCode()); + + if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION || error == Errors.DUPLICATE_VOTER) { Review Comment: Why does `UNSUPPORTED_VERSION` return `true`? Unsupported version should be an unexpected error. When do you expect this error? ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3274,6 +3320,38 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); + } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && + quorumConfig.autoJoin() && state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + /* `followersAlwaysFlush` is true when the config `process.roles` contains "controller".
+ * We require both `followersAlwaysFlush` and `autoJoin` to be true because + * brokers should not be able to add themselves as a KRaft voter. + */ + var localReplicaKey = quorum.localReplicaKeyOrThrow(); + final boolean resetUpdateVoterTimer; + final long backoffMs; + var voters = partitionState.lastVoterSet(); + RequestSendResult sendResult; + if (voters.voterIds().contains(localReplicaKey.id())) { + /* Replica id is in the voter set but replica is not voter. Remove old voter. + * Local replica is not in the voter set because the replica is an observer. + */ + var oldVoter = voters.voterKeys().stream().filter(replicaKey -> replicaKey.id() == localReplicaKey.id()).findFirst().get(); Review Comment: This line is too long. ```java var oldVoter = voters.voterKeys() .stream() .filter(replicaKey -> replicaKey.id() == localReplicaKey.id()) .findFirst() .get(); ``` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3274,6 +3320,38 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); + } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && + quorumConfig.autoJoin() && state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + /* `followersAlwaysFlush` is true when the config `process.roles` contains "controller". + * We require both `followersAlwaysFlush` and `autoJoin` to be true because + * brokers should not be able to add themselves as a KRaft voter. + */ + var localReplicaKey = quorum.localReplicaKeyOrThrow(); + final boolean resetUpdateVoterTimer; + final long backoffMs; + var voters = partitionState.lastVoterSet(); + RequestSendResult sendResult; + if (voters.voterIds().contains(localReplicaKey.id())) { + /* Replica id is in the voter set but replica is not voter. Remove old voter. 
+ * Local replica is not in the voter set because the replica is an observer. + */ + var oldVoter = voters.voterKeys().stream().filter(replicaKey -> replicaKey.id() == localReplicaKey.id()).findFirst().get(); + sendResult = maybeSendRemoveVoterRequest(state, currentTimeMs, oldVoter); + } else { + sendResult = maybeSendAddVoterRequest(state, currentTimeMs); + } + resetUpdateVoterTimer = sendResult.requestSent(); + backoffMs = sendResult.timeToWaitMs(); + if (resetUpdateVoterTimer) { Review Comment: Looks like you can just remove this variable and just do: ```java if (sendResult.requestSent()) { ``` ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request Review Comment: Let's not reference "controller". If you implement my suggestion to use `canBecomeVoter` instead, then this is easier to document without having to mention "controller" or "broker".
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); Review Comment: Why are you using `Uuid.ONE_UUID` for the directory id? Why not use a random uuid? 
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3274,6 +3320,38 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToAnyBootstrap(currentTimeMs); + } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush && + quorumConfig.autoJoin() && state.hasUpdateVoterPeriodExpired(currentTimeMs)) { + /* `followersAlwaysFlush` is true when the config `process.roles` contains "controller". + * We require both `followersAlwaysFlush` and `autoJoin` to be true because + * brokers should not be able to add themselves as a KRaft voter. + */ + var localReplicaKey = quorum.localReplicaKeyOrThrow(); + final boolean resetUpdateVoterTimer; + final long backoffMs; + var voters = partitionState.lastVoterSet(); + RequestSendResult sendResult; + if (voters.voterIds().contains(localReplicaKey.id())) { + /* Replica id is in the voter set but replica is not voter. Remove old voter. + * Local replica is not in the voter set because the replica is an observer. 
+ */ + var oldVoter = voters.voterKeys().stream().filter(replicaKey -> replicaKey.id() == localReplicaKey.id()).findFirst().get(); + sendResult = maybeSendRemoveVoterRequest(state, currentTimeMs, oldVoter); + } else { + sendResult = maybeSendAddVoterRequest(state, currentTimeMs); + } + resetUpdateVoterTimer = sendResult.requestSent(); + backoffMs = sendResult.timeToWaitMs(); + if (resetUpdateVoterTimer) { + state.resetUpdateVoterPeriod(currentTimeMs); + } + return Math.min( + backoffMs, Review Comment: Looks like you can remove this variable and just do: ```java sendResult.timeToWaitMs(), ``` ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -181,6 +181,8 @@ public static final class Builder { private Endpoints localListeners = Endpoints.empty(); private boolean isStartingVotersStatic = false; Review Comment: Remove extra space. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) Review Comment: Same as my previous comment. These are deprecated. Please use the new methods. This comment applies to a few more changes. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) Review Comment: `withKip853Rpc` is deprecated. Please use `withRaftProtocol`. `withBootstrapSnapshot` is also deprecated. Please use `withStartingVoters`. 
########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -1261,6 +1288,23 @@ AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) { return addVoterResponse; } + RaftRequest.Outbound assertSentRemoveVoterRequest( + ReplicaKey replicaKey + ) { + List<RaftRequest.Outbound> sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.REMOVE_RAFT_VOTER)); + assertEquals(1, sentRequests.size()); + + var request = sentRequests.get(0); + assertInstanceOf(RemoveRaftVoterRequestData.class, request.data()); + + var removeRaftVoterRequestData = (RemoveRaftVoterRequestData) request.data(); + assertEquals(clusterId, removeRaftVoterRequestData.clusterId()); + assertEquals(replicaKey.id(), removeRaftVoterRequestData.voterId()); + assertEquals(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID), removeRaftVoterRequestData.voterDirectoryId()); Review Comment: Same here. I think this is safe and more correct: ```java assertEquals(replicaKey.directoryId().get(), removeRaftVoterRequestData.voterDirectoryId()); ``` ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws 
Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter Review Comment: Same here. Please remove the reference to the property and "controller." ########## raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java: ########## @@ -99,6 +100,11 @@ public class QuorumConfig { public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20; + public static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable"; + public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " + + "join the cluster metadata partition for its cluster id."; + public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false; Review Comment: `KafkaConfig` should validate that this is `true` only when `process.roles` contains `controller`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + 
removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + 
.withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); Review Comment: Let's generate directory ids for all of these replicas. You can use `withStartingVoters(.., KRaftVersion.KRAFT_VERSION_0)` to make them static voters with kraft version 0. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + 
.withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { Review Comment: Same here: 
`testObserversDoNotAutoJoin`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + 
.withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() 
throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), 
false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); + int epoch = 1; + + var context = new RaftClientTestContext.Builder(observer.id(), Set.of(leader.id(), follower.id())) + .withKip853Rpc(false) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When using kraft.version == 0, the controller should not send add voter, even if the config is set + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + // Used to prevent fetch timer expiration when advancing time + private void completeFetch(RaftClientTestContext context, int epoch, int fetchingId) throws Exception { + // waiting for FETCH requests until the AddVoter request is sent + for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD; i++) { + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse( + epoch, + fetchingId, Review Comment: Is this correct? Based on the code this is the leader id. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { Review Comment: Do we need a test that fully does a remove followed by an add? E.g. 0. Start with the local replica not in the voter set but with its id in the voter set. 1. Remove voter is sent and acknowledged. 2. The next FETCH response sends a VOTER_RECORD control batch without the old voter in the voter set. 3. Add voter is sent and acknowledged. 4. The next FETCH response sends a VOTER_RECORD control batch with the local replica in the voter set. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next 
request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, 
epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); + int epoch = 1; + + var context = new RaftClientTestContext.Builder(observer.id(), Set.of(leader.id(), follower.id())) Review Comment: This is a deprecated constructor. Use `Builder(int, Uuid)` and `withStartingVoters`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var 
removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new 
RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); + int epoch = 1; + + var context = new RaftClientTestContext.Builder(observer.id(), Set.of(leader.id(), follower.id())) + .withKip853Rpc(false) + .withElectedLeader(epoch, 
leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When using kraft.version == 0, the controller should not send add voter, even if the config is set + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + // Used to prevent fetch timer expiration when advancing time + private void completeFetch(RaftClientTestContext context, int epoch, int fetchingId) throws Exception { Review Comment: This line looks too long. ```java private void completeFetch( RaftClientTestContext context, int epoch, int fetchingId ) throws Exception { ``` ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newBroker.id(), newBroker.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(false) + .build(); + + completeFetch(context, epoch, newBroker.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When alwaysFlush == false, the client is not for a controller process, so it should not send an add voter request + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinIfConfigNotSet() throws 
Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var observer = replicaKey(follower.id() + 1, true); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(observer.id(), observer.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(false) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When controller.quorum.auto.join.enable is not set, the controller should not send add voter + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception { + var leader = replicaKey(randomReplicaId(), false); + var follower = replicaKey(leader.id() + 1, false); + var observer = replicaKey(follower.id() + 1, false); + int epoch = 1; + + var context = new RaftClientTestContext.Builder(observer.id(), Set.of(leader.id(), follower.id())) + .withKip853Rpc(false) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, observer.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + + // When using kraft.version == 0, the controller should not send add voter, even if the config is set Review Comment: This line looks too long. Please try to make sure that lines in the `raft` module are less than 100 characters. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var 
removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); Review Comment: After this, I think we should check that remove voter is sent after `completeFetch`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; + +public class KafkaRaftClientAutoJoinTest { + private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1; + + @Test + public void testAutoRemoveOldVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var oldFollower = replicaKey(leader.id() + 1, true); + var newFollowerKey = ReplicaKey.of(oldFollower.id(), Uuid.ONE_UUID); + int epoch = 1; + + var voters = VoterSetTest.voterSet(Stream.of(leader, oldFollower)); + + var context = new RaftClientTestContext.Builder(newFollowerKey.id(), newFollowerKey.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newFollowerKey.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var removeRequest = context.assertSentRemoveVoterRequest(oldFollower); + context.deliverResponse( + removeRequest.correlationId(), + removeRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending a remove voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testAutoAddNewVoter() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newVoter = replicaKey(follower.id() + 1, true); + int epoch = 1; + + 
var voters = VoterSetTest.voterSet(Stream.of(leader, follower)); + + var context = new RaftClientTestContext.Builder(newVoter.id(), newVoter.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, leader.id()) + .withAutoJoin(true) + .withAlwaysFlush(true) + .build(); + + completeFetch(context, epoch, newVoter.id()); + + context.time.sleep(context.fetchTimeoutMs - 1); + context.pollUntilRequest(); + var addRequest = context.assertSentAddVoterRequest(newVoter, context.client.quorum().localVoterNodeOrThrow().listeners()); + context.deliverResponse( + addRequest.correlationId(), + addRequest.destination(), + RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message()) + ); + + // after sending an add voter the next request should be a fetch + context.pollUntilRequest(); + var fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + } + + @Test + public void testBrokersDoNotAutoJoin() throws Exception { + var leader = replicaKey(randomReplicaId(), true); + var follower = replicaKey(leader.id() + 1, true); + var newBroker = replicaKey(follower.id() + 1, true); Review Comment: KRaft protocol doesn't have "brokers." It has "observers" instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org