Shekharrajak commented on code in PR #22572:
URL: https://github.com/apache/kafka/pull/22572#discussion_r3412305517


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombiner.java:
##########
@@ -20,385 +20,170 @@
 import org.apache.kafka.server.share.persister.PersisterStateBatch;
 
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
-import java.util.TreeSet;
+import java.util.TreeMap;
 
+/**
+ * Combines an existing list of {@link PersisterStateBatch} entries with a 
list of newly produced
+ * entries and returns the shortest non-overlapping, {@code (deliveryState, 
deliveryCount)}-distinct
+ * cover of the union, clipped at {@code startOffset} (SPSO).
+ *
+ * <p>The merge is performed by an event-driven sweep-line over the union of 
both inputs. Each
+ * input batch contributes one BEGIN event at {@code firstOffset} and one END 
event at
+ * {@code lastOffset + 1}. Events are processed in offset order (END before 
BEGIN at the same
+ * offset). A counted ordered map tracks the currently active priorities; the 
first key defines the
+ * {@code (state, count)} that wins on the current sub-range. Successive 
sub-ranges with identical
+ * {@code (state, count)} are coalesced on the fly.
+ *
+ * <p>Complexity: {@code O((n + k) log p)} where {@code n} is the total number 
of input batches,
+ * {@code k} is the number of overlap transitions encountered, and {@code p} 
is the number of
+ * distinct {@code (deliveryState, deliveryCount)} priorities.
+ */
 public class PersisterStateBatchCombiner {
-    private List<PersisterStateBatch> combinedBatchList;    // link between 
pruning and merging
+    private static final Comparator<BatchPriority> PRIORITY_DESC = (a, b) -> {
+        int cmpCount = Short.compare(b.deliveryCount(), a.deliveryCount());
+        if (cmpCount != 0) {
+            return cmpCount;
+        }
+        return Byte.compare(b.deliveryState(), a.deliveryState());
+    };
+
+    private final List<PersisterStateBatch> batchesSoFar;
+    private final List<PersisterStateBatch> newBatches;
     private final long startOffset;
-    private TreeSet<PersisterStateBatch> sortedBatches;
-    private List<PersisterStateBatch> finalBatchList;   // final list is built 
here
-    private final List<PersisterStateBatch> nonOverlappingBuffer = new 
ArrayList<>();   // reused per findMergeCandidatePair call
 
     public PersisterStateBatchCombiner(
         List<PersisterStateBatch> batchesSoFar,
         List<PersisterStateBatch> newBatches,
         long startOffset
     ) {
-        initializeCombinedList(batchesSoFar, newBatches);
-        int estimatedResultSize = (combinedBatchList.size() * 3) / 2;   // 
heuristic size - 50% overallocation
-        finalBatchList = new ArrayList<>(estimatedResultSize);
+        this.batchesSoFar = batchesSoFar == null ? List.of() : batchesSoFar;
+        this.newBatches = newBatches == null ? List.of() : newBatches;
         this.startOffset = startOffset;
     }
 
-    private void initializeCombinedList(List<PersisterStateBatch> 
batchesSoFar, List<PersisterStateBatch> newBatches) {
-        boolean soFarEmpty = batchesSoFar == null || batchesSoFar.isEmpty();
-        boolean newBatchesEmpty = newBatches == null || newBatches.isEmpty();
-
-        if (soFarEmpty && newBatchesEmpty) {
-            combinedBatchList = new ArrayList<>();
-        } else if (soFarEmpty) {
-            combinedBatchList = new ArrayList<>(newBatches);    // new list as 
the original one could be unmodifiable
-        } else if (newBatchesEmpty) {
-            combinedBatchList = new ArrayList<>(batchesSoFar);  // new list as 
the original one could be unmodifiable
-        } else {
-            combinedBatchList = new ArrayList<>(batchesSoFar.size() + 
newBatches.size());
-            combinedBatchList.addAll(batchesSoFar);
-            combinedBatchList.addAll(newBatches);
-        }
-    }
-
     /**
-     * Algorithm: Merge current state batches and new batches into a single 
non-overlapping batch list.
-     * Input: batchesSoFar, newBatches, startOffset
-     * Output: combined list with non-overlapping batches (finalBatchList)
-     * <p>
-     * - Add both currentBatches and newBatches into a single list 
combinedBatchList
-     * - if combinedBatchList.size() <= 1 return combinedBatchList
-     * <p>
-     * - Remove/prune any batches from the combinedBatchList:
-     * -    if batch.lastOffset < startOffset then remove batch from 
combinedBatchList
-     * -    else if batch.firstOffset > startOffset then we will keep the batch
-     * -    else if batch.firstOffset <= startOffset <= batch.lastOffset then 
keep [startOffset, batch.lastOffset] part only and discard rest.
-     * <p>
-     * - create a treeset sortedBatches using pruned combinedBatchList
-     * - find first 2 mergeable batches in sortedBatches set, say, prev and 
candidate.
-     * - remove any non-overlapping batches from sortedBatches encountered 
during the find operation and add them to a finalBatchList
-     * - do repeat until a mergeable pair is not found:
-     * -    based on various conditions of offset overlap and batch state 
differences combine the batches or
-     *      create new batches, if required, and add to the sortedBatches.
-     * -    find first 2 mergeable batches in sortedBatches set, say, prev and 
candidate.
-     * -    remove any non-mergeable batches from sortedBatches encountered 
during the find operation and add them to a finalBatchList
-     * - done
-     * - return the finalBatchList
-     *
-     * @return list of {@link PersisterStateBatch} representing 
non-overlapping combined batches
+     * Produces the merged, pruned, non-overlapping batch list.
      */
     public List<PersisterStateBatch> combineStateBatches() {
-        pruneBatches();
-        mergeBatches();
-        return finalBatchList;
-    }
-
-    private void mergeBatches() {
-        if (combinedBatchList.size() < 2) {
-            finalBatchList = combinedBatchList;
-            return;
-        }
-
-        sortedBatches = new TreeSet<>(combinedBatchList);
-
-        MergeCandidatePair overlapState = getMergeCandidatePair();
-
-        while (overlapState != MergeCandidatePair.EMPTY) {
-            PersisterStateBatch prev = overlapState.prev();
-            PersisterStateBatch candidate = overlapState.candidate();
-
-            // remove both previous and candidate for easier
-            // assessment about adding batches to sortedBatches
-            sortedBatches.remove(prev);
-            sortedBatches.remove(candidate);
-
-            int cmp = compareBatchDeliveryInfo(candidate, prev);
-            if (cmp == 0) {  // same state and overlap or contiguous
-                // overlap and same state (prev.firstOffset <= 
candidate.firstOffset) due to sort
-                // covers:
-                // case:        1        2          3            4          5  
         6          7 (contiguous)
-                // prev:        ------   -------    -------      -------   
-------   --------    -------
-                // candidate:   ------   ----       ----------     ---        
----       -------        -------
-                handleSameStateMerge(prev, candidate);  // pair can be 
contiguous or overlapping
-            } else {
-                // If we reach here then it is guaranteed that the batch pair 
is overlapping and
-                // non-contiguous because getMergeCandidatePair only returns 
contiguous pair if
-                // the constituents have the same delivery count and state.
-                // covers:
-                // case:        1        2*          3            4          5 
          6             7*
-                // prev:        ------   -------    -------      -------    
-------     --------      ------
-                // candidate:   ------   ----       ---------      ----        
----        -------   -------
-                // max batches: 1           2       2                3         
 2            2          2
-                // min batches: 1           1       1                1         
 1            2          1
-                // * not possible with treeset
-                handleDiffStateOverlap(prev, candidate);
-            }
-            overlapState = getMergeCandidatePair();
+        List<PersisterStateBatch> pruned = prune();
+        if (pruned.isEmpty()) {
+            return pruned;
         }
-        finalBatchList.addAll(sortedBatches);   // some non overlapping 
batches might have remained
-    }
-
-    /**
-     * Compares the non-offset state of 2 batches i.e. the deliveryCount and 
deliverState.
-     * <p>
-     * Uses standard compareTo contract x < y => +int, x > y => -int, x == y 
=> 0
-     *
-     * @param b1 - {@link PersisterStateBatch} to compare
-     * @param b2 - {@link PersisterStateBatch} to compare
-     * @return int representing comparison result.
-     */
-    private int compareBatchDeliveryInfo(PersisterStateBatch b1, 
PersisterStateBatch b2) {
-        int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
-
-        // Delivery state could be:
-        // 0 - AVAILABLE (non-terminal)
-        // 1 - ACQUIRED - should not be persisted yet
-        // 2 - ACKNOWLEDGED (terminal)
-        // 3 - ARCHIVING - not implemented in KIP-932 - non-terminal - leads 
only to ARCHIVED
-        // 4 - ARCHIVED (terminal)
-
-        if (deltaCount == 0) {   // same delivery count
-            return Byte.compare(b1.deliveryState(), b2.deliveryState());
+        if (pruned.size() == 1) {
+            return pruned;
         }
-        return deltaCount;
+        return sweepMerge(pruned);
     }
 
     /**
-     * Accepts a sorted set of state batches and finds the first 2 batches 
which can be merged.
-     * Merged implies that they have some offsets in common or, they are 
contiguous with the same state.
-     * <p>
-     * Any non-mergeable batches prefixing a good mergeable pair are removed 
from the sortedBatches.
-     * For example:
-     * ----- ----  ----- -----      -----
-     *                      ------
-     * <---------------> <-------->
-     * non-overlapping   1st overlapping pair
-     *
-     * @return object representing the overlap state
+     * Drops or clips ranges below {@code startOffset}. Returns a new list; 
never modifies inputs.
      */
-    private MergeCandidatePair getMergeCandidatePair() {
-        if (sortedBatches == null || sortedBatches.isEmpty()) {
-            return MergeCandidatePair.EMPTY;
-        }
-        Iterator<PersisterStateBatch> iter = sortedBatches.iterator();
-        PersisterStateBatch prev = iter.next();
-        nonOverlappingBuffer.clear();
-        while (iter.hasNext()) {
-            PersisterStateBatch candidate = iter.next();
-            if (candidate.firstOffset() <= prev.lastOffset() || // overlap
-                prev.lastOffset() + 1 == candidate.firstOffset() && 
compareBatchDeliveryInfo(prev, candidate) == 0) {  // contiguous
-                updateBatchContainers(nonOverlappingBuffer);
-                return new MergeCandidatePair(
-                    prev,
-                    candidate
-                );
+    private List<PersisterStateBatch> prune() {
+        int estimate = batchesSoFar.size() + newBatches.size();
+        List<PersisterStateBatch> out = new ArrayList<>(estimate);
+        addPruned(out, batchesSoFar);
+        addPruned(out, newBatches);
+        return out;
+    }
+
+    private void addPruned(List<PersisterStateBatch> out, 
List<PersisterStateBatch> src) {
+        for (PersisterStateBatch b : src) {
+            if (startOffset != -1 && b.lastOffset() < startOffset) {
+                // batch fully expired
+                continue;
+            }
+            if (startOffset == -1 || b.firstOffset() >= startOffset) {
+                out.add(b);
+            } else {
+                // start offset intersects batch -> clip
+                out.add(new PersisterStateBatch(startOffset, b.lastOffset(), 
b.deliveryState(), b.deliveryCount()));
             }
-            nonOverlappingBuffer.add(prev);
-            prev = candidate;
         }
-
-        updateBatchContainers(nonOverlappingBuffer);
-        return MergeCandidatePair.EMPTY;
-    }
-
-    private void updateBatchContainers(List<PersisterStateBatch> 
nonOverlappingBatches) {
-        sortedBatches.removeAll(nonOverlappingBatches);
-        finalBatchList.addAll(nonOverlappingBatches);
     }
 
     /**
-     * Accepts a list of {@link PersisterStateBatch} and checks:
-     * - last offset is < start offset => batch is removed
-     * - first offset > start offset => batch is preserved
-     * - start offset intersects the batch => part of batch before start 
offset is removed and
-     * the part after it is preserved.
+     * Event-driven sweep. Linear time after the initial event sort.
      */
-    private void pruneBatches() {
-        if (startOffset == -1 || combinedBatchList.isEmpty()) {
-            return;
-        }
-        List<PersisterStateBatch> retainedBatches = new 
ArrayList<>(combinedBatchList.size());
-        combinedBatchList.forEach(batch -> {
-            if (batch.lastOffset() < startOffset) {
-                // batch is expired, skip current iteration
-                // -------
-                //         | -> start offset
-                return;
-            }
-
-            if (batch.firstOffset() >= startOffset) {
-                // complete batch is valid
-                //    ---------
-                //  | -> start offset
-                retainedBatches.add(batch);
-            } else {
-                // start offset intersects batch
-                //   ---------
-                //       |     -> start offset
-                retainedBatches.add(new PersisterStateBatch(startOffset, 
batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
+    private List<PersisterStateBatch> sweepMerge(List<PersisterStateBatch> 
batches) {
+        int n = batches.size();
+        Event[] events = new Event[n * 2];
+        for (int i = 0; i < n; i++) {
+            PersisterStateBatch b = batches.get(i);
+            BatchPriority priority = BatchPriority.from(b);
+            events[i * 2] = new Event(b.firstOffset(), true, priority);
+            events[i * 2 + 1] = new Event(b.lastOffset() + 1, false, priority);
+        }
+        // END (isBegin=false) sorts before BEGIN (isBegin=true) at the same 
offset so that
+        // contiguous same-state ranges meeting at offset X collapse to a 
single emit.
+        java.util.Arrays.sort(events, (e1, e2) -> {

Review Comment:
    The comparator orders events by offset, smallest first. If two events 
have the same offset, END goes before BEGIN because isBegin is:
          false = END
          true = BEGIN
         and Boolean.compare(false, true) is negative, which puts END first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to