junrao commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1878630009
########## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala: ########## @@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest { batch )) } + + @Test + def testDeleteRecordsResponseContainsError(): Unit = { + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager + ) + + val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit]) + + // Response contains error. + when(replicaManager.deleteRecords( + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), + callbackCapture.capture(), + ArgumentMatchers.eq(true) + )).thenAnswer { _ => + callbackCapture.getValue.apply(Map( + new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult() + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code + ))) + } + + partitionRecordWriter.deleteRecords( + new TopicPartition("random-topic", 0), + 10L + ).whenComplete { (_, exp) => + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp) + } + + // Response does not contain topic queried. + when(replicaManager.deleteRecords( + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), + callbackCapture.capture(), + ArgumentMatchers.eq(true) + )).thenAnswer { _ => + callbackCapture.getValue.apply(Map( + new TopicPartition("other-random-topic", 0) -> new DeleteRecordsPartitionResult() + .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code) + )) + } + + partitionRecordWriter.deleteRecords( + new TopicPartition("random-topic", 0), + 10L + ).whenComplete { (_, exp) => + assertTrue(exp.isInstanceOf[IllegalStateException]) Review Comment: If a topic doesn't exist, we should get an unknown topic exception. 
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineLong; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Util class to track the offsets written into the internal topic + * per share partition key. + * It calculates the minimum offset globally up to which the records + * in the internal partition are redundant i.e. they have been overridden + * by newer records. + */ +public class ShareCoordinatorOffsetsManager { + + // Map to store share partition key => current partition offset + // being written. + private final TimelineHashMap<SharePartitionKey, Long> offsets; + + // Minimum offset representing the smallest necessary offset (non-redundant) + // across the internal partition. 
+ // We are using timeline object here because the offsets which are passed into + // updateState might not be committed yet. In case of retry, these offsets would + // be invalidated via the snapshot registry. Hence, using timeline object + // the values would automatically revert in accordance with the last committed offset. + private final TimelineLong minOffset; + // Initially true because we don't want to send the offset + // until findRedundantOffset determines a redundant offset is + // indeed present. + private AtomicBoolean offsetExposed = new AtomicBoolean(true); + + public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) { + Objects.requireNonNull(snapshotRegistry); + offsets = new TimelineHashMap<>(snapshotRegistry, 0); + minOffset = new TimelineLong(snapshotRegistry); + minOffset.set(Long.MAX_VALUE); // For easy min update. + } + + /** + * Method updates internal state with the supplied offset for the provided + * share partition key. It then calculates the minimum offset, if possible, + * below which all offsets are redundant. + * + * @param key - represents {@link SharePartitionKey} whose offset needs updating + * @param offset - represents the latest partition offset for provided key + */ + public void updateState(SharePartitionKey key, long offset) { + minOffset.set(Math.min(minOffset.get(), offset)); + offsets.put(key, offset); + + Optional<Long> deleteTillOffset = findRedundantOffset(); + deleteTillOffset.ifPresent(off -> { + minOffset.set(off); + offsetExposed.set(false); + }); + } + + private Optional<Long> findRedundantOffset() { + if (offsets.isEmpty()) { + return Optional.empty(); + } + + long soFar = Long.MAX_VALUE; + + for (long offset : offsets.values()) { + // Get min offset among latest offsets + // for all share keys in the internal partition. + soFar = Math.min(soFar, offset); + + // minOffset represents the smallest necessary offset + // and if soFar equals it, we cannot proceed. 
This can happen + // if a share partition key hasn't had records written for a while + // For example, + // <p> + // key1:1 + // key2:2 4 6 + // key3:3 5 7 + // <p> + // We can see in above that offsets 2, 4, 3, 5 are redundant, + // but we do not have a contiguous prefix starting at minOffset + // and we cannot proceed. + if (soFar == minOffset.get()) { + return Optional.empty(); + } + } + + return Optional.of(soFar); + } + + /** + * Most recent last redundant offset. This method is to be used + * when the caller wants to query the value of such offset. + * After returning the value once, the redundant offset is reset. + * @return Optional of type Long representing the offset or empty for invalid offset values + */ + public Optional<Long> lastRedundantOffset() { Review Comment: This method is ok, but is very customized for the usage in the current only caller. If there is another caller, it can unexpectedly break the existing caller. A better api is probably to always expose the lastRedundantOffset and let the caller handle the case when the same value is returned. 
########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java: ########## @@ -274,6 +290,7 @@ public void testReadStateSuccess() throws ExecutionException, InterruptedExcepti ))) ); + Review Comment: extra new line ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -6660,6 +6661,86 @@ class ReplicaManagerTest { } } + @Test + def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = { + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager, + threadNamePrefix = Option(this.getClass.getName)) + + val tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) + rm.createPartition(tp) + + def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = { + assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code) + } + + // default internal topics delete disabled + rm.deleteRecords( + timeout = 0L, + Map[TopicPartition, Long](tp -> 10L), + responseCallback = callback + ) + } + + @Test + def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + 
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + + val partition = rm.createPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Review Comment: `becomeLeaderOrFollower` is only used in the ZK-based controller, which won't be supported in 4.0. We need to use the code path for the KRaft-based controller. ########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.timeline.SnapshotRegistry; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ShareCoordinatorOffsetsManagerTest { + + private ShareCoordinatorOffsetsManager manager; + private static final SharePartitionKey KEY1 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY2 = SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY3 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1); + private static final SharePartitionKey KEY4 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7); + + @BeforeEach + public void setUp() { + manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext())); + } + + @Test + public void testUpdateStateAddsToInternalState() { + manager.updateState(KEY1, 0L); + assertEquals(Optional.empty(), manager.lastRedundantOffset()); + + manager.updateState(KEY1, 10L); + assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant. + + manager.updateState(KEY2, 15L); + assertEquals(Optional.empty(), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, empty. 
+ + assertEquals(10L, manager.curState().get(KEY1)); + assertEquals(15L, manager.curState().get(KEY2)); + } + + private static class ShareOffsetTestHolder { + static class TestTuple { + final SharePartitionKey key; + final long offset; + final Optional<Long> expectedOffset; + + private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { + this.key = key; + this.offset = offset; + this.expectedOffset = expectedOffset; + } + + static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { + return new TestTuple(key, offset, expectedOffset); + } + } + + private final String testName; + private final List<TestTuple> tuples; + private final boolean shouldRun; + + ShareOffsetTestHolder(String testName, List<TestTuple> tuples) { + this(testName, tuples, true); + } + + ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean shouldRun) { + this.testName = testName; + this.tuples = tuples; + this.shouldRun = shouldRun; + } + } + + static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() { + return Stream.of( + new ShareOffsetTestHolder( + "no redundant state single key", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()) + ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.empty()) Review Comment: This test result is counter intuitive. I'd expect `lastRedundantOffset` for all three to be 10L since we should be able to truncate the log at offset 10. ########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.share.SharePartitionKey; +import org.apache.kafka.timeline.SnapshotRegistry; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ShareCoordinatorOffsetsManagerTest { + + private ShareCoordinatorOffsetsManager manager; + private static final SharePartitionKey KEY1 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY2 = SharePartitionKey.getInstance("gs2", Uuid.randomUuid(), 0); + private static final SharePartitionKey KEY3 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 1); + private static final SharePartitionKey KEY4 = SharePartitionKey.getInstance("gs1", Uuid.randomUuid(), 7); + + @BeforeEach + public void setUp() { + manager = new ShareCoordinatorOffsetsManager(new SnapshotRegistry(new LogContext())); + } + + @Test + public void 
testUpdateStateAddsToInternalState() { + manager.updateState(KEY1, 0L); + assertEquals(Optional.empty(), manager.lastRedundantOffset()); + + manager.updateState(KEY1, 10L); + assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant. + + manager.updateState(KEY2, 15L); + assertEquals(Optional.empty(), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, empty. + + assertEquals(10L, manager.curState().get(KEY1)); + assertEquals(15L, manager.curState().get(KEY2)); + } + + private static class ShareOffsetTestHolder { + static class TestTuple { + final SharePartitionKey key; + final long offset; + final Optional<Long> expectedOffset; + + private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { + this.key = key; + this.offset = offset; + this.expectedOffset = expectedOffset; + } + + static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { + return new TestTuple(key, offset, expectedOffset); + } + } + + private final String testName; + private final List<TestTuple> tuples; + private final boolean shouldRun; + + ShareOffsetTestHolder(String testName, List<TestTuple> tuples) { + this(testName, tuples, true); + } + + ShareOffsetTestHolder(String testName, List<TestTuple> tuples, boolean shouldRun) { + this.testName = testName; + this.tuples = tuples; + this.shouldRun = shouldRun; + } + } + + static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() { + return Stream.of( + new ShareOffsetTestHolder( + "no redundant state single key", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()) + ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.empty()) + ) + ) + ); + 
} + + static Stream<ShareOffsetTestHolder> generateRedundantStateCases() { + return Stream.of( + new ShareOffsetTestHolder( + "redundant state single key", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 15L, Optional.of(15L)) + ) + ), + + new ShareOffsetTestHolder( + "redundant state multiple keys", + // KEY1: 10 17 + // KEY2: 11 16 + // KEY3: 15 + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.empty()), // KEY2 11 redundant but should not be returned + ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L)) + ) + ) + ); + + } + + static Stream<ShareOffsetTestHolder> generateComplexCases() { + return Stream.of( + new ShareOffsetTestHolder( + "redundant state reverse key order", + // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1. + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.empty()), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(18L)) + ) + ), + + new ShareOffsetTestHolder( + "redundant state cold partition", Review Comment: What does cold partition mean? ########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java: ########## @@ -673,4 +706,348 @@ public void testPartitionFor() { // asCoordinatorKey does not discriminate on topic name. 
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey()); } + + @Test + public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception { + CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + org.apache.kafka.server.util.MockTime time = new org.apache.kafka.server.util.MockTime(); Review Comment: Could we import MockTime? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org