smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##########
```diff
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+    /**
+     * Util method which takes in 2 lists containing {@link PersisterStateBatch}
+     * and the startOffset.
+     * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1.
+     * It then merges any contiguous intervals with same state. If states differ,
+     * based on various conditions it creates new non-overlapping batches preferring new ones.
+     * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch}
+     * @param newBatches - List containing {@link PersisterStateBatch} in incoming request
+     * @param startOffset - startOffset to consider when removing old batches.
+     * @return List containing combined batches
+     */
+    public static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
+        long startOffset
+    ) {
+        List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    /**
+     * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+     * - Same state (delivery count and state)
+     *   - If overlapping - merge into single batch
+     *   - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single 1
+     * - Different state (delivery count or state differ)
+     *   - Based on various cases:
+     *     - swallow lower priority batch within bounds of offsets
+     *     - break batch into other non-overlapping batches
+     * @param batches - List of {@link PersisterStateBatch}
+     * @return List of non-overlapping {@link PersisterStateBatch}
+     */
+    private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {
```

Review Comment:
No, the logic is not that simple. A single iteration over the sorted batches might not be enough. The invariant is that the batches must remain sorted even after they have been manipulated.

Consider:
```
-------- A [1,10,0,1]
    --- B [5,7,0,2]
    -------------- C [5,15,0,3]
```
A and B will combine to:
```
---- [1,4,0,1]
    ---- [5,7,0,2]
        --- [8,10,0,1]
```
Now, when combining with C, we have two previously produced batches to consider, because C overlaps both [5,7,0,2] and [8,10,0,1].
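Below is a minimal Java sketch of the pairwise step from the first example, in which B cuts A into three non-overlapping batches. It rests on several assumptions. Each bracketed tuple is read as [firstOffset, lastOffset, deliveryState, deliveryCount]. `Batch` is a hypothetical stand-in for `PersisterStateBatch`. The priority rule is also assumed: the batch with the higher deliveryCount keeps the overlapping range. The real rules in `StateBatchUtil` are not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Hypothetical stand-in for PersisterStateBatch, printed as
    // [firstOffset, lastOffset, deliveryState, deliveryCount].
    record Batch(long first, long last, byte state, short count) {
        @Override
        public String toString() {
            return "[" + first + "," + last + "," + state + "," + count + "]";
        }
    }

    // Precondition: a and b overlap and differ in state or delivery count.
    // Assumed priority rule: the batch with the higher deliveryCount (ties go to b,
    // treated here as the newer batch) keeps the overlapping range, and the other
    // batch keeps at most a left and a right remnant outside that range.
    static List<Batch> split(Batch a, Batch b) {
        Batch winner = b.count() >= a.count() ? b : a;
        Batch loser = (winner == b) ? a : b;
        List<Batch> out = new ArrayList<>();
        if (loser.first() < winner.first()) {
            out.add(new Batch(loser.first(), winner.first() - 1, loser.state(), loser.count()));
        }
        out.add(winner);
        if (loser.last() > winner.last()) {
            out.add(new Batch(winner.last() + 1, loser.last(), loser.state(), loser.count()));
        }
        return out; // sorted by firstOffset and non-overlapping
    }

    public static void main(String[] args) {
        Batch a = new Batch(1, 10, (byte) 0, (short) 1);
        Batch b = new Batch(5, 7, (byte) 0, (short) 2);
        System.out.println(split(a, b)); // [[1,4,0,1], [5,7,0,2], [8,10,0,1]]
    }
}
```

This step only ever looks at two batches. Feeding C [5,15,0,3] into it afterwards would require comparing C against both [5,7,0,2] and [8,10,0,1], so a single pairwise pass over the original list is not enough.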
Secondly, consider:
```
-------- A [1,10,0,1]
    --- B [5,7,0,2]
    --- C [5,7,0,3]
```
A and B will combine to:
```
---- [1,4,0,1]
    ---- [5,7,0,2]
        --- [8,10,0,1]
    --- <- C: now out of order, which breaks the invariant that the batches stay sorted
```
In the current implementation, the TreeSet handles these situations implicitly. Any newly generated batches are pushed back into the TreeSet. The `getOverlappingState` method then finds the first overlapping pair and also returns the non-overlapping prefix. That prefix is then REMOVED from the TreeSet. As a result, a batch that no longer overlaps anything is examined only once, which keeps the running time efficient.

@AndrewJSchofield @mumrah
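The TreeSet-driven approach described above can be sketched as a worklist loop. The loop takes the smallest batch. If that batch cannot touch the next one, it is final and is never revisited. Otherwise, the loop removes both batches and pushes the merged or split pieces back, so the TreeSet restores sorted order. This is a simplified, hypothetical sketch and not Kafka's `mergeBatches`. It uses a stand-in `Batch` record and compares only the head with its successor instead of calling `getOverlappingState`. It also assumes that the batch with the higher deliveryCount wins an overlap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class TreeSetMergeSketch {
    // Hypothetical stand-in for PersisterStateBatch: [firstOffset, lastOffset, deliveryState, deliveryCount].
    record Batch(long first, long last, byte state, short count) {
        @Override
        public String toString() {
            return "[" + first + "," + last + "," + state + "," + count + "]";
        }
    }

    // Total order, so distinct batches never collapse into one TreeSet entry.
    static final Comparator<Batch> ORDER = Comparator.comparingLong(Batch::first)
            .thenComparingLong(Batch::last)
            .thenComparingInt(Batch::state)
            .thenComparingInt(Batch::count);

    static boolean sameState(Batch a, Batch b) {
        return a.state() == b.state() && a.count() == b.count();
    }

    // Precondition: a and b overlap. Assumed rule: the higher deliveryCount keeps the overlap.
    static List<Batch> split(Batch a, Batch b) {
        Batch winner = b.count() >= a.count() ? b : a;
        Batch loser = (winner == b) ? a : b;
        List<Batch> out = new ArrayList<>();
        if (loser.first() < winner.first()) {
            out.add(new Batch(loser.first(), winner.first() - 1, loser.state(), loser.count()));
        }
        out.add(winner);
        if (loser.last() > winner.last()) {
            out.add(new Batch(winner.last() + 1, loser.last(), loser.state(), loser.count()));
        }
        return out;
    }

    static List<Batch> merge(List<Batch> batches) {
        TreeSet<Batch> sorted = new TreeSet<>(ORDER);
        sorted.addAll(batches);
        List<Batch> result = new ArrayList<>();
        while (!sorted.isEmpty()) {
            Batch head = sorted.pollFirst();
            Batch next = sorted.isEmpty() ? null : sorted.first();
            if (next == null || head.last() + 1 < next.first()) {
                // Gap after head: nothing left in the set can reach it, so it is final.
                result.add(head);
                continue;
            }
            sorted.pollFirst(); // remove next; whatever replaces it is re-inserted below
            if (sameState(head, next)) {
                // Overlapping or contiguous with equal state: collapse into one batch.
                sorted.add(new Batch(head.first(), Math.max(head.last(), next.last()), head.state(), head.count()));
            } else if (head.last() < next.first()) {
                // Contiguous but different state: head is final, next is examined again.
                result.add(head);
                sorted.add(next);
            } else {
                // Overlapping with different state: re-insert the pieces so the
                // TreeSet puts them back in sorted order.
                sorted.addAll(split(head, next));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Second scenario: A [1,10,0,1], B [5,7,0,2], C [5,7,0,3].
        List<Batch> in = List.of(
                new Batch(1, 10, (byte) 0, (short) 1),
                new Batch(5, 7, (byte) 0, (short) 2),
                new Batch(5, 7, (byte) 0, (short) 3));
        System.out.println(merge(in)); // [[1,4,0,1], [5,7,0,3], [8,10,0,1]]
    }
}
```

Under the same assumed priority rule, the first scenario (A [1,10,0,1], B [5,7,0,2], C [5,15,0,3]) yields [[1,4,0,1], [5,15,0,3]]. Here C absorbs both [5,7,0,2] and [8,10,0,1]. A single left-to-right pass over the original list would have missed that.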
