dcapwell commented on code in PR #6:
URL: https://github.com/apache/cassandra-accord/pull/6#discussion_r944881568


##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,1095 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import accord.utils.ArrayBuffers;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.utils.SortedArrays;
+import com.carrotsearch.hppc.ObjectIntHashMap;
+import com.google.common.base.Preconditions;
+
+import static accord.utils.ArrayBuffers.*;
+import static accord.utils.SortedArrays.*;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+    private static final boolean DEBUG_CHECKS = true;
+
+    private static final TxnId[] NO_TXNIDS = new TxnId[0];
+    private static final int[] NO_INTS = new int[0];
+    public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+    public static Collector<Command, Builder, Builder> collector(Keys keys)
+    {
+        return Collector.of(() -> Deps.builder(keys), Deps.Builder::add, (a, 
b) -> { throw new IllegalStateException(); });
+    }
+
+    public static class Builder
+    {
+        final Keys keys;
+        final ObjectIntHashMap<TxnId> txnIdLookup = new ObjectIntHashMap<>();
+        TxnId[] txnIds = new TxnId[4];
+        // Key -> [TxnId]
+        final int[][] keysToTxnId;
+        // Key -> Counter
+        final int[] keysToTxnIdCounts;
+
+        public Builder(Keys keys)
+        {
+            this.keys = keys;
+            this.keysToTxnId = new int[keys.size()][4];
+            this.keysToTxnIdCounts = new int[keys.size()];
+        }
+
+        public boolean isEmpty()
+        {
+            return txnIdLookup.isEmpty();
+        }
+
+        public void add(Command command)
+        {
+            int idx = ensureTxnIdx(command.txnId());
+
+            long count = keys.foldlIntersect(command.txn().keys, (li, ri, k, 
p, v) -> {
+                if (keysToTxnId[li].length == keysToTxnIdCounts[li])
+                    keysToTxnId[li] = Arrays.copyOf(keysToTxnId[li], 
keysToTxnId[li].length * 2);
+                keysToTxnId[li][keysToTxnIdCounts[li]++] = idx;
+                return v + 1;
+            }, 0, 0, -1);
+
+            if (count == 0)
+                throw new IllegalArgumentException(command + " does not 
intersect " + keys);
+        }
+
+        public void add(Key key, TxnId txnId)
+        {
+            int txnIdx = ensureTxnIdx(txnId);
+            int keyIdx = keys.indexOf(key);
+            if (keysToTxnIdCounts[keyIdx] == keysToTxnId[keyIdx].length)
+                keysToTxnId[keyIdx] = Arrays.copyOf(keysToTxnId[keyIdx], 
keysToTxnIdCounts[keyIdx] * 2);
+            keysToTxnId[keyIdx][keysToTxnIdCounts[keyIdx]++] = txnIdx;
+        }
+
+        public boolean contains(TxnId txnId)
+        {
+            return txnIdLookup.containsKey(txnId);
+        }
+
+        private int ensureTxnIdx(TxnId txnId)
+        {
+            int i = txnIdLookup.getOrDefault(txnId, -1);
+            if (i < 0)
+            {
+                txnIdLookup.put(txnId, i = txnIdLookup.size());
+                if (i == txnIds.length)
+                    txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+            }
+            return i;
+        }
+
+        public Deps build()
+        {
+            TxnId[] txnIds = txnIdLookup.keys().toArray(TxnId.class);
+            Arrays.sort(txnIds, TxnId::compareTo);
+            // all data is indexed based off insertion order, but now that 
build is happening its desirable to have ids
+            // in tx order, in order to map from sorted -> insert order, need 
"txnIdMap" as the bridge
+            int[] txnIdMap = new int[txnIds.length];
+            for (int i = 0 ; i < txnIdMap.length ; i++)
+                txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+            int keyCount = 0;
+            // Key -> distinct([*TxnID]])
+            // at the key position is the OFFSET into the array where the 
txnIds are for the key
+            // Example
+            // OFFSET(3), OFFSET(5), *tx1, *tx1, *tx2
+            int[] result; {
+                int count = 0;
+                for (int i = 0 ; i < keys.size() ; ++i)
+                {
+                    keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+                    count += keysToTxnIdCounts[i];
+                }
+                result = new int[count + keyCount];
+            }
+
+            int keyIndex = 0;
+            int offset = keyCount;
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                if (keysToTxnIdCounts[i] > 0)
+                {
+                    int count = keysToTxnIdCounts[i];
+                    int[] src = keysToTxnId[i];
+                    // store all txnIds to results at offset
+                    for (int j = 0 ; j < count ; ++j)
+                        result[j + offset] = txnIdMap[src[j]];
+                    Arrays.sort(result, offset, count + offset);
+                    // the same txnId may be present multiple times, attempt 
to detect this and push all dups to the end
+                    int dups = 0;
+                    for (int j = offset + 1 ; j < offset + count ; ++j)
+                    {
+                        if (result[j] == result[j - 1]) ++dups;
+                        // positions i == i - 1 but dups were found; replace 
the "first" dup with the current value, removing it
+                        else if (dups > 0) result[j - dups] = result[j];
+                    }
+                    result[keyIndex] = offset += count - dups;
+                    ++keyIndex;
+                }
+            }
+            if (offset < result.length)
+                result = Arrays.copyOf(result, offset);
+
+            Keys keys = this.keys;
+            if (keyCount < keys.size())
+            {
+                keyIndex = 0;
+                Key[] newKeys = new Key[keyCount];
+                for (int i = 0 ; i < keys.size() ; ++i)
+                {
+                    if (keysToTxnIdCounts[i] > 0)
+                        newKeys[keyIndex++] = keys.get(i);
+                }
+                keys = Keys.of(newKeys);
+            }
+
+            return new Deps(keys, txnIds, result);
+        }
+    }
+
+    public static Builder builder(Keys keys)
+    {
+        return new Builder(keys);
+    }
+
+    private static class LinearMerger extends ArrayBuffers.SavingManager 
implements DepsConstructor<Object>

Review Comment:
   I added a patch that lets me disable the cache logic, and ran benchmarks with 
and without buffers. What I see is that, right now, we don't get a win, so I feel 
we should drop the whole ArrayBuffers class; the logic in LinearMerger seems 
enough to make sure we can reuse buffers between `Deps`.
   
   ```
   Benchmark[name=merge w/cache(size=15, keys=296, total=6994)] avg 487.11ms, 
allocated 6028 MiB
   Benchmark[name=merge wo/cache(size=15, keys=296, total=6994)] avg 482.43ms, 
allocated 6028 MiB
   
   Benchmark[name=merge w/cache(size=13, keys=355, total=5447)] avg 365.89ms, 
allocated 4530 MiB
   Benchmark[name=merge wo/cache(size=13, keys=355, total=5447)] avg 364ms, 
allocated 4530 MiB
   
   Benchmark[name=merge w/cache(size=5, keys=184, total=2435)] avg 45.33ms, 
allocated 1384 MiB
   Benchmark[name=merge wo/cache(size=5, keys=184, total=2435)] avg 45.23ms, 
allocated 1384 MiB
   
   Benchmark[name=merge w/cache(size=14, keys=335, total=5918)] avg 317.15ms, 
allocated 4006 MiB
   Benchmark[name=merge wo/cache(size=14, keys=335, total=5918)] avg 323.47ms, 
allocated 4006 MiB
   
   Benchmark[name=merge w/cache(size=18, keys=362, total=9583)] avg 679.28ms, 
allocated 12133 MiB
   Benchmark[name=merge wo/cache(size=18, keys=362, total=9583)] avg 668.39ms, 
allocated 12692 MiB
   
   Benchmark[name=merge w/cache(size=5, keys=237, total=3162)] avg 91.93ms, 
allocated 1731 MiB
   Benchmark[name=merge wo/cache(size=5, keys=237, total=3162)] avg 83.33ms, 
allocated 1731 MiB
   
   Benchmark[name=merge w/cache(size=3, keys=149, total=1662)] avg 20.64ms, 
allocated 807 MiB
   Benchmark[name=merge wo/cache(size=3, keys=149, total=1662)] avg 20.67ms, 
allocated 807 MiB
   ```
   
   sha c3dd834c008c1a98e71733a321c5e84ba9009269



##########
accord-core/src/main/java/accord/utils/SortedArrays.java:
##########
@@ -0,0 +1,628 @@
+package accord.utils;
+
+import java.util.Arrays;
+import java.util.function.IntFunction;
+
+import accord.utils.ArrayBuffers.BufferManager;
+import accord.utils.ArrayBuffers.IntBufferAllocator;
+import org.apache.cassandra.utils.concurrent.Inline;
+
+import javax.annotation.Nullable;
+
+import static accord.utils.ArrayBuffers.noCaching;
+
+// TODO (now): javadoc
+public class SortedArrays
+{
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
T[] right, IntFunction<T[]> allocator)
+    {
+        return linearUnion(left, right, allocator, noCaching());
+    }
+
+    /**
+     * Given two sorted arrays, return an array containing the elements 
present in either, preferentially returning one
+     * of the inputs if it contains all elements of the other.
+     *
+     * TODO: introduce exponential search optimised version
+     */
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
T[] right, IntFunction<T[]> allocator, BufferManager buffers)
+    {
+        return linearUnion(left, left.length, right, right.length, allocator, 
buffers);
+    }
+    
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
int leftLength, T[] right, int rightLength, IntFunction<T[]> allocator, 
BufferManager buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first, pick the superset candidate and merge the two until we find 
the first missing item
+        // if none found, return the superset candidate
+        if (leftLength >= rightLength)
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    resultSize = leftIdx;
+                    result = buffers.get(allocator, resultSize + (leftLength - 
leftIdx) + (rightLength - (rightIdx - 1)));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    result[resultSize++] = right[rightIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (rightIdx == rightLength) // all elements matched, so can 
return the other array
+                    return buffers.completeWithExisting(left, leftLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(allocator, leftLength + (rightLength - 
rightIdx));
+                resultSize = leftIdx;
+                System.arraycopy(left, 0, result, 0, resultSize);
+            }
+        }
+        else
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    resultSize = rightIdx;
+                    result = buffers.get(allocator, resultSize + (leftLength - 
(leftIdx - 1)) + (rightLength - rightIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    result[resultSize++] = left[leftIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (leftIdx == leftLength) // all elements matched, so can 
return the other array
+                    return buffers.completeWithExisting(right, rightLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(allocator, rightLength + (leftLength - 
leftIdx));
+                resultSize = rightIdx;
+                System.arraycopy(right, 0, result, 0, resultSize);
+            }
+        }
+
+        try
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                T minKey;
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    minKey = leftKey;
+                }
+                else if (cmp < 0)
+                {
+                    leftIdx++;
+                    minKey = leftKey;
+                }
+                else
+                {
+                    rightIdx++;
+                    minKey = rightKey;
+                }
+                result[resultSize++] = minKey;
+            }
+
+            while (leftIdx < leftLength)
+                result[resultSize++] = left[leftIdx++];
+
+            while (rightIdx < rightLength)
+                result[resultSize++] = right[rightIdx++];
+
+            return buffers.complete(allocator, result, resultSize);
+        }
+        finally
+        {
+            buffers.discard(result);
+        }
+    }
+
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] 
left, T[] right, IntFunction<T[]> allocator)
+    {
+        return linearIntersection(left, right, allocator, noCaching());
+    }
+
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] 
left, T[] right, IntFunction<T[]> allocator, BufferManager buffers)
+    {
+        return linearIntersection(left, left.length, right, right.length, 
allocator, buffers);
+    }
+
+    /**
+     * Given two sorted arrays, return the elements present in both, preferentially returning one of the inputs if
+     * it contains no elements not present in the other.
+     *
+     * Only the first {@code leftLength} elements of {@code left} and the first {@code rightLength} elements of
+     * {@code right} are examined. The shorter input is treated as the candidate result and is scanned against the
+     * other until its first element with no counterpart is found; only then is a separate result buffer obtained
+     * from {@code buffers}.
+     *
+     * TODO: introduce exponential search optimised version
+     */
+    @SuppressWarnings("unused") // was used until recently, might be used again?
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] left, int leftLength, T[] right, int rightLength, IntFunction<T[]> allocator, BufferManager buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first pick a subset candidate, and merge both until we encounter an element not present in the other array
+        if (leftLength <= rightLength)
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    // left[leftIdx] is absent from right: keep the matched prefix and skip this element
+                    resultSize = leftIdx++;
+                    result = buffers.get(allocator, resultSize + Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                // Every left element visited so far matched, so left[0, leftIdx) is the intersection. The loop
+                // may have ended because right ran out while left still had elements, so the length passed on
+                // must be leftIdx rather than leftLength (e.g. left=[1,5], right=[1,2,3] must yield [1]).
+                // NOTE(review): assumes completeWithExisting(array, n) yields the first n elements of array - confirm.
+                return hasMatch ? buffers.completeWithExisting(left, leftIdx) : buffers.complete(allocator, allocator.apply(0), 0);
+            }
+        }
+        else
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    // right[rightIdx] is absent from left: keep the matched prefix and skip this element
+                    resultSize = rightIdx++;
+                    result = buffers.get(allocator, resultSize + Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                // mirror of the branch above: right[0, rightIdx) is the intersection
+                return hasMatch ? buffers.completeWithExisting(right, rightIdx) : buffers.complete(allocator, allocator.apply(0), 0);
+            }
+        }
+
+        try
+        {
+            // plain merge over the remainder, keeping only elements found in both inputs
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    result[resultSize++] = leftKey;
+                }
+                else if (cmp < 0) leftIdx++;
+                else rightIdx++;
+            }
+
+            return buffers.complete(allocator, result, resultSize);
+        }
+        finally
+        {
+            buffers.discard(result);
+        }
+    }
+
+    /**
+     * Given two sorted arrays, return the elements present only in the first, preferentially returning the first array
+     * itself if possible
+     *
+     * @param left     sorted array whose elements are candidates for the result
+     * @param right    sorted array of elements to exclude
+     * @param allocate creates the working array once {@code left} is known not to be returnable unchanged
+     * @return {@code left} itself if it shares no element with {@code right}; otherwise a new array holding,
+     *         in order, the elements of {@code left} that do not occur in {@code right}
+     */
+    @SuppressWarnings("unused") // was used until recently, might be used again?
+    public static <T extends Comparable<? super T>> T[] linearDifference(T[] left, T[] right, IntFunction<T[]> allocate)
+    {
+        int rightIdx = 0;
+        int leftIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // phase 1: scan until the first shared element; if there is none, left can be returned as-is
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp == 0)
+            {
+                // everything before leftIdx survives; the shared element itself is dropped
+                resultSize = leftIdx++;
+                ++rightIdx;
+                result = allocate.apply(resultSize + left.length - leftIdx);
+                System.arraycopy(left, 0, result, 0, resultSize);
+                break;
+            }
+            else if (cmp < 0)
+            {
+                ++leftIdx;
+            }
+            else
+            {
+                ++rightIdx;
+            }
+        }
+
+        if (result == null)
+            return left;
+
+        // phase 2: merge the remainder, copying only left elements that have no counterpart in right
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp < 0)
+            {
+                // leftKey sorts before everything left in right, so it cannot occur there: keep it.
+                // (These two branches were previously swapped, e.g. [1,2,3] minus [1,3] produced [2,3].)
+                result[resultSize++] = left[leftIdx++];
+            }
+            else if (cmp > 0)
+            {
+                // rightKey is smaller than leftKey: advance right to catch up
+                ++rightIdx;
+            }
+            else
+            {
+                ++leftIdx;
+                ++rightIdx;
+            }
+        }
+        // right is exhausted, so whatever remains of left is kept
+        while (leftIdx < left.length)
+            result[resultSize++] = left[leftIdx++];
+
+        if (resultSize < result.length)
+            result = Arrays.copyOf(result, resultSize);
+
+        return result;
+    }
+
+    public static <A, R> A[] sliceWithMultipleMatches(A[] slice, R[] select, 
IntFunction<A[]> factory, AsymmetricComparator<A, R> cmp1, 
AsymmetricComparator<R, A> cmp2)
+    {
+        A[] result;
+        int resultCount;
+        int ai = 0, ri = 0;
+        while (true)
+        {
+            long ari = findNextIntersection(slice, ai, select, ri, cmp1, cmp2, 
Search.CEIL);
+            if (ari < 0)
+            {
+                if (ai == slice.length)
+                    return slice; // all elements of slice were found in 
select, so can return the array unchanged
+
+                // The first (ai - 1) elements are present (without a gap), so 
copy just that subset
+                return Arrays.copyOf(slice, ai);
+            }
+
+            int nextai = (int)(ari >>> 32);
+            if (ai != nextai)
+            {
+                // A gap is detected in slice!
+                // When ai == nextai we "consume" it and move to the last 
instance of slice[ai], then choose the next element,
+                // this means that ai currently points to an element in slice 
where it is not known if its present in select,
+                // so != implies a gap is detected!
+                resultCount = ai;
+                result = factory.apply(ai + (slice.length - nextai));
+                System.arraycopy(slice, 0, result, 0, resultCount);
+                ai = nextai;
+                ri = (int)ari;
+                break;
+            }
+
+            ri = (int)ari;
+            // In cases where duplicates are present in slice, find the last 
instance of slice[ai], and move past it.
+            // slice[ai] is known to be present, so need to check the next 
element.
+            ai = exponentialSearch(slice, nextai, slice.length, select[ri], 
cmp2, Search.FLOOR) + 1;
+        }
+
+        while (true)
+        {
+            // Find the next element after the last element matching 
select[ri] and copy from slice into result
+            // nextai may be negative (such as -1), so the +1 may keep it 
negative OR set 0, since 0 < 0 is false
+            // it is safe to avoid checking for negative values
+            int nextai = exponentialSearch(slice, ai, slice.length, 
select[ri], cmp2, Search.FLOOR) + 1;
+            while (ai < nextai)
+                result[resultCount++] = slice[ai++];
+
+            long ari = findNextIntersection(slice, ai, select, ri, cmp1, cmp2, 
Search.CEIL);
+            if (ari < 0)
+            {
+                if (resultCount < result.length)
+                    result = Arrays.copyOf(result, resultCount);
+
+                return result;
+            }
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+        }
+    }
+
+    /**
+     * Copy-on-write insert into the provided array; returns the same array if 
item already present, or a new array
+     * with the item in the correct position if not. Linear time complexity.
+     */
+    public static <T extends Comparable<? super T>> T[] insert(T[] src, T 
item, IntFunction<T[]> factory)
+    {
+        int insertPos = Arrays.binarySearch(src, item);
+        if (insertPos >= 0)
+            return src;
+        insertPos = -1 - insertPos;
+
+        T[] trg = factory.apply(src.length + 1);
+        System.arraycopy(src, 0, trg, 0, insertPos);
+        trg[insertPos] = item;
+        System.arraycopy(src, insertPos, trg, insertPos + 1, src.length - 
insertPos);
+        return trg;
+    }
+
+    /**
+     * Equivalent to {@link Arrays#binarySearch}, only more efficient 
algorithmically for linear merges.
+     * Binary search has worst case complexity {@code O(n.lg n)} for a linear 
merge, whereas exponential search
+     * has a worst case of {@code O(n)}. However compared to a simple linear 
merge, the best case for exponential
+     * search is {@code O(lg(n))} instead of {@code O(n)}.
+     */
+    public static <T1, T2 extends Comparable<? super T1>> int 
exponentialSearch(T1[] in, int from, int to, T2 find)
+    {
+        return exponentialSearch(in, from, to, find, Comparable::compareTo, 
Search.FAST);
+    }
+
+    // TODO: check inlining elides this
+    public enum Search { FLOOR, CEIL, FAST }
+
+    /**
+     * Search {@code in[from, to)} for {@code find}, probing at index {@code from + step} where {@code step}
+     * takes the values 0, 1, 3, 7, ... and {@code from} is moved past each probe that compares lower than
+     * {@code find}. Once a probe compares higher (or an equal element is handled per {@code op}), the
+     * narrowed range is handed to {@link #binarySearch}.
+     *
+     * Comparisons are made as {@code comparator.compare(find, in[i])}.
+     *
+     * @param op FAST returns the first equal element probed; CEIL and FLOOR keep narrowing the range so that
+     *           {@link #binarySearch} can resolve which of several equal elements to report
+     * @return the probed index for an early FAST or CEIL match, otherwise whatever {@link #binarySearch}
+     *         returns for the narrowed range
+     */
+    @Inline
+    public static <T1, T2> int exponentialSearch(T2[] in, int from, int to, T1 find, AsymmetricComparator<T1, T2> comparator, Search op)
+    {
+        int step = 0;
+        loop: while (from + step < to)
+        {
+            int i = from + step;
+            int c = comparator.compare(find, in[i]);
+            if (c < 0)
+            {
+                // find sorts before in[i]: i becomes the exclusive upper bound
+                to = i;
+                break;
+            }
+            if (c > 0)
+            {
+                // find sorts after in[i]: everything up to and including i is excluded
+                from = i + 1;
+            }
+            else
+            {
+                switch (op)
+                {
+                    case FAST:
+                        return i;
+
+                    case CEIL:
+                        // step == 0 means i == from, so no earlier index remains in the range
+                        if (step == 0)
+                            return from;
+                        to = i + 1; // could in theory avoid one extra comparison in this case, but would uglify things
+                        break loop;
+
+                    case FLOOR:
+                        // keep the match inside the range and continue probing to the right
+                        from = i;
+                }
+            }
+            step = step * 2 + 1; // jump in perfect binary search increments
+        }
+        return binarySearch(in, from, to, find, comparator, op);
+    }
+
+    @Inline
+    public static int exponentialSearch(int[] in, int from, int to, int find)
+    {
+        int step = 0;
+        while (from + step < to)
+        {
+            int i = from + step;
+            int c = Integer.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+                break;
+            }
+            if (c > 0)
+            {
+                from = i + 1;
+            }
+            else
+            {
+                return i;
+            }
+            step = step * 2 + 1; // jump in perfect binary search increments
+        }
+        return Arrays.binarySearch(in, from, to, find);
+    }
+
+    /**
+     * Binary search of {@code in[from, to)} for {@code find}, comparing as {@code comparator.compare(find, in[i])}.
+     *
+     * @param op what to do on an equal element: FAST returns it immediately; CEIL remembers it and continues
+     *           in the lower half; FLOOR remembers it and continues in the upper half
+     * @return the index of an equal element if one was found, otherwise {@code -1 - to} using the value
+     *         {@code to} holds when the loop ends
+     */
+    @Inline
+    public static <T1, T2> int binarySearch(T2[] in, int from, int to, T1 find, AsymmetricComparator<T1, T2> comparator, Search op)
+    {
+        int found = -1;
+        while (from < to)
+        {
+            // unsigned shift keeps the midpoint correct even if from + to overflows
+            int i = (from + to) >>> 1;
+            int c = comparator.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+            }
+            else if (c > 0)
+            {
+                from = i + 1;
+            }
+            else
+            {
+                switch (op)
+                {
+                    default: throw new IllegalStateException();
+                    case FAST:
+                        return i;
+
+                    case CEIL:
+                        // remember the match and keep looking to its left
+                        to = found = i;
+                        break;
+
+                    case FLOOR:
+                        // remember the match and keep looking to its right
+                        found = i;
+                        from = i + 1;
+                }
+            }
+        }
+        return found >= 0 ? found : -1 - to;
+    }
+
+    /**
+     * Convenience overload of {@code findNextIntersectionWithOverlaps} for the case where the elements of
+     * {@code bs} are {@link Comparable} to the elements of {@code as}; both comparators are derived from
+     * {@code T2.compareTo(T1)}.
+     */
+    public static <T1, T2 extends Comparable<T1>> long findNextIntersectionWithOverlaps(T1[] as, int ai, T2[] bs, int bi)
+    {
+        // Reverse the comparison with Integer.compare(0, x) rather than -x: compareTo may legally return
+        // Integer.MIN_VALUE, whose negation is still negative and would report the wrong ordering.
+        return findNextIntersectionWithOverlaps(as, ai, bs, bi, (a, b) -> Integer.compare(0, b.compareTo(a)), Comparable::compareTo);
+    }
+
+    public static <T1, T2> long findNextIntersectionWithOverlaps(T1[] as, int 
ai, T2[] bs, int bi, AsymmetricComparator<T1, T2> cmp1, 
AsymmetricComparator<T2, T1> cmp2)
+    {
+        return findNextIntersection(as, ai, bs, bi, cmp1, cmp2, Search.CEIL);
+    }
+
+    public static <T extends Comparable<? super T>> long 
findNextIntersection(T[] as, int ai, T[] bs, int bi)
+    {
+        return findNextIntersection(as, ai, bs, bi, Comparable::compareTo);
+    }
+
+    public static <T> long findNextIntersection(T[] as, int ai, T[] bs, int 
bi, AsymmetricComparator<T, T> comparator)
+    {
+        return findNextIntersection(as, ai, bs, bi, comparator, comparator, 
Search.FAST);
+    }
+
+    /**
+     * Given two sorted arrays, find the next index in each array containing an equal item.
+     *
+     * Works with CEIL or FAST; FAST to be used if precisely one match for each item in either list, CEIL if one item
+     * in either list may be matched to multiple in the other list.
+     *
+     * The search alternates: look for {@code as[ai]} in {@code bs} from {@code bi}; if absent, move {@code bi} to
+     * the position the search reported and look for {@code bs[bi]} in {@code as} from {@code ai}; repeat until a
+     * match is found or either array is exhausted.
+     *
+     * @return -1 if there is no further intersection; otherwise a long with the index into {@code as} in the
+     *         upper 32 bits and the index into {@code bs} in the lower 32 bits
+     */
+    private static <T1, T2> long findNextIntersection(T1[] as, int ai, T2[] bs, int bi, AsymmetricComparator<T1, T2> cmp1, AsymmetricComparator<T2, T1> cmp2, Search op)
+    {
+        if (ai == as.length)
+            return -1;
+
+        while (true)
+        {
+            bi = SortedArrays.exponentialSearch(bs, bi, bs.length, as[ai], cmp1, op);
+            if (bi >= 0)
+                break;
+
+            // not found: decode the negative result back into a position within bs
+            bi = -1 - bi;
+            if (bi == bs.length)
+                return -1;
+
+            ai = SortedArrays.exponentialSearch(as, ai, as.length, bs[bi], cmp2, op);
+            if (ai >= 0)
+                break;
+
+            // not found: decode the negative result back into a position within as
+            ai = -1 - ai;
+            if (ai == as.length)
+                return -1;
+        }
+        // both indexes are non-negative here, so OR-ing bi in cannot disturb the upper half
+        return ((long)ai << 32) | bi;
+    }
+
+    public static <T extends Comparable<? super T>> int[] remapToSuperset(T[] 
src, T[] trg)
+    {
+        return remapToSuperset(src, trg, int[]::new);
+    }
+
+    public static <T extends Comparable<? super T>> int[] remapToSuperset(T[] 
src, T[] trg, IntBufferAllocator allocator)
+    {
+        return remapToSuperset(src, src.length, trg, trg.length, allocator);
+    }
+
+    /**
+     * Given two portions of sorted arrays with unique elements, where the one 
is a subset of the other,
+     * return an int[] with its initial {@code srcLength} elements populated, 
with the index within {@code trg}
+     * of the corresponding element within {@code src}, or -1 otherwise.
+     *
+     * That is, {@code result[i] == -1 || src[i].equals(trg[result[i]])}
+     */
+    @Nullable
+    public static <T extends Comparable<? super T>> int[] remapToSuperset(T[] 
src, int srcLength, T[] trg, int trgLength,
+                                                                          
IntBufferAllocator allocator)
+    {
+        if (src == trg || trgLength == srcLength)
+            return null;
+
+        int[] result = allocator.allocateInts(srcLength);
+
+        int i = 0, j = 0;
+        while (i < srcLength && j < trgLength)
+        {
+            if (src[i] != trg[j] && !src[i].equals(trg[j]))
+            {
+                j = SortedArrays.exponentialSearch(trg, j, trgLength, src[i]);
+                if (j < 0)
+                {
+                    if (i > 0 && src[i] == src[i-1])
+                        throw new AssertionError("Unexpected value in source: 
" + src[i] + " at index " + i + " duplicates index " + (i - 1));
+                    throw new AssertionError("Unexpected value in source: " + 
src[i] + " at index " + i + " does not exist in target array");
+                }
+            }
+            result[i++] = j++;
+        }
+        Arrays.fill(result, i, srcLength, -1);

Review Comment:
   Removed the -1 in commit 4e2c60c219f34e261cea1abe256775f67bb205c2.



##########
accord-core/src/main/java/accord/primitives/KeyRanges.java:
##########
@@ -0,0 +1,457 @@
+package accord.primitives;
+
+import accord.api.Key;
+import accord.utils.SortedArrays;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+import java.util.*;
+
+public class KeyRanges implements Iterable<KeyRange>
+{
+    public static final KeyRanges EMPTY = ofSortedAndDeoverlappedUnchecked(new 
KeyRange[0]);
+
+    // TODO: fix raw parameterized use
+    final KeyRange[] ranges;
+
+    private KeyRanges(KeyRange[] ranges)
+    {
+        Preconditions.checkNotNull(ranges);
+        this.ranges = ranges;
+    }
+
+    public KeyRanges(List<KeyRange> ranges)
+    {
+        this(ranges.toArray(KeyRange[]::new));
+    }
+
+    @Override
+    public String toString()
+    {
+        return Arrays.toString(ranges);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KeyRanges ranges1 = (KeyRanges) o;
+        return Arrays.equals(ranges, ranges1.ranges);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(ranges);
+    }
+
+    @Override
+    public Iterator<KeyRange> iterator()
+    {
+        return Iterators.forArray(ranges);
+    }
+
+    public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
+    {
+        return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
+                                   (r, k) -> -((KeyRange) r).compareKey((Key) 
k));
+    }
+
+    public int rangeIndexForKey(Key key)
+    {
+        return rangeIndexForKey(0, ranges.length, key);
+    }
+
+    public boolean contains(Key key)
+    {
+        return rangeIndexForKey(key) >= 0;
+    }
+
+    public boolean containsAll(Keys keys)
+    {
+        return keys.rangeFoldl(this, (from, to, p, v) -> v + (to - from), 0, 
0, 0) == keys.size();
+    }
+
+    public int size()
+    {
+        return ranges.length;
+    }
+
+    public KeyRange get(int i)
+    {
+        return ranges[i];
+    }
+
+    public boolean isEmpty()
+    {
+        return size() == 0;
+    }
+
+    public KeyRanges select(int[] indexes)
+    {
+        KeyRange[] selection = new KeyRange[indexes.length];
+        for (int i=0; i<indexes.length; i++)
+            selection[i] = ranges[indexes[i]];
+        return new KeyRanges(selection);
+    }
+
+    public boolean intersects(Keys keys)
+    {
+        return findNextIntersection(0, keys, 0) >= 0;
+    }
+
+    public boolean intersects(KeyRanges that)
+    {
+        return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 
0, KeyRange::compareIntersecting) >= 0;
+    }
+
+    public int findFirstKey(Keys keys)
+    {
+        return findNextKey(0, keys, 0);
+    }
+
+    public int findNextKey(int ri, Keys keys, int ki)
+    {
+        return (int) (findNextIntersection(ri, keys, ki) >> 32);
+    }
+
+    // returns ki in top 32 bits, ri in bottom, or -1 if no match found
+    public long findNextIntersection(int ri, Keys keys, int ki)
+    {
+        return SortedArrays.findNextIntersectionWithOverlaps(keys.keys, ki, 
ranges, ri);
+    }
+
+    public int findFirstKey(Key[] keys)
+    {
+        return findNextKey(0, keys, 0);
+    }
+
+    public int findNextKey(int ri, Key[] keys, int ki)
+    {
+        return (int) (findNextIntersection(ri, keys, ki) >> 32);
+    }
+
+    // returns ki in top 32 bits, ri in bottom, or -1 if no match found
+    public long findNextIntersection(int ri, Key[] keys, int ki)
+    {
+        return SortedArrays.findNextIntersectionWithOverlaps(keys, ki, ranges, 
ri);
+    }
+
+    /**
+     * Subtracts the given set of key ranges from this
+     * @param that
+     * @return
+     */
+    public KeyRanges difference(KeyRanges that)
+    {
+        if (that == this)
+            return KeyRanges.EMPTY;
+
+        List<KeyRange> result = new ArrayList<>(this.size() + that.size());
+        int thatIdx = 0;
+
+        for (int thisIdx=0; thisIdx<this.size(); thisIdx++)
+        {
+            KeyRange thisRange = this.ranges[thisIdx];
+            while (thatIdx < that.size())
+            {
+                KeyRange thatRange = that.ranges[thatIdx];
+
+                int cmp = thisRange.compareIntersecting(thatRange);
+                if (cmp > 0)
+                {
+                    thatIdx++;
+                    continue;
+                }
+                if (cmp < 0) break;
+
+                int scmp = thisRange.start().compareTo(thatRange.start());
+                int ecmp = thisRange.end().compareTo(thatRange.end());
+
+                if (scmp < 0)
+                    result.add(thisRange.subRange(thisRange.start(), 
thatRange.start()));
+
+                if (ecmp <= 0)
+                {
+                    thisRange = null;
+                    break;
+                }
+                else
+                {
+                    thisRange = thisRange.subRange(thatRange.end(), 
thisRange.end());
+                    thatIdx++;
+                }
+            }
+            if (thisRange != null)
+                result.add(thisRange);
+        }
+        return new KeyRanges(result.toArray(KeyRange[]::new));
+    }
+
+    /**
+     * attempts a linear merge where {@code as} is expected to be a superset 
of {@code bs},
+     * terminating at the first indexes where this ceases to be true
+     * @return index of {@code as} in upper 32bits, {@code bs} in lower 32bits
+     *
+     * TODO: better support for merging runs of overlapping or adjacent ranges
+     */
+    private static long supersetLinearMerge(KeyRange[] as, KeyRange[] bs)
+    {
+        int ai = 0, bi = 0;
+        out: while (ai < as.length && bi < bs.length)
+        {
+            KeyRange a = as[ai];
+            KeyRange b = bs[bi];
+
+            int c = a.compareIntersecting(b);
+            if (c < 0)
+            {
+                ai++;
+            }
+            else if (c > 0)
+            {
+                break;
+            }
+            else if (b.start().compareTo(a.start()) < 0)
+            {
+                break;
+            }
+            else if ((c = b.end().compareTo(a.end())) <= 0)
+            {
+                bi++;
+                if (c == 0) ai++;
+            }
+            else
+            {
+                // use a temporary counter, so that if we don't find a run of 
ranges that enforce the superset
+                // condition we exit at the start of the mismatch run (and 
permit it to be merged)
+                // TODO: use exponentialSearch
+                int tmpai = ai;
+                do
+                {
+                    if (++tmpai == as.length || 
!a.end().equals(as[tmpai].start()))
+                        break out;
+                    a = as[tmpai];
+                }
+                while (a.end().compareTo(b.end()) < 0);
+                bi++;
+                ai = tmpai;
+            }
+        }
+
+        return ((long)ai << 32) | bi;
+    }
+
+    /**
+     * @return true iff {@code that} is a subset of {@code this}
+     */
+    public boolean contains(KeyRanges that)
+    {
+        if (this.isEmpty()) return that.isEmpty();
+        if (that.isEmpty()) return true;
+
+        return ((int) supersetLinearMerge(this.ranges, that.ranges)) == 
that.size();
+    }
+
+    /**
+     * @return the union of {@code this} and {@code that}, returning one of 
the two inputs if possible
+     */
+    public KeyRanges union(KeyRanges that)
+    {
+        if (this == that) return this;
+        if (this.isEmpty()) return that;
+        if (that.isEmpty()) return this;
+
+        KeyRange[] as = this.ranges, bs = that.ranges;
+        {
+            // make sure as/ai represent the ranges that might fully contain 
the other
+            int c = as[0].start().compareTo(bs[0].start());
+            if (c > 0 || c == 0 && as[as.length - 
1].end().compareTo(bs[bs.length - 1].end()) < 0)
+            {
+                KeyRange[] tmp = as; as = bs; bs = tmp;
+            }
+        }
+
+        int ai, bi; {
+            long tmp = supersetLinearMerge(as, bs);
+            ai = (int)(tmp >>> 32);
+            bi = (int)tmp;
+        }
+
+        if (bi == bs.length)
+            return as == this.ranges ? this : that;
+
+        KeyRange[] result = new KeyRange[as.length + (bs.length - bi)];
+        int resultCount = copyAndMergeTouching(as, 0, result, 0, ai);
+
+        while (ai < as.length && bi < bs.length)
+        {
+            KeyRange a = as[ai];
+            KeyRange b = bs[bi];
+
+            int c = a.compareIntersecting(b);
+            if (c < 0)
+            {
+                result[resultCount++] = a;
+                ai++;
+            }
+            else if (c > 0)
+            {
+                result[resultCount++] = b;
+                bi++;
+            }
+            else
+            {
+                Key start = a.start().compareTo(b.start()) <= 0 ? a.start() : 
b.start();
+                Key end = a.end().compareTo(b.end()) >= 0 ? a.end() : b.end();
+                ai++;
+                bi++;
+                while (ai < as.length || bi < bs.length)
+                {
+                    KeyRange min;
+                    if (ai == as.length) min = bs[bi];
+                    else if (bi == bs.length) min = a = as[ai];
+                    else min = as[ai].start().compareTo(bs[bi].start()) < 0 ? 
a = as[ai] : bs[bi];
+                    if (min.start().compareTo(end) > 0)
+                        break;
+                    if (min.end().compareTo(end) > 0)
+                        end = min.end();
+                    if (a == min) ai++;
+                    else bi++;
+                }
+                result[resultCount++] = a.subRange(start, end);
+            }
+        }
+
+        while (ai < as.length)
+            result[resultCount++] = as[ai++];
+
+        while (bi < bs.length)
+            result[resultCount++] = bs[bi++];
+
+        if (resultCount < result.length)
+            result = Arrays.copyOf(result, resultCount);
+
+        return new KeyRanges(result);
+    }
+
+    public KeyRanges mergeTouching()
+    {
+        if (ranges.length == 0)
+            return this;
+
+        KeyRange[] result = new KeyRange[ranges.length];
+        int count = copyAndMergeTouching(ranges, 0, result, 0, ranges.length);
+        if (count == result.length)
+            return this;
+        result = Arrays.copyOf(result, count);
+        return new KeyRanges(result);
+    }
+
+    private static int copyAndMergeTouching(KeyRange[] src, int srcPosition, 
KeyRange[] trg, int trgPosition, int srcCount)
+    {
+        if (srcCount == 0)
+            return 0;
+
+        int count = 0;
+        KeyRange prev = src[srcPosition];
+        Key end = prev.end();
+        for (int i = 1 ; i < srcCount ; ++i)
+        {
+            KeyRange next = src[srcPosition + i];
+            if (end.equals(next.start()))
+            {
+                end = next.end();
+            }
+            else
+            {
+                trg[trgPosition + count++] = maybeUpdateEnd(prev, end);
+                prev = next;
+                end = next.end();
+            }
+        }
+        trg[trgPosition + count++] = maybeUpdateEnd(prev, end);
+        return count;
+    }
+
+    private static KeyRange maybeUpdateEnd(KeyRange range, Key withEnd)
+    {
+        return withEnd == range.end() ? range : range.subRange(range.start(), 
withEnd);
+    }
+
+    public static KeyRanges of(KeyRange ... ranges)
+    {
+        if (ranges.length == 0)
+            return EMPTY;
+
+        return sortAndDeoverlap(ranges, ranges.length);
+    }
+
+    private static KeyRanges sortAndDeoverlap(KeyRange[] ranges, int count)
+    {
+        if (count == 0)
+            return EMPTY;
+
+        if (count == 1)
+        {
+            if (ranges.length == 1)
+                return new KeyRanges(ranges);
+
+            return new KeyRanges(Arrays.copyOf(ranges, count));
+        }
+
+        Arrays.sort(ranges, 0, count, Comparator.comparing(KeyRange::start));
+        KeyRange prev = ranges[0];
+        int removed = 0;
+        for (int i = 1 ; i < count ; ++i)
+        {
+            KeyRange next = ranges[i];
+            if (prev.end().compareTo(next.start()) > 0)
+            {
+                prev = prev.subRange(prev.start(), next.start());
+                if (prev.end().compareTo(next.end()) >= 0)
+                {
+                    removed++;
+                }
+                else if (removed > 0)
+                {
+                    ranges[i - removed] = prev = next.subRange(prev.end(), 
next.end());
+                }
+            }
+            else if (removed > 0)
+            {
+                ranges[i - removed] = prev = next;
+            }
+        }
+
+        count -= removed;
+        if (count != ranges.length)
+            ranges = Arrays.copyOf(ranges, count);
+
+        return new KeyRanges(ranges);
+    }
+
+    public static KeyRanges ofSortedAndDeoverlapped(KeyRange ... ranges)
+    {
+        for (int i = 1 ; i < ranges.length ; ++i)
+        {
+            if (ranges[i - 1].end().compareTo(ranges[i].start()) > 0)

Review Comment:
   ```
   [0, 10],
   [10, 20]
   ```
   
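   When both `[i - 1].end` and `[i].start` are inclusive, there is an 
overlap: the two ranges share that key (10 in the example above), but 
`compareTo` returns 0 there, so the `> 0` check does not catch it.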



##########
accord-core/src/test/java/accord/utils/SortedArraysTest.java:
##########
@@ -0,0 +1,335 @@
+package accord.utils;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static accord.utils.Property.qt;
+
+class SortedArraysTest
+{
+//    @Test
+//    public void testRemapper()

Review Comment:
   Adding this test back in 521431ca9150816e70c42558b70bfc4006d65419



##########
accord-core/src/main/java/accord/utils/SortedArrays.java:
##########
@@ -0,0 +1,628 @@
+package accord.utils;
+
+import java.util.Arrays;
+import java.util.function.IntFunction;
+
+import accord.utils.ArrayBuffers.BufferManager;
+import accord.utils.ArrayBuffers.IntBufferAllocator;
+import org.apache.cassandra.utils.concurrent.Inline;
+
+import javax.annotation.Nullable;
+
+import static accord.utils.ArrayBuffers.noCaching;
+
+// TODO (now): javadoc
+public class SortedArrays
+{
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
T[] right, IntFunction<T[]> allocator)
+    {
+        return linearUnion(left, right, allocator, noCaching());
+    }
+
+    /**
+     * Given two sorted arrays, return an array containing the elements 
present in either, preferentially returning one
+     * of the inputs if it contains all elements of the other.
+     *
+     * TODO: introduce exponential search optimised version
+     */
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
T[] right, IntFunction<T[]> allocator, BufferManager buffers)
+    {
+        return linearUnion(left, left.length, right, right.length, allocator, 
buffers);
+    }
+    
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, 
int leftLength, T[] right, int rightLength, IntFunction<T[]> allocator, 
BufferManager buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first, pick the superset candidate and merge the two until we find 
the first missing item
+        // if none found, return the superset candidate
+        if (leftLength >= rightLength)
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    resultSize = leftIdx;
+                    result = buffers.get(allocator, resultSize + (leftLength - 
leftIdx) + (rightLength - (rightIdx - 1)));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    result[resultSize++] = right[rightIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (rightIdx == rightLength) // all elements matched, so can 
return the other array
+                    return buffers.completeWithExisting(left, leftLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(allocator, leftLength + (rightLength - 
rightIdx));
+                resultSize = leftIdx;
+                System.arraycopy(left, 0, result, 0, resultSize);
+            }
+        }
+        else
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    resultSize = rightIdx;
+                    result = buffers.get(allocator, resultSize + (leftLength - 
(leftIdx - 1)) + (rightLength - rightIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    result[resultSize++] = left[leftIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (leftIdx == leftLength) // all elements matched, so can 
return the other array
+                    return buffers.completeWithExisting(right, rightLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(allocator, rightLength + (leftLength - 
leftIdx));
+                resultSize = rightIdx;
+                System.arraycopy(right, 0, result, 0, resultSize);
+            }
+        }
+
+        try
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                T minKey;
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    minKey = leftKey;
+                }
+                else if (cmp < 0)
+                {
+                    leftIdx++;
+                    minKey = leftKey;
+                }
+                else
+                {
+                    rightIdx++;
+                    minKey = rightKey;
+                }
+                result[resultSize++] = minKey;
+            }
+
+            while (leftIdx < leftLength)
+                result[resultSize++] = left[leftIdx++];
+
+            while (rightIdx < rightLength)
+                result[resultSize++] = right[rightIdx++];
+
+            return buffers.complete(allocator, result, resultSize);
+        }
+        finally
+        {
+            buffers.discard(result);
+        }
+    }
+
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] 
left, T[] right, IntFunction<T[]> allocator)
+    {
+        return linearIntersection(left, right, allocator, noCaching());
+    }
+
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] 
left, T[] right, IntFunction<T[]> allocator, BufferManager buffers)
+    {
+        return linearIntersection(left, left.length, right, right.length, 
allocator, buffers);
+    }
+
+    /**
+     * Given two sorted arrays, return the elements present only in both, 
preferentially returning one of the inputs if
+     * it contains no elements not present in the other.
+     *
+     * TODO: introduce exponential search optimised version
+     */
+    @SuppressWarnings("unused") // was used until recently, might be used 
again?
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] 
left, int leftLength, T[] right, int rightLength, IntFunction<T[]> allocator, 
BufferManager buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first pick a subset candidate, and merge both until we encounter an 
element not present in the other array
+        if (leftLength <= rightLength)
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    resultSize = leftIdx++;
+                    result = buffers.get(allocator, resultSize + 
Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            if (result == null)
+                return hasMatch ? buffers.completeWithExisting(left, 
leftLength) : buffers.complete(allocator, allocator.apply(0), 0);
+        }
+        else
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    resultSize = rightIdx++;
+                    result = buffers.get(allocator, resultSize + 
Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            if (result == null)
+                return hasMatch ? buffers.completeWithExisting(right, 
rightLength) : buffers.complete(allocator, allocator.apply(0), 0);
+        }
+
+        try
+        {
+
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : 
leftKey.compareTo(rightKey);
+
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    result[resultSize++] = leftKey;
+                }
+                else if (cmp < 0) leftIdx++;
+                else rightIdx++;
+            }
+
+            return buffers.complete(allocator, result, resultSize);
+        }
+        finally
+        {
+            buffers.discard(result);
+        }
+    }
+
+    /**
+     * Given two sorted arrays, return the elements present only in the first, 
preferentially returning the first array
+     * itself if possible
+     */
+    @SuppressWarnings("unused") // was used until recently, might be used 
again?
+    public static <T extends Comparable<? super T>> T[] linearDifference(T[] 
left, T[] right, IntFunction<T[]> allocate)
+    {
+        int rightIdx = 0;
+        int leftIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp == 0)
+            {
+                resultSize = leftIdx++;
+                ++rightIdx;
+                result = allocate.apply(resultSize + left.length - leftIdx);
+                System.arraycopy(left, 0, result, 0, resultSize);
+                break;
+            }
+            else if (cmp < 0)
+            {
+                ++leftIdx;
+            }
+            else
+            {
+                ++rightIdx;
+            }
+        }
+
+        if (result == null)
+            return left;
+
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp > 0)
+            {
+                result[resultSize++] = left[leftIdx++];
+            }
+            else if (cmp < 0)
+            {
+                ++rightIdx;
+            }
+            else
+            {
+                ++leftIdx;
+                ++rightIdx;
+            }
+        }
+        while (leftIdx < left.length)
+            result[resultSize++] = left[leftIdx++];
+
+        if (resultSize < result.length)
+            result = Arrays.copyOf(result, resultSize);
+
+        return result;
+    }
+
+    public static <A, R> A[] sliceWithMultipleMatches(A[] slice, R[] select, 
IntFunction<A[]> factory, AsymmetricComparator<A, R> cmp1, 
AsymmetricComparator<R, A> cmp2)
+    {
+        A[] result;
+        int resultCount;
+        int ai = 0, ri = 0;
+        while (true)
+        {
+            long ari = findNextIntersection(slice, ai, select, ri, cmp1, cmp2, 
Search.CEIL);
+            if (ari < 0)
+            {
+                if (ai == slice.length)
+                    return slice; // all elements of slice were found in 
select, so can return the array unchanged
+
+                // The first (ai - 1) elements are present (without a gap), so 
copy just that subset
+                return Arrays.copyOf(slice, ai);
+            }
+
+            int nextai = (int)(ari >>> 32);
+            if (ai != nextai)
+            {
+                // A gap is detected in slice!
+                // When ai == nextai we "consume" it and move to the last 
instance of slice[ai], then choose the next element,
+                // this means that ai currently points to an element in slice 
where it is not known if its present in select,
+                // so != implies a gap is detected!
+                resultCount = ai;
+                result = factory.apply(ai + (slice.length - nextai));
+                System.arraycopy(slice, 0, result, 0, resultCount);
+                ai = nextai;
+                ri = (int)ari;
+                break;
+            }
+
+            ri = (int)ari;
+            // In cases where duplicates are present in slice, find the last 
instance of slice[ai], and move past it.
+            // slice[ai] is known to be present, so need to check the next 
element.
+            ai = exponentialSearch(slice, nextai, slice.length, select[ri], 
cmp2, Search.FLOOR) + 1;
+        }
+
+        while (true)
+        {
+            // Find the next element after the last element matching 
select[ri] and copy from slice into result
+            // nextai may be negative (such as -1), so the +1 may keep it 
negative OR set 0, since 0 < 0 is false
+            // it is safe to avoid checking for negative values
+            int nextai = exponentialSearch(slice, ai, slice.length, 
select[ri], cmp2, Search.FLOOR) + 1;
+            while (ai < nextai)
+                result[resultCount++] = slice[ai++];
+
+            long ari = findNextIntersection(slice, ai, select, ri, cmp1, cmp2, 
Search.CEIL);
+            if (ari < 0)
+            {
+                if (resultCount < result.length)
+                    result = Arrays.copyOf(result, resultCount);
+
+                return result;
+            }
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+        }
+    }
+
+    /**
+     * Copy-on-write insert into the provided array; returns the same array if 
item already present, or a new array
+     * with the item in the correct position if not. Linear time complexity.
+     */
+    public static <T extends Comparable<? super T>> T[] insert(T[] src, T 
item, IntFunction<T[]> factory)
+    {
+        int insertPos = Arrays.binarySearch(src, item);
+        if (insertPos >= 0)
+            return src;
+        insertPos = -1 - insertPos;
+
+        T[] trg = factory.apply(src.length + 1);
+        System.arraycopy(src, 0, trg, 0, insertPos);
+        trg[insertPos] = item;
+        System.arraycopy(src, insertPos, trg, insertPos + 1, src.length - 
insertPos);
+        return trg;
+    }
+
+    /**
+     * Equivalent to {@link Arrays#binarySearch}, only more efficient 
algorithmically for linear merges.
+     * Binary search has worst case complexity {@code O(n.lg n)} for a linear 
merge, whereas exponential search
+     * has a worst case of {@code O(n)}. However compared to a simple linear 
merge, the best case for exponential
+     * search is {@code O(lg(n))} instead of {@code O(n)}.
+     */
+    public static <T1, T2 extends Comparable<? super T1>> int 
exponentialSearch(T1[] in, int from, int to, T2 find)
+    {
+        return exponentialSearch(in, from, to, find, Comparable::compareTo, 
Search.FAST);
+    }
+
+    // TODO: check inlining elides this
+    public enum Search { FLOOR, CEIL, FAST }
+
+    @Inline
+    public static <T1, T2> int exponentialSearch(T2[] in, int from, int to, T1 
find, AsymmetricComparator<T1, T2> comparator, Search op)
+    {
+        int step = 0;
+        loop: while (from + step < to)
+        {
+            int i = from + step;
+            int c = comparator.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+                break;
+            }
+            if (c > 0)
+            {
+                from = i + 1;
+            }
+            else
+            {
+                switch (op)
+                {
+                    case FAST:
+                        return i;
+
+                    case CEIL:
+                        if (step == 0)
+                            return from;
+                        to = i + 1; // could in theory avoid one extra 
comparison in this case, but would uglify things
+                        break loop;
+
+                    case FLOOR:
+                        from = i;
+                }
+            }
+            step = step * 2 + 1; // jump in perfect binary search increments
+        }
+        return binarySearch(in, from, to, find, comparator, op);
+    }
+
+    @Inline
+    public static int exponentialSearch(int[] in, int from, int to, int find)
+    {
+        int step = 0;
+        while (from + step < to)
+        {
+            int i = from + step;
+            int c = Integer.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+                break;
+            }
+            if (c > 0)
+            {
+                from = i + 1;
+            }
+            else
+            {
+                return i;
+            }
+            step = step * 2 + 1; // jump in perfect binary search increments
+        }
+        return Arrays.binarySearch(in, from, to, find);
+    }
+
+    @Inline
+    public static <T1, T2> int binarySearch(T2[] in, int from, int to, T1 
find, AsymmetricComparator<T1, T2> comparator, Search op)
+    {
+        int found = -1;
+        while (from < to)
+        {
+            int i = (from + to) >>> 1;
+            int c = comparator.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+            }
+            else if (c > 0)
+            {
+                from = i + 1;
+            }
+            else
+            {
+                switch (op)
+                {
+                    default: throw new IllegalStateException();
+                    case FAST:
+                        return i;
+
+                    case CEIL:
+                        to = found = i;
+                        break;
+
+                    case FLOOR:
+                        found = i;
+                        from = i + 1;
+                }
+            }
+        }
+        return found >= 0 ? found : -1 - to;
+    }
+
+    /**
+     * Finds the next pair of indexes at which {@code as} and {@code bs} hold
+     * items that compare equal, where the elements of {@code bs} know how to
+     * compare themselves with the elements of {@code as}.
+     *
+     * Delegates to the comparator-based overload, deriving both comparison
+     * directions from {@code T2.compareTo(T1)}.
+     *
+     * @param as first sorted array
+     * @param ai index in {@code as} to start from
+     * @param bs second sorted array
+     * @param bi index in {@code bs} to start from
+     * @return the result of the comparator-based overload
+     */
+    public static <T1, T2 extends Comparable<T1>> long
+    findNextIntersectionWithOverlaps(T1[] as, int ai, T2[] bs, int bi)
+    {
+        // Only T2 can compare itself with T1, so the (T1, T2) direction is
+        // the reverse of b.compareTo(a). Reverse the sign with
+        // Integer.compare rather than unary minus: -Integer.MIN_VALUE
+        // overflows and stays negative, which would report the wrong order.
+        return findNextIntersectionWithOverlaps(as, ai, bs, bi,
+                                                (a, b) -> Integer.compare(0, b.compareTo(a)),
+                                                Comparable::compareTo);
+    }
+
+    public static <T1, T2> long findNextIntersectionWithOverlaps(T1[] as, int 
ai, T2[] bs, int bi, AsymmetricComparator<T1, T2> cmp1, 
AsymmetricComparator<T2, T1> cmp2)
+    {
+        return findNextIntersection(as, ai, bs, bi, cmp1, cmp2, Search.CEIL);
+    }
+
+    /**
+     * Finds the next pair of indexes at which two sorted arrays of mutually
+     * comparable items hold equal items, comparing with
+     * {@link Comparable#compareTo}.
+     *
+     * @param as first sorted array
+     * @param ai index in {@code as} to start from
+     * @param bs second sorted array
+     * @param bi index in {@code bs} to start from
+     * @return the result of the comparator-based overload
+     */
+    public static <T extends Comparable<? super T>> long
+    findNextIntersection(T[] as, int ai, T[] bs, int bi)
+    {
+        final AsymmetricComparator<T, T> naturalOrder = Comparable::compareTo;
+        return findNextIntersection(as, ai, bs, bi, naturalOrder);
+    }
+
+    public static <T> long findNextIntersection(T[] as, int ai, T[] bs, int 
bi, AsymmetricComparator<T, T> comparator)
+    {
+        return findNextIntersection(as, ai, bs, bi, comparator, comparator, 
Search.FAST);
+    }
+
+    /**
+     * Given two sorted arrays, find the next index in each array containing
+     * an equal item.
+     *
+     * Works with CEIL or FAST: use FAST if there is precisely one match for
+     * each item in either list, and CEIL if one item in either list may be
+     * matched to multiple items in the other list.
+     *
+     * The search alternates between the arrays: it looks up {@code as[ai]}
+     * in {@code bs}, and on a miss looks up the element of {@code bs} at the
+     * returned insertion position in {@code as}, repeating until a match is
+     * found or either array is exhausted.
+     *
+     * @return {@code -1} if either array is exhausted before a match is
+     *         found, otherwise the index into {@code as} in the upper 32
+     *         bits and the index into {@code bs} in the lower 32 bits
+     */
+    private static <T1, T2> long findNextIntersection(T1[] as, int ai,
+                                                      T2[] bs, int bi,
+                                                      AsymmetricComparator<T1, T2> cmp1,
+                                                      AsymmetricComparator<T2, T1> cmp2,
+                                                      Search op)
+    {
+        if (ai == as.length)
+            return -1;
+
+        for (;;)
+        {
+            // where is (or would be) as[ai] within the remainder of bs?
+            int hit = SortedArrays.exponentialSearch(bs, bi, bs.length, as[ai], cmp1, op);
+            if (hit >= 0)
+            {
+                bi = hit;
+                break;
+            }
+            bi = -1 - hit;
+            if (bi == bs.length)
+                return -1;
+
+            // no match; try the other direction with the new bs[bi]
+            hit = SortedArrays.exponentialSearch(as, ai, as.length, bs[bi], cmp2, op);
+            if (hit >= 0)
+            {
+                ai = hit;
+                break;
+            }
+            ai = -1 - hit;
+            if (ai == as.length)
+                return -1;
+        }
+        return ((long) ai << 32) | bi;
+    }
+
+    /**
+     * Maps each element of {@code src} to its index in {@code trg},
+     * allocating the result as a plain {@code int[]}.
+     *
+     * @param src sorted array whose elements are to be located
+     * @param trg sorted array to locate them in
+     * @return the result of the allocator-based overload
+     */
+    public static <T extends Comparable<? super T>> int[]
+    remapToSuperset(T[] src, T[] trg)
+    {
+        return remapToSuperset(src, trg, length -> new int[length]);
+    }
+
+    /**
+     * Maps each element of {@code src} to its index in {@code trg},
+     * considering both arrays in full and obtaining the result array from
+     * {@code allocator}.
+     *
+     * @param src       sorted array whose elements are to be located
+     * @param trg       sorted array to locate them in
+     * @param allocator supplies the {@code int[]} used for the result
+     * @return the result of the length-bounded overload
+     */
+    public static <T extends Comparable<? super T>> int[]
+    remapToSuperset(T[] src, T[] trg, IntBufferAllocator allocator)
+    {
+        final int srcLength = src.length;
+        final int trgLength = trg.length;
+        return remapToSuperset(src, srcLength, trg, trgLength, allocator);
+    }
+
+    /**
+     * Given two portions of sorted arrays with unique elements, where the one 
is a subset of the other,
+     * return an int[] with its initial {@code srcLength} elements populated, 
with the index within {@code trg}
+     * of the corresponding element within {@code src}, or -1 otherwise.
+     *
+     * That is, {@code result[i] == -1 || src[i].equals(trg[result[i]])}
+     */
+    @Nullable
+    public static <T extends Comparable<? super T>> int[] remapToSuperset(T[] 
src, int srcLength, T[] trg, int trgLength,
+                                                                          
IntBufferAllocator allocator)
+    {
+        if (src == trg || trgLength == srcLength)
+            return null;
+
+        int[] result = allocator.allocateInts(srcLength);
+
+        int i = 0, j = 0;
+        while (i < srcLength && j < trgLength)
+        {
+            if (src[i] != trg[j] && !src[i].equals(trg[j]))
+            {
+                j = SortedArrays.exponentialSearch(trg, j, trgLength, src[i]);
+                if (j < 0)
+                {
+                    if (i > 0 && src[i] == src[i-1])
+                        throw new AssertionError("Unexpected value in source: 
" + src[i] + " at index " + i + " duplicates index " + (i - 1));
+                    throw new AssertionError("Unexpected value in source: " + 
src[i] + " at index " + i + " does not exist in target array");
+                }
+            }
+            result[i++] = j++;
+        }
+        Arrays.fill(result, i, srcLength, -1);

Review Comment:
   When target is a superset of src, src cannot contain an element that is not
found in target; otherwise we have proved that target isn't a superset. In the
previous logic I rejected this case when superset=true, but with this rewrite
we allow it again. Now that the doc and name make clear that only supersets are
allowed, shouldn't we fail rather than fill in `-1`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to