smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+    /**
+     * Util method which takes in 2 lists containing {@link PersisterStateBatch}
+     * and the startOffset.
+     * This method removes any batches where the lastOffset < startOffset, if the startOffset > -1.
+     * It then merges any contiguous intervals with same state. If states differ,
+     * based on various conditions it creates new non-overlapping batches preferring new ones.
+     * @param batchesSoFar - List containing current soft state of {@link PersisterStateBatch}
+     * @param newBatches - List containing {@link PersisterStateBatch} in incoming request
+     * @param startOffset - startOffset to consider when removing old batches.
+     * @return List containing combined batches
+     */
+    public static List<PersisterStateBatch> combineStateBatches(
+        List<PersisterStateBatch> batchesSoFar,
+        List<PersisterStateBatch> newBatches,
+        long startOffset
+    ) {
+        List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+        combinedList.addAll(batchesSoFar);
+        combinedList.addAll(newBatches);
+
+        return mergeBatches(
+            pruneBatches(
+                combinedList,
+                startOffset
+            )
+        );
+    }
+
+    /**
+     * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+     * - Same state (delivery count and state)
+     *  - If overlapping - merge into single batch
+     *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches into a single 1
+     * - Different state (delivery count or state differ)
+     *  - Based on various cases:
+     *      - swallow lower priority batch within bounds of offsets
+     *      - break batch into other non-overlapping batches
+     * @param batches - List of {@link PersisterStateBatch}
+     * @return List of non-overlapping {@link PersisterStateBatch}
+     */
+    private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+        if (batches.size() < 2) {
+            return batches;
+        }
+        TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+        List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+        BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+        while (overlapState != BatchOverlapState.SENTINEL) {

Review Comment:
   No, the logic is not that simple. A single iteration over the sorted batches
   might not be enough.
   
   Consider:
   ```
   --------                    A [1,10,0,1]
        ---                     B [5,7,0,2]
        --------------       C [5,15,0,3]
   ```
   A and B will combine to
   ```
   ----             [1,4,0,1]
        ----        [5,7,0,2]
             ---    [8,10,0,1]
   ```
   Now, when combining with C, there are two previously produced batches to
   consider ([5,7,0,2] and [8,10,0,1]), not just one.
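The A/B split above can be sketched in a few lines. This is only an illustration, not the code in the PR: `Batch`, `splitAround` and `overlaps` are hypothetical names, and the tuples are read as [firstOffset, lastOffset, deliveryState, deliveryCount], which is an assumption about the notation used in this comment.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {
    // Hypothetical stand-in for PersisterStateBatch: [first, last, state, count].
    record Batch(long first, long last, int state, int count) { }

    // Splits `outer` around a higher-priority `inner` that lies within its bounds.
    // Returns up to three non-overlapping pieces in offset order.
    static List<Batch> splitAround(Batch outer, Batch inner) {
        List<Batch> pieces = new ArrayList<>(3);
        if (outer.first() < inner.first()) {
            // Left remainder keeps the outer batch's state.
            pieces.add(new Batch(outer.first(), inner.first() - 1, outer.state(), outer.count()));
        }
        pieces.add(inner);
        if (inner.last() < outer.last()) {
            // Right remainder keeps the outer batch's state.
            pieces.add(new Batch(inner.last() + 1, outer.last(), outer.state(), outer.count()));
        }
        return pieces;
    }

    // True when the two inclusive offset ranges share at least one offset.
    static boolean overlaps(Batch x, Batch y) {
        return x.first() <= y.last() && y.first() <= x.last();
    }
}
```

Calling `splitAround` with A [1,10,0,1] and B [5,7,0,2] yields the three pieces shown above, and `overlaps` reports that C [5,15,0,3] intersects two of them, which is why a single forward pass that only remembers one previous batch is not enough.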
   
   Secondly,
   ```
   --------                    A [1,10,0,1]
        ---                     B [5,7,0,2]
        ---                     C [5,7,0,3]
   ```
   A and B will combine to
   ```
   ----             [1,4,0,1]
        ----        [5,7,0,2]
             ---    [8,10,0,1]
        ---         <- C [5,7,0,3]: appending it here breaks the sorted-order invariant
   ```
   
   In the current implementation, these situations are handled implicitly by
   the TreeSet. Any newly generated batches are pushed back into the TreeSet,
   and the `getOverlappingState` method finds the first overlapping pair and
   also returns the non-overlapping prefix.
   The non-overlapping prefix is then REMOVED from the TreeSet. Hence, once a
   batch no longer overlaps anything, it is looked at only once, which keeps
   the running time efficient.
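
A simplified sketch of the same idea follows. It is not the PR's implementation, which uses `getOverlappingState` and `BatchOverlapState`. `Batch`, `ORDER`, `emit` and `merge` are hypothetical names, and the priority rule (higher delivery count wins, then higher state) is an assumption inferred from the examples above. Pieces produced by resolving an overlap go back into the TreeSet, and the smallest batch is finalized as soon as it no longer overlaps its successor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class TreeSetMergeSketch {
    // Hypothetical stand-in for PersisterStateBatch: [first, last, state, count].
    record Batch(long first, long last, int state, int count) { }

    // Total order, so the TreeSet only treats fully identical batches as duplicates.
    static final Comparator<Batch> ORDER = Comparator
        .comparingLong(Batch::first)
        .thenComparingLong(Batch::last)
        .thenComparingInt(Batch::count)
        .thenComparingInt(Batch::state);

    static boolean sameState(Batch x, Batch y) {
        return x.state() == y.state() && x.count() == y.count();
    }

    // Appends a finished batch, joining it to the previous one
    // when the two are contiguous and carry the same state.
    static void emit(List<Batch> done, Batch next) {
        if (!done.isEmpty()) {
            Batch prev = done.get(done.size() - 1);
            if (prev.last() + 1 == next.first() && sameState(prev, next)) {
                done.set(done.size() - 1,
                    new Batch(prev.first(), next.last(), prev.state(), prev.count()));
                return;
            }
        }
        done.add(next);
    }

    static List<Batch> merge(List<Batch> input) {
        TreeSet<Batch> pending = new TreeSet<>(ORDER);
        pending.addAll(input);
        List<Batch> done = new ArrayList<>();
        while (!pending.isEmpty()) {
            Batch a = pending.pollFirst();
            Batch b = pending.isEmpty() ? null : pending.first();
            if (b == null || a.last() < b.first()) {
                // a overlaps nothing that remains, so it is final and never revisited.
                emit(done, a);
                continue;
            }
            pending.pollFirst(); // take b out as well; the pair is replaced by its pieces
            // Assumed priority rule: higher delivery count wins, then higher state.
            boolean bWins = b.count() > a.count()
                || (b.count() == a.count() && b.state() > a.state());
            Batch winner = bWins ? b : a;
            long overlapEnd = Math.min(a.last(), b.last());
            if (a.first() < b.first()) {
                pending.add(new Batch(a.first(), b.first() - 1, a.state(), a.count()));
            }
            pending.add(new Batch(b.first(), overlapEnd, winner.state(), winner.count()));
            Batch longer = a.last() > b.last() ? a : b;
            if (longer.last() > overlapEnd) {
                pending.add(new Batch(overlapEnd + 1, longer.last(), longer.state(), longer.count()));
            }
        }
        return done;
    }
}
```

Under these assumptions, `merge` on the first example (A, B, C with C = [5,15,0,3]) returns [1,4,0,1] and [5,15,0,3], and on the second example (C = [5,7,0,3]) it returns [1,4,0,1], [5,7,0,3] and [8,10,0,1]. Each overlap resolution shrinks the total offset length held in the set, so the loop terminates.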
   
   @AndrewJSchofield 
   @mumrah 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
