dcapwell commented on code in PR #6:
URL: https://github.com/apache/cassandra-accord/pull/6#discussion_r939438483
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,978 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.utils.InlineHeap;
+import accord.utils.SortedArrays;
+
+import static accord.utils.SortedArrays.remap;
+import static accord.utils.SortedArrays.remapper;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+ private static final TxnId[] NO_TXNIDS = new TxnId[0];
+ private static final int[] NO_INTS = new int[0];
+ public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+    /**
+     * Mutable accumulator of (key, TxnId) dependency pairs over a fixed set of keys;
+     * {@link #build()} packs the collected pairs into a {@link Deps}.
+     */
+    public static class Builder
+    {
+        final Keys keys;
+        final Map<TxnId, Integer> txnIdLookup = new HashMap<>(); // TODO: primitive map
+        // NOTE(review): this array is grown by ensureTxnIdx but never populated or read within this class
+        TxnId[] txnIds = new TxnId[4];
+        // per key: the lookup indexes (values of txnIdLookup) of the TxnId added for that key
+        final int[][] keysToTxnId;
+        // per key: how many slots of keysToTxnId[key] are in use
+        final int[] keysToTxnIdCounts;
+
+        public Builder(Keys keys)
+        {
+            this.keys = keys;
+            this.keysToTxnId = new int[keys.size()][4];
+            this.keysToTxnIdCounts = new int[keys.size()];
+        }
+
+        /** @return true if no (key, TxnId) pair has been recorded for any key */
+        public boolean isEmpty()
+        {
+            return Arrays.stream(keysToTxnIdCounts).allMatch(i -> i == 0);
+        }
+
+        /**
+         * Records the command's TxnId against the keys visited by
+         * {@code keys.foldlIntersect(command.txn().keys, ...)}
+         * (presumably the keys common to this builder and the command - confirm against Keys).
+         */
+        public void add(Command command)
+        {
+            int idx = ensureTxnIdx(command.txnId());
+            keys.foldlIntersect(command.txn().keys, (li, ri, k, p, v) -> {
+                append(li, idx);
+                return 0;
+            }, 0, 0, 1);
+        }
+
+        /**
+         * Records a single (key, txnId) pair.
+         *
+         * @throws IllegalArgumentException if {@code key} is not one of the keys this builder was created with;
+         *         the builder is left unmodified in that case
+         */
+        public void add(Key key, TxnId txnId)
+        {
+            // resolve the key first so that a bad key neither corrupts state nor surfaces as an opaque
+            // ArrayIndexOutOfBoundsException (indexOf is a binary search and is negative when absent)
+            int keyIdx = keys.indexOf(key);
+            if (keyIdx < 0)
+                throw new IllegalArgumentException("Key " + key + " is not present in the keys of this builder");
+            append(keyIdx, ensureTxnIdx(txnId));
+        }
+
+        /** Appends txnIdx to the list for keyIdx, doubling the backing array when it is full. */
+        private void append(int keyIdx, int txnIdx)
+        {
+            int count = keysToTxnIdCounts[keyIdx];
+            if (count == keysToTxnId[keyIdx].length)
+                keysToTxnId[keyIdx] = Arrays.copyOf(keysToTxnId[keyIdx], Math.max(4, count * 2));
+            keysToTxnId[keyIdx][count] = txnIdx;
+            keysToTxnIdCounts[keyIdx] = count + 1;
+        }
+
+        /** @return true if the TxnId has been registered by a previous add */
+        public boolean contains(TxnId txnId)
+        {
+            return txnIdx(txnId) >= 0;
+        }
+
+        private int txnIdx(TxnId txnId)
+        {
+            return txnIdLookup.getOrDefault(txnId, -1);
+        }
+
+        /** Returns the lookup index of txnId, assigning the next free index (the current map size) if it is new. */
+        private int ensureTxnIdx(TxnId txnId)
+        {
+            return txnIdLookup.computeIfAbsent(txnId, ignore -> {
+                if (txnIds.length == txnIdLookup.size())
+                    txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+                return txnIdLookup.size();
+            });
+        }
+
+        /**
+         * Packs the accumulated pairs into a {@link Deps}: TxnId are sorted, each key's list is translated to
+         * sorted-TxnId indexes, sorted and de-duplicated, and keys with no TxnId are dropped.
+         */
+        public Deps build()
+        {
+            TxnId[] txnIds = txnIdLookup.keySet().toArray(TxnId[]::new);
+            Arrays.sort(txnIds, TxnId::compareTo);
+            // txnIdMap translates an insertion-order lookup index into the position in the sorted array
+            int[] txnIdMap = new int[txnIds.length];
+            for (int i = 0 ; i < txnIdMap.length ; i++)
+                txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+            int keyCount = 0;
+            int[] result; {
+                int count = 0;
+                for (int i = 0 ; i < keys.size() ; ++i)
+                {
+                    keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+                    count += keysToTxnIdCounts[i];
+                }
+                // one end-offset slot per non-empty key, followed by the values
+                result = new int[count + keyCount];
+            }
+
+            int keyIndex = 0;
+            int offset = keyCount;
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                if (keysToTxnIdCounts[i] > 0)
+                {
+                    int count = keysToTxnIdCounts[i];
+                    int[] src = keysToTxnId[i];
+                    for (int j = 0 ; j < count ; ++j)
+                        result[j + offset] = txnIdMap[src[j]];
+                    Arrays.sort(result, offset, count + offset);
+                    // compact adjacent duplicates within this key's sorted run
+                    int dups = 0;
+                    for (int j = offset + 1 ; j < offset + count ; ++j)
+                    {
+                        if (result[j] == result[j - 1]) ++dups;
+                        else if (dups > 0) result[j - dups] = result[j];
+                    }
+                    // store the end offset of this key's run in its header slot
+                    result[keyIndex] = offset += count - dups;
+                    ++keyIndex;
+                }
+            }
+            // duplicates removed above leave slack at the end
+            if (offset < result.length)
+                result = Arrays.copyOf(result, offset);
+
+            Keys keys = this.keys;
+            if (keyCount < keys.size())
+            {
+                keyIndex = 0;
+                Key[] newKeys = new Key[keyCount];
+                for (int i = 0 ; i < keys.size() ; ++i)
+                {
+                    if (keysToTxnIdCounts[i] > 0)
+                        newKeys[keyIndex++] = keys.get(i);
+                }
+                keys = new Keys(newKeys);
+            }
+
+            return new Deps(keys, txnIds, result);
+        }
+    }
+
+    /** Creates a {@link Builder} that collects dependencies for the supplied keys. */
+    public static Builder builder(Keys keys)
+    {
+        Builder builder = new Builder(keys);
+        return builder;
+    }
+
+ // Cursor over one input Deps used by merge(): tracks the current key and the current position
+ // within that input's keyToTxnId array.
+ static class MergeStream
+ {
+ final Deps source;
+ // TODO: could share backing array for all of these if we want, with
an additional offset
+ // the source's keyToTxnId array
+ final int[] input;
+ // number of keys in the source
+ final int keyCount;
+ int[] remap; // TODO: use cached backing array
+ int[] keys; // TODO: use cached backing array
+ // index of the key currently being consumed
+ int keyIndex;
+ // position of the next value of input to consume
+ int index;
+ // end (exclusive) of the values belonging to the current key
+ int endIndex;
+
+ MergeStream(Deps source)
+ {
+ this.source = source;
+ this.input = source.keyToTxnId;
+ this.keyCount = source.keys.size();
+ }
+
+ // Builds the remappers from the source's txnIds/keys onto the merged txnIds/keys and positions the
+ // cursor on the first key whose end offset differs from keyCount, i.e. the first key with any values.
+ private void init(Keys keys, TxnId[] txnIds)
+ {
+ this.remap = remapper(source.txnIds, txnIds, true);
+ this.keys = source.keys.remapper(keys, true);
+ while (input[keyIndex] == keyCount)
+ ++keyIndex;
+ this.index = keyCount;
+ this.endIndex = input[keyIndex];
+ }
+ }
+
+ /**
+ * Merges the Deps obtained by applying getter to each element of merge.
+ * Null and empty Deps are ignored, and inputs found equal to their neighbour after sorting are dropped.
+ * With zero, one or two remaining inputs the result is NONE, that input, or the two combined via
+ * with(); note that the keys argument is not consulted on those paths.
+ * Otherwise the inputs are merged key by key using two heaps (one ordered by key, one by TxnId index).
+ */
+ public static <T> Deps merge(Keys keys, List<T> merge, Function<T, Deps>
getter)
+ {
+ // collect non-empty inputs
+ int streamCount = 0;
+ MergeStream[] streams = new MergeStream[merge.size()];
+ for (T t : merge)
+ {
+ Deps deps = getter.apply(t);
+ if (deps != null && !deps.isEmpty())
+ streams[streamCount++] = new MergeStream(deps);
+ }
+
+ {
+ // first sort by size and remove identical collections
+ Arrays.sort(streams, 0, streamCount, (a, b) -> {
+ int c = Integer.compare(b.input.length, a.input.length);
+ if (c == 0) c = Integer.compare(b.keyCount, a.keyCount);
+ if (c == 0) c = Integer.compare(b.source.txnIds.length,
a.source.txnIds.length);
+ return c;
+ });
+
+ int diff = 0;
+ for (int i = 1 ; i < streamCount ; i++)
+ {
+ if (streams[i - 1].source.equals(streams[i].source)) ++diff;
+ else if (diff > 0) streams[i - diff] = streams[i];
+ }
+ streamCount -= diff;
+ }
+
+ // trivial cases need no heap merge
+ switch (streamCount)
+ {
+ case 0: return NONE;
+ case 1: return streams[0].source;
+ case 2: return streams[0].source.with(streams[1].source);
+ }
+
+ // TODO: use Cassandra MergeIterator to perform more efficient merge
of TxnId
+ TxnId[] txnIds = NONE.txnIds;
+ for (T t : merge)
+ {
+ Deps deps = getter.apply(t);
+ if (deps != null && !deps.isEmpty())
+ txnIds = SortedArrays.linearUnion(txnIds, deps.txnIds,
TxnId[]::new);
+ }
+
+ // initialise every stream against the merged keys/txnIds and size the output buffer
+ int[] result; {
+ int maxStreamSize = 0, totalStreamSize = 0;
+ for (int streamIndex = 0 ; streamIndex < streamCount ;
++streamIndex)
+ {
+ MergeStream stream = streams[streamIndex];
+ stream.init(keys, txnIds);
+ maxStreamSize = Math.max(maxStreamSize, stream.input.length -
stream.keyCount);
+ totalStreamSize += stream.input.length - stream.keyCount;
+ }
+ result = new int[keys.size() + Math.min(maxStreamSize * 2,
totalStreamSize)]; // TODO: use cached temporary array
+ }
+
+ // values are written after the keys.size() header slots
+ int resultIndex = keys.size();
+ int[] keyHeap = InlineHeap.create(streamCount); // TODO: use cached
temporary array
+ int[] txnIdHeap = InlineHeap.create(streamCount); // TODO: use cached
temporary array
+
+ // build a heap of keys and streams, so we can merge those streams
with overlapping keys
+ int keyHeapSize = 0;
+ for (int stream = 0 ; stream < streamCount ; ++stream)
+ {
+ InlineHeap.set(keyHeap, keyHeapSize++, remap(0,
streams[stream].keys), stream);
+ }
+
+ int keyIndex = 0;
+ keyHeapSize = InlineHeap.heapify(keyHeap, keyHeapSize);
+ while (keyHeapSize > 0)
+ {
+ // while the heap is non-empty, pop the streams matching the top
key and insert them into their own heap
+
+ // if we have keys with no contents, fill in zeroes
+ while (keyIndex < InlineHeap.key(keyHeap, 0))
+ {
+ if (keyIndex == result.length)
+ result = Arrays.copyOf(result, result.length * 2);
+
+ result[keyIndex++] = resultIndex;
+ }
+
+ int txnIdHeapSize = InlineHeap.consume(keyHeap, keyHeapSize, (key,
streamIndex, size) -> {
+ MergeStream stream = streams[streamIndex];
+ InlineHeap.set(txnIdHeap, size,
remap(stream.input[stream.index], stream.remap), streamIndex);
+ return size + 1;
+ }, 0);
+
+ // several streams contribute to this key: merge their values through the txnId heap
+ if (txnIdHeapSize > 1)
+ {
+ txnIdHeapSize = InlineHeap.heapify(txnIdHeap, txnIdHeapSize);
+ do
+ {
+ if (resultIndex + txnIdHeapSize >= result.length)
+ result = Arrays.copyOf(result, Math.max((resultIndex +
txnIdHeapSize) * 2, resultIndex + (resultIndex/2)));
+
+ result[resultIndex++] = InlineHeap.key(txnIdHeap, 0);
+ InlineHeap.consume(txnIdHeap, txnIdHeapSize, (key, stream,
v) -> 0, 0);
+
+ txnIdHeapSize = InlineHeap.advance(txnIdHeap,
txnIdHeapSize, streamIndex -> {
+ MergeStream stream = streams[streamIndex];
+ int index = ++stream.index;
+ if (index == stream.endIndex)
+ return Integer.MIN_VALUE;
+ return remap(stream.input[index], stream.remap);
+ });
+ }
+ while (txnIdHeapSize > 1);
+ }
+
+ // fast path when one remaining source for this key
+ if (txnIdHeapSize > 0)
+ {
+ int streamIndex = InlineHeap.stream(txnIdHeap, 0);
+ MergeStream stream = streams[streamIndex];
+ int index = stream.index;
+ int endIndex = stream.endIndex;
+ int count = endIndex - index;
+ if (result.length < resultIndex + count)
+ result = Arrays.copyOf(result, Math.max(result.length +
(result.length / 2), resultIndex + count));
+
+ while (index < endIndex)
+ result[resultIndex++] = remap(stream.input[index++],
stream.remap);
+
+ stream.index = index;
+ stream.endIndex = stream.input[++stream.keyIndex];
+ }
+
+ // record the end offset for the key just completed, then move each consumed stream to its next key
+ result[keyIndex++] = resultIndex;
+ keyHeapSize = InlineHeap.advance(keyHeap, keyHeapSize, streamIndex
-> {
+ MergeStream stream = streams[streamIndex];
+ while (stream.index == stream.endIndex && stream.keyIndex <
stream.keyCount)
+ stream.endIndex = stream.input[++stream.keyIndex];
+ return stream.keyIndex == stream.keyCount
+ ? Integer.MIN_VALUE
+ : remap(stream.keyIndex, stream.keys);
+ });
+ }
+
+ // any trailing keys have no values: their end offset is the final write position
+ while (keyIndex < keys.size())
+ result[keyIndex++] = resultIndex;
+
+ if (resultIndex < result.length)
+ result = Arrays.copyOf(result, resultIndex);
+
+ return new Deps(keys, txnIds, result);
+ }
+
+ final Keys keys; // unique Keys
+ final TxnId[] txnIds; // unique TxnId TODO: this should perhaps be a BTree?
+
+ // first N entries are offsets for each src item, remainder are pointers
into value set (either keys or txnIds)
+ final int[] keyToTxnId; // Key -> [TxnId]
+ int[] txnIdToKey; // TxnId -> [Key]
+
+    /**
+     * Wraps the supplied arrays without copying them.
+     * Fails fast unless the last key's end offset equals the length of keyToTxnId (when there are keys),
+     * and unless checkValid() accepts the instance.
+     */
+    Deps(Keys keys, TxnId[] txnIds, int[] keyToTxnId)
+    {
+        this.keys = keys;
+        this.txnIds = txnIds;
+        this.keyToTxnId = keyToTxnId;
+
+        boolean offsetsCoverArray = keys.isEmpty() || keyToTxnId[keys.size() - 1] == keyToTxnId.length;
+        Preconditions.checkState(offsetsCoverArray);
+
+        boolean valid = checkValid();
+        if (!valid)
+            throw new AssertionError();
+    }
+
+    /**
+     * Restricts these dependencies to the keys selected by {@code keys.slice(ranges)}.
+     * Returns this instance when it is empty or every key is selected, and NONE when no key is selected;
+     * otherwise builds a new Deps holding only the selected keys and the TxnId they still reference.
+     */
+    public Deps slice(KeyRanges ranges)
+    {
+        if (isEmpty())
+            return this;
+
+        Keys select = keys.slice(ranges);
+        if (select.isEmpty())
+            return NONE;
+
+        final int selected = select.size();
+        final int total = keys.size();
+        if (selected == total)
+            return this;
+
+        int[] src = keyToTxnId;
+
+        // first pass: work out how large the sliced array must be
+        int size = selected;
+        int srcKey = 0;
+        for (int j = 0 ; j < selected ; ++j)
+        {
+            srcKey = keys.findNext(select.get(j), srcKey);
+            int begin = srcKey == 0 ? total : src[srcKey - 1];
+            size += src[srcKey] - begin;
+        }
+
+        // second pass: copy each selected key's run and record its new end offset
+        int[] trg = new int[size];
+        int write = selected;
+        srcKey = 0;
+        for (int j = 0 ; j < selected ; ++j)
+        {
+            srcKey = keys.findNext(select.get(j), srcKey);
+            int begin = srcKey == 0 ? total : src[srcKey - 1];
+            int length = src[srcKey] - begin;
+            System.arraycopy(src, begin, trg, write, length);
+            write += length;
+            trg[j] = write;
+        }
+
+        TxnId[] txnIds = trimUnusedTxnId(select, this.txnIds, trg);
+        return new Deps(select, txnIds, trg);
+    }
+
+ private static TxnId[] trimUnusedTxnId(Keys keys, TxnId[] txnIds, int[]
keysToTxnId)
+ {
+ int[] remapTxnId = new int[txnIds.length];
+ for (int i = keys.size() ; i < keysToTxnId.length ; ++i)
+ remapTxnId[keysToTxnId[i]] = 1;
+
+ int offset = 0;
+ for (int i = 0 ; i < remapTxnId.length ; ++i)
+ {
+ if (remapTxnId[i] == 1) remapTxnId[i] = offset++;
+ else remapTxnId[i] = -1;
+ }
+
+ TxnId[] result = txnIds;
+ if (offset < remapTxnId.length)
+ {
+ result = new TxnId[offset];
+ for (int i = 0 ; i < txnIds.length ; ++i)
+ {
+ if (remapTxnId[i]>= 0)
+ result[remapTxnId[i]] = txnIds[i];
+ }
+ for (int i = keys.size() ; i < keysToTxnId.length ; ++i)
+ keysToTxnId[i] = remapTxnId[keysToTxnId[i]];
+ }
+
+ return result;
+ }
+
+ /**
+ * Returns the union of these dependencies and those of that.
+ * If either side is empty the other is returned as-is. When one side's keys and txnIds already match
+ * the union, a scan first checks whether that side contains everything in the other; if so that side is
+ * returned unchanged, otherwise the output is seeded with the prefix scanned so far and the general
+ * merge loop continues from the same cursors.
+ */
+ public Deps with(Deps that)
+ {
+ if (isEmpty() || that.isEmpty())
+ return isEmpty() ? that : this;
+
+ Keys keys = this.keys.union(that.keys);
+ TxnId[] txnIds = SortedArrays.linearUnion(this.txnIds, that.txnIds,
TxnId[]::new);
+ int[] remapLeft = remapper(this.txnIds, txnIds, true);
+ int[] remapRight = remapper(that.txnIds, txnIds, true);
+ Keys leftKeys = this.keys, rightKeys = that.keys;
+ int[] left = keyToTxnId, right = that.keyToTxnId;
+
+ if (remapLeft == null && remapRight == null && Arrays.equals(left,
right)
+ && keys.size() == leftKeys.size() && keys.size() ==
rightKeys.size())
+ {
+ return this;
+ }
+
+ // lk/rk/ok: key cursors for left, right and output; l/r/o: value cursors for the same
+ int[] out = null;
+ int lk = 0, rk = 0, ok = 0, l = this.keys.size(), r =
that.keys.size(), o = keys.size();
+
+ // left already has the union's keys and needs no TxnId remapping: scan until right contributes
+ // something left lacks
+ if (remapLeft == null && keys == leftKeys)
+ {
+ noOp: while (lk < leftKeys.size() && rk < rightKeys.size())
+ {
+ int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+ if (ck < 0)
+ {
+ o += left[lk] - l;
+ l = left[lk];
+ assert o == l && ok == lk && left[ok] == o;
+ ok++;
+ lk++;
+ }
+ else if (ck > 0)
+ {
+ throw new IllegalStateException();
+ }
+ else
+ {
+ while (l < left[lk] && r < right[rk])
+ {
+ int nextLeft = left[l];
+ int nextRight = remap(right[r], remapRight);
+
+ if (nextLeft < nextRight)
+ {
+ o++;
+ l++;
+ }
+ else if (nextRight < nextLeft)
+ {
+ out = copy(left, o, left.length + right.length -
r);
+ break noOp;
+ }
+ else
+ {
+ o++;
+ l++;
+ r++;
+ }
+ }
+
+ if (l < left[lk])
+ {
+ o += left[lk] - l;
+ l = left[lk];
+ }
+ else if (r < right[rk])
+ {
+ out = copy(left, o, left.length + right.length - r);
+ break;
+ }
+
+ assert o == l && ok == lk && left[ok] == o;
+ ok++;
+ rk++;
+ lk++;
+ }
+ }
+
+ if (out == null)
+ return this;
+ }
+ // mirror image of the branch above, with right as the candidate result
+ else if (remapRight == null && keys == rightKeys)
+ {
+ noOp: while (lk < leftKeys.size() && rk < rightKeys.size())
+ {
+ int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+ if (ck < 0)
+ {
+ throw new IllegalStateException();
+ }
+ else if (ck > 0)
+ {
+ o += right[rk] - r;
+ r = right[rk];
+ assert o == r && ok == rk && right[ok] == o;
+ ok++;
+ rk++;
+ }
+ else
+ {
+ while (l < left[lk] && r < right[rk])
+ {
+ int nextLeft = remap(left[l], remapLeft);
+ int nextRight = right[r];
+
+ if (nextLeft < nextRight)
+ {
+ out = copy(right, o, right.length + left.length -
l);
+ break noOp;
+ }
+ else if (nextRight < nextLeft)
+ {
+ o++;
+ r++;
+ }
+ else
+ {
+ o++;
+ l++;
+ r++;
+ }
+ }
+
+ if (l < left[lk])
+ {
+ out = copy(right, o, right.length + left.length - l);
+ break;
+ }
+ else if (r < right[rk])
+ {
+ o += right[rk] - r;
+ r = right[rk];
+ }
+
+ assert o == r && ok == rk && right[ok] == o;
+ ok++;
+ rk++;
+ lk++;
+ }
+ }
+
+ if (out == null)
+ return that;
+ }
+ else
+ {
+ out = new int[left.length + right.length];
+ }
+
+ // general merge: walk both key lists in order, merging the value runs of keys present on both sides
+ while (lk < leftKeys.size() && rk < rightKeys.size())
+ {
+ int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+ if (ck < 0)
+ {
+ while (l < left[lk])
+ out[o++] = remap(left[l++], remapLeft);
+ out[ok++] = o;
+ lk++;
+ }
+ else if (ck > 0)
+ {
+ while (r < right[rk])
+ out[o++] = remap(right[r++], remapRight);
+ out[ok++] = o;
+ rk++;
+ }
+ else
+ {
+ while (l < left[lk] && r < right[rk])
+ {
+ int nextLeft = remap(left[l], remapLeft);
+ int nextRight = remap(right[r], remapRight);
+
+ if (nextLeft <= nextRight)
+ {
+ out[o++] = nextLeft;
+ l += 1;
+ r += nextLeft == nextRight ? 1 : 0;
+ }
+ else
+ {
+ out[o++] = nextRight;
+ ++r;
+ }
+ }
+
+ while (l < left[lk])
+ out[o++] = remap(left[l++], remapLeft);
+
+ while (r < right[rk])
+ out[o++] = remap(right[r++], remapRight);
+
+ out[ok++] = o;
+ rk++;
+ lk++;
+ }
+ }
+
+ // drain whichever side still has keys
+ while (lk < leftKeys.size())
+ {
+ while (l < left[lk])
+ out[o++] = remap(left[l++], remapLeft);
+ out[ok++] = o;
+ lk++;
+ }
+
+ while (rk < rightKeys.size())
+ {
+ while (r < right[rk])
+ out[o++] = remap(right[r++], remapRight);
+ out[ok++] = o;
+ rk++;
+ }
+
+ if (o < out.length)
+ out = Arrays.copyOf(out, o);
+
+ return new Deps(keys, txnIds, out);
+ }
+
+    /**
+     * Allocates an array of {@code length} ints whose first {@code to} entries are copied from src.
+     * A requested length of zero yields the shared empty array and copies nothing.
+     */
+    private static int[] copy(int[] src, int to, int length)
+    {
+        if (length != 0)
+        {
+            int[] prefix = new int[length];
+            System.arraycopy(src, 0, prefix, 0, to);
+            return prefix;
+        }
+        return NO_INTS;
+    }
+
+ // TODO: optimise for case where none removed
+    /**
+     * Returns these dependencies with every TxnId accepted by {@code remove} dropped.
+     *
+     * @param remove tested once against each TxnId; returning true removes that TxnId from every key
+     * @return this instance if it is empty or nothing was removed, NONE if everything was removed,
+     *         otherwise a new Deps over the same keys containing only the retained TxnId
+     */
+    public Deps without(Predicate<TxnId> remove)
+    {
+        if (isEmpty())
+            return this;
+
+        // remapTxnIds[i] is the new index of this.txnIds[i], or -1 if it is being removed
+        int[] remapTxnIds = new int[this.txnIds.length];
+        TxnId[] txnIds; {
+            int count = 0;
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remove.test(this.txnIds[i])) remapTxnIds[i] = -1;
+                else remapTxnIds[i] = count++;
+            }
+
+            if (count == remapTxnIds.length)
+                return this;
+
+            if (count == 0)
+                return NONE;
+
+            txnIds = new TxnId[count];
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remapTxnIds[i] >= 0)
+                    txnIds[remapTxnIds[i]] = this.txnIds[i];
+            }
+        }
+
+        int[] keyToTxnId = new int[this.keyToTxnId.length];
+        int k = 0, i = keys.size(), o = i;
+        while (i < this.keyToTxnId.length)
+        {
+            // every key whose run ends at i is complete: record its new end offset
+            while (this.keyToTxnId[k] == i)
+                keyToTxnId[k++] = o;
+
+            int remapped = remapTxnIds[this.keyToTxnId[i]];
+            // new indexes start at zero, so zero is a retained TxnId; only -1 marks a removal
+            if (remapped >= 0)
+                keyToTxnId[o++] = remapped;
+            ++i;
+        }
+
+        while (k < keys.size())
+            keyToTxnId[k++] = o;
+
+        keyToTxnId = Arrays.copyOf(keyToTxnId, o);
+
+        return new Deps(keys, txnIds, keyToTxnId);
+    }
+
+ public boolean contains(TxnId txnId)
+ {
+ return Arrays.binarySearch(txnIds, txnId) >= 0;
+ }
+
+    /**
+     * @return true iff no key is mapped to any TxnId, i.e. keyToTxnId holds only the per-key offsets
+     *         and no values; keys or txnIds that exist by themselves do not make the mapping non-empty
+     */
+    public boolean isEmpty()
+    {
+        return keys.size() == keyToTxnId.length;
+    }
+
+    /**
+     * Returns the keys recorded against the given TxnId, using the lazily built txnIdToKey index.
+     * Yields Keys.EMPTY when the TxnId is unknown or has no keys.
+     */
+    public Keys someKeys(TxnId txnId)
+    {
+        int txnIdIndex = Arrays.binarySearch(txnIds, txnId);
+        if (txnIdIndex < 0)
+            return Keys.EMPTY;
+
+        ensureTxnIdToKey();
+
+        // the first TxnId's run begins straight after the txnIds.length header slots
+        final int from = txnIdIndex > 0 ? txnIdToKey[txnIdIndex - 1] : txnIds.length;
+        final int to = txnIdToKey[txnIdIndex];
+        if (from == to)
+            return Keys.EMPTY;
+
+        Key[] found = new Key[to - from];
+        int out = 0;
+        for (int i = from ; i < to ; ++i)
+            found[out++] = keys.get(txnIdToKey[i]);
+        return new Keys(found);
+    }
+
+ // Lazily builds txnIdToKey, the inverse of keyToTxnId: txnIds.length offset slots followed by
+ // key indexes grouped by TxnId. Does nothing if the index has already been built.
+ private void ensureTxnIdToKey()
+ {
+ if (txnIdToKey != null)
+ return;
+
+ int[] src = keyToTxnId;
+ int[] trg = txnIdToKey = new int[txnIds.length - keys.size() +
keyToTxnId.length];
+
+ // first pass, count number of txnId per key
+ for (int i = keys.size() ; i < src.length ; ++i)
+ trg[src[i]]++;
+
+ // turn into offsets (i.e. add txnIds.size() and then sum them)
+ trg[0] += txnIds.length;
+ for (int i = 1; i < txnIds.length ; ++i)
+ trg[i] += trg[i - 1];
+
+ // shuffle forwards one, so we have the start index rather than end
+ System.arraycopy(trg, 0, trg, 1, txnIds.length - 1);
+ trg[0] = txnIds.length;
+
+ // second pass: k tracks the key owning position i; each write advances that TxnId's slot,
+ // so every offset slot ends up holding the end of its run
+ int k = 0;
+ for (int i = keys.size() ; i < src.length ; ++i)
+ {
+ while (i == keyToTxnId[k])
+ ++k;
+
+ trg[trg[src[i]]++] = k;
+ }
+ }
+
+ public void forEachOn(KeyRanges ranges, Predicate<Key> include,
BiConsumer<Key, TxnId> forEach)
+ {
+ keys.foldl(ranges, (index, key, value) -> {
+ if (!include.test(key))
+ return null;
+
+ for (int t = index == 0 ? keys.size() : keyToTxnId[index - 1], end
= keyToTxnId[index]; t < end ; ++t)
+ {
+ TxnId txnId = txnIds[keyToTxnId[t]];
+ forEach.accept(key, txnId);
+ }
+ return null;
+ }, null);
+ }
+
+    /**
+     * Applies forEach to the TxnId selected by the KeyRanges/Predicate overload, using the ranges
+     * returned by {@code commandStore.ranges().since(executeAt.epoch)} and the store's hashIntersects
+     * as the key filter. Does nothing when no ranges are returned.
+     */
+    public void forEachOn(CommandStore commandStore, Timestamp executeAt, Consumer<TxnId> forEach)
+    {
+        KeyRanges ranges = commandStore.ranges().since(executeAt.epoch);
+        if (ranges != null)
+        {
+            // TODO: check inlining
+            forEachOn(ranges, commandStore::hashIntersects, forEach);
+        }
+    }
+
+ public void forEachOn(KeyRanges ranges, Predicate<Key> include,
Consumer<TxnId> forEach)
+ {
+ for (int offset = 0 ; offset < txnIds.length ; offset += 64)
+ {
+ long bitset = keys.foldl(ranges, (keyIndex, key, off, value) -> {
+ if (!include.test(key))
+ return value;
+
+ int index = keyIndex == 0 ? keys.size() : keyToTxnId[keyIndex
- 1];
+ int end = keyToTxnId[keyIndex];
+ if (off > 0)
+ {
+ // TODO: interpolation search probably great here
+ index = Arrays.binarySearch(keyToTxnId, index, end,
(int)off);
+ if (index < 0)
+ index = -1 - index;
+ }
+
+ while (index < end)
+ {
+ long next = keyToTxnId[index++] - off;
+ if (next >= 64)
+ break;
+ value |= 1L << next;
+ }
+
+ return value;
+ }, offset, 0, 0xffffffffffffffffL);
Review Comment:
I think `-1L` is best; with `0xffffffffffffffffL` I have to count all the
`f`s to know how many bits are set.
##########
accord-core/src/main/java/accord/primitives/Keys.java:
##########
@@ -0,0 +1,449 @@
+package accord.primitives;
+
+import java.util.*;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import accord.api.Key;
+import accord.utils.IndexedFold;
+import accord.utils.IndexedFoldIntersectToLong;
+import accord.utils.IndexedFoldToLong;
+import accord.utils.IndexedPredicate;
+import accord.utils.IndexedRangeFoldToLong;
+import accord.utils.SortedArrays;
+import org.apache.cassandra.utils.concurrent.Inline;
+
+@SuppressWarnings("rawtypes")
+// TODO: this should probably be a BTree
+// TODO: check that foldl call-sites are inlined and optimised by HotSpot
+public class Keys implements Iterable<Key>
+{
+ public static final Keys EMPTY = new Keys(new Key[0]);
+
+ final Key[] keys;
+
+    /** Copies the set's elements, in the set's iteration order, without re-sorting them. */
+    public Keys(SortedSet<? extends Key> keys)
+    {
+        Key[] copy = keys.toArray(new Key[0]);
+        this.keys = copy;
+    }
+
+    /** Copies the collection's elements into a new array and sorts that copy. */
+    public Keys(Collection<? extends Key> keys)
+    {
+        Key[] copy = keys.toArray(new Key[0]);
+        Arrays.sort(copy);
+        this.keys = copy;
+    }
+
+    /** Takes ownership of the supplied array: it is sorted in place and retained without copying. */
+    public Keys(Key[] keys)
+    {
+        Arrays.sort(keys);
+        this.keys = keys;
+    }
+
+    /** Two Keys are equal when they are of the same class and their key arrays are element-wise equal. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null)
+            return false;
+        if (o.getClass() != getClass())
+            return false;
+        return Arrays.equals(keys, ((Keys) o).keys);
+    }
+
+ @Override
+ public int hashCode()
+ {
+ return Arrays.hashCode(keys);
+ }
+
+ public int indexOf(Key key)
+ {
+ return Arrays.binarySearch(keys, key);
+ }
+
+ public boolean contains(Key key)
+ {
+ return indexOf(key) >= 0;
+ }
+
+    /** @return the key stored at the given position of the backing array */
+    public Key get(int indexOf)
+    {
+        Key key = keys[indexOf];
+        return key;
+    }
+
+    /** @return true if there are no keys */
+    public boolean isEmpty()
+    {
+        return 0 == keys.length;
+    }
+
+ public int size()
+ {
+ return keys.length;
+ }
+
+    /** @return a new Keys built from the keys found at the given positions */
+    public Keys select(int[] indexes)
+    {
+        Key[] selection = new Key[indexes.length];
+        int out = 0;
+        for (int index : indexes)
+            selection[out++] = keys[index];
+        return new Keys(selection);
+    }
+
+ /**
+ * return true if this keys collection contains all keys found in the
given keys
+ */
+ public boolean containsAll(Keys that)
+ {
+ if (that.isEmpty())
+ return true;
+
+ return foldlIntersect(that, (li, ri, k, p, v) -> v + 1, 0, 0, 0) ==
that.size();
+ }
+
+ public Keys union(Keys that)
+ {
+ return wrap(SortedArrays.linearUnion(keys, that.keys, Key[]::new),
that);
+ }
+
+ public Keys intersect(Keys that)
+ {
+ return wrap(SortedArrays.linearIntersection(keys, that.keys,
Key[]::new), that);
+ }
+
+ public Keys slice(KeyRanges ranges)
+ {
+ return wrap(SortedArrays.sliceWithOverlaps(keys, ranges.ranges,
Key[]::new, (k, r) -> -r.compareTo(k), KeyRange::compareTo));
+ }
+
+ public int search(int lowerBound, int upperBound, Object key,
Comparator<Object> comparator)
+ {
+ return Arrays.binarySearch(keys, lowerBound, upperBound, key,
comparator);
+ }
+
+ public int findNext(Key key, int startIndex)
+ {
+ return SortedArrays.exponentialSearch(keys, startIndex, keys.length,
key);
+ }
+
+ // returns thisIdx in top 32 bits, thatIdx in bottom
+ public long findNextIntersection(int thisIdx, Keys that, int thatIdx)
+ {
+ return SortedArrays.findNextIntersection(this.keys, thisIdx,
that.keys, thatIdx);
+ }
+
+    /**
+     * @return this instance if the key is already present, otherwise a new Keys with the key inserted
+     *         at the position reported by the binary search
+     */
+    public Keys with(Key key)
+    {
+        int found = Arrays.binarySearch(keys, key);
+        if (found >= 0)
+            return this;
+
+        // a negative binarySearch result encodes the insertion point as (-(insertion point) - 1)
+        int insertPos = -1 - found;
+        Key[] expanded = new Key[keys.length + 1];
+        System.arraycopy(keys, 0, expanded, 0, insertPos);
+        expanded[insertPos] = key;
+        System.arraycopy(keys, insertPos, expanded, insertPos + 1, keys.length - insertPos);
+        return new Keys(expanded);
+    }
+
+    /** @return a sequential stream over the keys in array order */
+    public Stream<Key> stream()
+    {
+        return Arrays.stream(keys);
+    }
+
+    /**
+     * @return an iterator over the keys in array order; removal is not supported, and calling
+     *         {@code next()} once the keys are exhausted throws NoSuchElementException
+     */
+    @Override
+    public Iterator<Key> iterator()
+    {
+        return new Iterator<>()
+        {
+            int i = 0;
+
+            @Override
+            public boolean hasNext()
+            {
+                return i < keys.length;
+            }
+
+            @Override
+            public Key next()
+            {
+                // the Iterator contract requires NoSuchElementException rather than an array bounds failure
+                if (i >= keys.length)
+                    throw new NoSuchElementException();
+                return keys[i++];
+            }
+        };
+    }
+
+    /** @return a Keys holding k0 followed by the elements of kn (sorted by the Keys constructor) */
+    public static Keys of(Key k0, Key... kn)
+    {
+        Key[] all = new Key[kn.length + 1];
+        all[0] = k0;
+        System.arraycopy(kn, 0, all, 1, kn.length);
+        return new Keys(all);
+    }
+
+ public boolean any(KeyRanges ranges, Predicate<Key> predicate)
+ {
+ return 1 == foldl(ranges, (i1, key, i2, i3) -> predicate.test(key) ? 1
: 0, 0, 0, 1);
+ }
+
+ public boolean any(IndexedPredicate<Key> predicate)
+ {
+ return 1 == foldl((i, key, p, v) -> predicate.test(i, key) ? 1 : 0, 0,
0, 1);
+ }
+
+    /**
+     * Folds over the keys that fall within the given ranges, in ascending index order.
+     * For each such key the fold function receives the key's index, the key and the running
+     * accumulator, and its result becomes the new accumulator.
+     *
+     * @return the accumulator after the last matching key, or the initial accumulator if none matched
+     */
+    @Inline
+    public <V> V foldl(KeyRanges rs, IndexedFold<Key, V> fold, V accumulator)
+    {
+        int ai = 0, ri = 0;
+        // a negative result from findNextIntersection means there is nothing further to visit
+        for (long ari = rs.findNextIntersection(ri, this, ai) ; ari >= 0 ; ari = rs.findNextIntersection(ri, this, ai))
+        {
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+            KeyRange range = rs.get(ri);
+            // visit every key from ai up to the bound reported by higherKeyIndex for this range
+            for (int nextai = range.higherKeyIndex(this, ai + 1, keys.length) ; ai < nextai ; ++ai)
+                accumulator = fold.apply(ai, get(ai), accumulator);
+        }
+
+        return accumulator;
+    }
+
+ @Inline
+ public long foldl(KeyRanges rs, IndexedFoldToLong<Key> fold, long param,
long initialValue, long terminalValue)
Review Comment:
I feel we can get rid of `long param`, as it's just there to pass
`shards.length` to `ShardedRanges::addKeyIndex`; every other call path has
access to it already.
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,978 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.utils.InlineHeap;
+import accord.utils.SortedArrays;
+
+import static accord.utils.SortedArrays.remap;
+import static accord.utils.SortedArrays.remapper;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+    // shared zero-length arrays, used to back NONE (NO_INTS is also returned by copy() for a zero length)
+    private static final TxnId[] NO_TXNIDS = new TxnId[0];
+    private static final int[] NO_INTS = new int[0];
+    // canonical empty instance: no keys, no TxnId and no mappings
+    public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+ public static class Builder
+ {
+ final Keys keys;
+ final Map<TxnId, Integer> txnIdLookup = new HashMap<>(); // TODO:
primitive map
+ TxnId[] txnIds = new TxnId[4];
+ final int[][] keysToTxnId;
+ final int[] keysToTxnIdCounts;
+
+ public Builder(Keys keys)
+ {
+ this.keys = keys;
+ this.keysToTxnId = new int[keys.size()][4];
+ this.keysToTxnIdCounts = new int[keys.size()];
+ }
+
+ public boolean isEmpty()
+ {
+ return Arrays.stream(keysToTxnIdCounts).allMatch(i -> i == 0);
+ }
+
+ public void add(Command command)
+ {
+ int idx = ensureTxnIdx(command.txnId());
+ keys.foldlIntersect(command.txn().keys, (li, ri, k, p, v) -> {
+ if (keysToTxnId[li].length == keysToTxnIdCounts[li])
+ keysToTxnId[li] = Arrays.copyOf(keysToTxnId[li],
keysToTxnId[li].length * 2);
+ keysToTxnId[li][keysToTxnIdCounts[li]++] = idx;
+ return 0;
+ }, 0, 0, 1);
+ }
+
+ public void add(Key key, TxnId txnId)
+ {
+ int txnIdx = ensureTxnIdx(txnId);
+ int keyIdx = keys.indexOf(key);
+ if (keysToTxnIdCounts[keyIdx] == keysToTxnId[keyIdx].length)
+ keysToTxnId[keyIdx] = Arrays.copyOf(keysToTxnId[keyIdx],
Math.max(4, keysToTxnIdCounts[keyIdx] * 2));
+ keysToTxnId[keyIdx][keysToTxnIdCounts[keyIdx]++] = txnIdx;
+ }
+
+ public boolean contains(TxnId txnId)
+ {
+ return txnIdx(txnId) >= 0;
+ }
+
+ private int txnIdx(TxnId txnId)
+ {
+ return txnIdLookup.getOrDefault(txnId, -1);
+ }
+
+ private int ensureTxnIdx(TxnId txnId)
+ {
+ return txnIdLookup.computeIfAbsent(txnId, ignore -> {
+ if (txnIds.length == txnIdLookup.size())
+ txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+ return txnIdLookup.size();
+ });
+ }
+
+ public Deps build()
+ {
+ TxnId[] txnIds = txnIdLookup.keySet().toArray(TxnId[]::new);
+ Arrays.sort(txnIds, TxnId::compareTo);
+ int[] txnIdMap = new int[txnIds.length];
+ for (int i = 0 ; i < txnIdMap.length ; i++)
+ txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+ int keyCount = 0;
+ int[] result; {
+ int count = 0;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+ count += keysToTxnIdCounts[i];
+ }
+ result = new int[count + keyCount];
+ }
+
+ int keyIndex = 0;
+ int offset = keyCount;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ {
+ int count = keysToTxnIdCounts[i];
+ int[] src = keysToTxnId[i];
+ for (int j = 0 ; j < count ; ++j)
+ result[j + offset] = txnIdMap[src[j]];
+ Arrays.sort(result, offset, count + offset);
+ int dups = 0;
+ for (int j = offset + 1 ; j < offset + count ; ++j)
+ {
+ if (result[j] == result[j - 1]) ++dups;
+ else if (dups > 0) result[j - dups] = result[j];
+ }
+ result[keyIndex] = offset += count - dups;
+ ++keyIndex;
+ }
+ }
+ if (offset < result.length)
+ result = Arrays.copyOf(result, offset);
+
+ Keys keys = this.keys;
+ if (keyCount < keys.size())
+ {
+ keyIndex = 0;
+ Key[] newKeys = new Key[keyCount];
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ newKeys[keyIndex++] = keys.get(i);
+ }
+ keys = new Keys(newKeys);
+ }
+
+ return new Deps(keys, txnIds, result);
+ }
+ }
+
+    /** Static factory: a new, empty {@link Builder} over the given keys. */
+    public static Builder builder(Keys keys)
+    {
+        Builder created = new Builder(keys);
+        return created;
+    }
+
+ static class MergeStream
+ {
+ final Deps source;
+ // TODO: could share backing array for all of these if we want, with
an additional offset
+ final int[] input;
+ final int keyCount;
+ int[] remap; // TODO: use cached backing array
+ int[] keys; // TODO: use cached backing array
+ int keyIndex;
+ int index;
+ int endIndex;
+
+ MergeStream(Deps source)
+ {
+ this.source = source;
+ this.input = source.keyToTxnId;
+ this.keyCount = source.keys.size();
+ }
+
+ private void init(Keys keys, TxnId[] txnIds)
+ {
+ this.remap = remapper(source.txnIds, txnIds, true);
+ this.keys = source.keys.remapper(keys, true);
+ while (input[keyIndex] == keyCount)
+ ++keyIndex;
+ this.index = keyCount;
+ this.endIndex = input[keyIndex];
+ }
+ }
+
+    /**
+     * Merges the Deps obtained from each element of {@code merge} into one Deps.
+     * Null and empty Deps are ignored. With zero, one or two distinct inputs the result is
+     * {@code NONE}, the single input, or {@code a.with(b)} respectively; otherwise a heap-driven
+     * multi-way merge is performed, first by key and then by TxnId within each key.
+     *
+     * NOTE(review): the multi-way path indexes its output by {@code keys}, so it presumably requires
+     * {@code keys} to contain every key of every input — confirm against callers.
+     * NOTE(review): the two-input shortcut delegates to with(), which derives its key set from the union
+     * of the two inputs rather than from {@code keys} — confirm this difference is intended.
+     *
+     * @param keys   the key set of the merged result
+     * @param merge  the items to extract Deps from
+     * @param getter extracts a (possibly null) Deps from each item
+     */
+    public static <T> Deps merge(Keys keys, List<T> merge, Function<T, Deps> getter)
+    {
+        // collect non-empty inputs
+        int streamCount = 0;
+        MergeStream[] streams = new MergeStream[merge.size()];
+        for (T t : merge)
+        {
+            Deps deps = getter.apply(t);
+            if (deps != null && !deps.isEmpty())
+                streams[streamCount++] = new MergeStream(deps);
+        }
+
+        {
+            // first sort by size and remove identical collections
+            // (descending by array length, then key count, then TxnId count)
+            Arrays.sort(streams, 0, streamCount, (a, b) -> {
+                int c = Integer.compare(b.input.length, a.input.length);
+                if (c == 0) c = Integer.compare(b.keyCount, a.keyCount);
+                if (c == 0) c = Integer.compare(b.source.txnIds.length, a.source.txnIds.length);
+                return c;
+            });
+
+            // drop any stream whose source equals() the one immediately before it, compacting in place
+            int diff = 0;
+            for (int i = 1 ; i < streamCount ; i++)
+            {
+                if (streams[i - 1].source.equals(streams[i].source)) ++diff;
+                else if (diff > 0) streams[i - diff] = streams[i];
+            }
+            streamCount -= diff;
+        }
+
+        // cheap exits for the trivial cases
+        switch (streamCount)
+        {
+            case 0: return NONE;
+            case 1: return streams[0].source;
+            case 2: return streams[0].source.with(streams[1].source);
+        }
+
+        // TODO: use Cassandra MergeIterator to perform more efficient merge of TxnId
+        // union of the TxnId of every non-empty input (re-reads the inputs, including de-duplicated ones)
+        TxnId[] txnIds = NONE.txnIds;
+        for (T t : merge)
+        {
+            Deps deps = getter.apply(t);
+            if (deps != null && !deps.isEmpty())
+                txnIds = SortedArrays.linearUnion(txnIds, deps.txnIds, TxnId[]::new);
+        }
+
+        int[] result; {
+            // initialise each stream and size the output as an estimate; it is grown on demand below
+            int maxStreamSize = 0, totalStreamSize = 0;
+            for (int streamIndex = 0 ; streamIndex < streamCount ; ++streamIndex)
+            {
+                MergeStream stream = streams[streamIndex];
+                stream.init(keys, txnIds);
+                maxStreamSize = Math.max(maxStreamSize, stream.input.length - stream.keyCount);
+                totalStreamSize += stream.input.length - stream.keyCount;
+            }
+            result = new int[keys.size() + Math.min(maxStreamSize * 2, totalStreamSize)]; // TODO: use cached temporary array
+        }
+
+        // result layout: [0, keys.size()) are per-key end offsets; entries from keys.size() on are TxnId indexes
+        int resultIndex = keys.size();
+        int[] keyHeap = InlineHeap.create(streamCount); // TODO: use cached temporary array
+        int[] txnIdHeap = InlineHeap.create(streamCount); // TODO: use cached temporary array
+
+        // build a heap of keys and streams, so we can merge those streams with overlapping keys
+        int keyHeapSize = 0;
+        for (int stream = 0 ; stream < streamCount ; ++stream)
+        {
+            InlineHeap.set(keyHeap, keyHeapSize++, remap(0, streams[stream].keys), stream);
+        }
+
+        int keyIndex = 0;
+        keyHeapSize = InlineHeap.heapify(keyHeap, keyHeapSize);
+        while (keyHeapSize > 0)
+        {
+            // while the heap is non-empty, pop the streams matching the top key and insert them into their own heap
+
+            // if we have keys with no contents, fill in zeroes
+            // (i.e. record the current end offset, giving the key an empty range)
+            while (keyIndex < InlineHeap.key(keyHeap, 0))
+            {
+                if (keyIndex == result.length)
+                    result = Arrays.copyOf(result, result.length * 2);
+
+                result[keyIndex++] = resultIndex;
+            }
+
+            // seed the TxnId heap with the current (remapped) TxnId of each stream consumed from the key heap
+            // NOTE(review): presumably consume() visits every entry sharing the top key — confirm in InlineHeap
+            int txnIdHeapSize = InlineHeap.consume(keyHeap, keyHeapSize, (key, streamIndex, size) -> {
+                MergeStream stream = streams[streamIndex];
+                InlineHeap.set(txnIdHeap, size, remap(stream.input[stream.index], stream.remap), streamIndex);
+                return size + 1;
+            }, 0);
+
+            if (txnIdHeapSize > 1)
+            {
+                txnIdHeapSize = InlineHeap.heapify(txnIdHeap, txnIdHeapSize);
+                do
+                {
+                    if (resultIndex + txnIdHeapSize >= result.length)
+                        result = Arrays.copyOf(result, Math.max((resultIndex + txnIdHeapSize) * 2, resultIndex + (resultIndex/2)));
+
+                    // emit the top TxnId once, then consume with a no-op fold
+                    result[resultIndex++] = InlineHeap.key(txnIdHeap, 0);
+                    InlineHeap.consume(txnIdHeap, txnIdHeapSize, (key, stream, v) -> 0, 0);
+
+                    // step the consumed streams; a stream that has reached the end of this key reports MIN_VALUE
+                    txnIdHeapSize = InlineHeap.advance(txnIdHeap, txnIdHeapSize, streamIndex -> {
+                        MergeStream stream = streams[streamIndex];
+                        int index = ++stream.index;
+                        if (index == stream.endIndex)
+                            return Integer.MIN_VALUE;
+                        return remap(stream.input[index], stream.remap);
+                    });
+                }
+                while (txnIdHeapSize > 1);
+            }
+
+            // fast path when one remaining source for this key
+            if (txnIdHeapSize > 0)
+            {
+                int streamIndex = InlineHeap.stream(txnIdHeap, 0);
+                MergeStream stream = streams[streamIndex];
+                int index = stream.index;
+                int endIndex = stream.endIndex;
+                int count = endIndex - index;
+                if (result.length < resultIndex + count)
+                    result = Arrays.copyOf(result, Math.max(result.length + (result.length / 2), resultIndex + count));
+
+                // copy the remainder of this stream's entries for the key, remapping each
+                while (index < endIndex)
+                    result[resultIndex++] = remap(stream.input[index++], stream.remap);
+
+                // move the stream on to its next key
+                stream.index = index;
+                stream.endIndex = stream.input[++stream.keyIndex];
+            }
+
+            // close the current key, then step each consumed stream to its next non-empty key (MIN_VALUE once exhausted)
+            result[keyIndex++] = resultIndex;
+            keyHeapSize = InlineHeap.advance(keyHeap, keyHeapSize, streamIndex -> {
+                MergeStream stream = streams[streamIndex];
+                while (stream.index == stream.endIndex && stream.keyIndex < stream.keyCount)
+                    stream.endIndex = stream.input[++stream.keyIndex];
+                return stream.keyIndex == stream.keyCount
+                       ? Integer.MIN_VALUE
+                       : remap(stream.keyIndex, stream.keys);
+            });
+        }
+
+        // any trailing keys have no entries
+        while (keyIndex < keys.size())
+            result[keyIndex++] = resultIndex;
+
+        // trim the over-allocated tail
+        if (resultIndex < result.length)
+            result = Arrays.copyOf(result, resultIndex);
+
+        return new Deps(keys, txnIds, result);
+    }
+
+    final Keys keys; // unique Keys
+    final TxnId[] txnIds; // unique TxnId TODO: this should perhaps be a BTree?
+
+    // first N entries are offsets for each src item, remainder are pointers into value set (either keys or txnIds)
+    // each offset is the exclusive end of that item's run; the first run starts at N
+    final int[] keyToTxnId; // Key -> [TxnId]
+    int[] txnIdToKey; // TxnId -> [Key]; built lazily by ensureTxnIdToKey(), null until then
+
+    /**
+     * Wraps the given arrays without copying them.
+     *
+     * @throws IllegalStateException if keys is non-empty and the last key's end offset is not the array length
+     * @throws AssertionError        if checkValid() rejects the contents
+     */
+    Deps(Keys keys, TxnId[] txnIds, int[] keyToTxnId)
+    {
+        this.keys = keys;
+        this.txnIds = txnIds;
+        this.keyToTxnId = keyToTxnId;
+
+        boolean lastOffsetIsEnd = keys.isEmpty() || keyToTxnId[keys.size() - 1] == keyToTxnId.length;
+        Preconditions.checkState(lastOffsetIsEnd);
+        if (checkValid())
+            return;
+
+        throw new AssertionError();
+    }
+
+    /**
+     * Restricts this Deps to the keys selected by {@code keys.slice(ranges)}.
+     *
+     * @return this if it is empty or every key is selected; NONE if no key is selected; otherwise a new
+     *         Deps holding only the selected keys, with TxnId no longer referenced by any of them removed
+     */
+    public Deps slice(KeyRanges ranges)
+    {
+        if (isEmpty())
+            return this;
+
+        Keys select = keys.slice(ranges);
+        if (select.isEmpty())
+            return NONE;
+
+        if (select.size() == keys.size())
+            return this;
+
+        int[] src = keyToTxnId;
+
+        // pass one: total size = one offset per selected key + each selected key's entries
+        int total = select.size();
+        for (int j = 0, at = 0 ; j < select.size() ; ++j)
+        {
+            at = keys.findNext(select.get(j), at);
+            int start = at == 0 ? keys.size() : src[at - 1];
+            total += src[at] - start;
+        }
+
+        // pass two: copy each selected key's entries and record its end offset
+        int[] trg = new int[total];
+        int end = select.size();
+        for (int j = 0, at = 0 ; j < select.size() ; ++j)
+        {
+            at = keys.findNext(select.get(j), at);
+            int start = at == 0 ? keys.size() : src[at - 1];
+            int count = src[at] - start;
+            System.arraycopy(src, start, trg, end, count);
+            end += count;
+            trg[j] = end;
+        }
+
+        // rewrites the TxnId indexes in trg in place, so this must run before trg is handed to the constructor
+        TxnId[] retained = trimUnusedTxnId(select, this.txnIds, trg);
+        return new Deps(select, retained, trg);
+    }
+
+    /**
+     * Drops TxnId that are not referenced by {@code keysToTxnId}. If anything is dropped, the TxnId
+     * indexes stored in {@code keysToTxnId} (from position keys.size() on) are rewritten in place.
+     *
+     * @return {@code txnIds} itself if every entry is referenced, otherwise a new, shorter array
+     */
+    private static TxnId[] trimUnusedTxnId(Keys keys, TxnId[] txnIds, int[] keysToTxnId)
+    {
+        int valuesFrom = keys.size();
+
+        // mark every referenced TxnId index
+        int[] remapTxnId = new int[txnIds.length];
+        for (int i = valuesFrom ; i < keysToTxnId.length ; ++i)
+            remapTxnId[keysToTxnId[i]] = 1;
+
+        // turn marks into new indexes; unreferenced entries become -1
+        int used = 0;
+        for (int i = 0 ; i < remapTxnId.length ; ++i)
+            remapTxnId[i] = remapTxnId[i] == 1 ? used++ : -1;
+
+        if (used >= remapTxnId.length)
+            return txnIds;
+
+        TxnId[] result = new TxnId[used];
+        for (int i = 0 ; i < txnIds.length ; ++i)
+        {
+            int target = remapTxnId[i];
+            if (target >= 0)
+                result[target] = txnIds[i];
+        }
+        for (int i = valuesFrom ; i < keysToTxnId.length ; ++i)
+            keysToTxnId[i] = remapTxnId[keysToTxnId[i]];
+
+        return result;
+    }
+
+    /**
+     * Returns the union of this Deps and {@code that}: the union of their keys, the union of their TxnId,
+     * and for each key the sorted, de-duplicated union of the TxnId each side maps it to.
+     * Where one side is found to already contain the other, that instance is returned unchanged
+     * and nothing is allocated for the mapping.
+     *
+     * Cursor names: lk/rk/ok index keys in left/right/out; l/r/o index the TxnId entries of left/right/out.
+     *
+     * NOTE(review): the no-copy paths rely on remapper(...) returning null when no remapping is needed and on
+     * Keys.union returning the same instance when one side is a superset — confirm in SortedArrays / Keys.
+     */
+    public Deps with(Deps that)
+    {
+        if (isEmpty() || that.isEmpty())
+            return isEmpty() ? that : this;
+
+        Keys keys = this.keys.union(that.keys);
+        TxnId[] txnIds = SortedArrays.linearUnion(this.txnIds, that.txnIds, TxnId[]::new);
+        int[] remapLeft = remapper(this.txnIds, txnIds, true);
+        int[] remapRight = remapper(that.txnIds, txnIds, true);
+        Keys leftKeys = this.keys, rightKeys = that.keys;
+        int[] left = keyToTxnId, right = that.keyToTxnId;
+
+        // identical mappings over identical-sized key and TxnId sets: nothing to do
+        if (remapLeft == null && remapRight == null && Arrays.equals(left, right)
+            && keys.size() == leftKeys.size() && keys.size() == rightKeys.size())
+        {
+            return this;
+        }
+
+        // out stays null for as long as the result is still identical to one of the inputs
+        int[] out = null;
+        int lk = 0, rk = 0, ok = 0, l = this.keys.size(), r = that.keys.size(), o = keys.size();
+
+        if (remapLeft == null && keys == leftKeys)
+        {
+            // left may already be the answer: scan until right contributes something left lacks
+            noOp: while (lk < leftKeys.size() && rk < rightKeys.size())
+            {
+                int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+                if (ck < 0)
+                {
+                    // key only in left: skip over its entries
+                    o += left[lk] - l;
+                    l = left[lk];
+                    assert o == l && ok == lk && left[ok] == o;
+                    ok++;
+                    lk++;
+                }
+                else if (ck > 0)
+                {
+                    // a key only in right is treated as impossible on this path (the union is leftKeys)
+                    throw new IllegalStateException();
+                }
+                else
+                {
+                    while (l < left[lk] && r < right[rk])
+                    {
+                        int nextLeft = left[l];
+                        int nextRight = remap(right[r], remapRight);
+
+                        if (nextLeft < nextRight)
+                        {
+                            o++;
+                            l++;
+                        }
+                        else if (nextRight < nextLeft)
+                        {
+                            // right has a TxnId left lacks: keep what has been scanned and merge the rest below
+                            out = copy(left, o, left.length + right.length - r);
+                            break noOp;
+                        }
+                        else
+                        {
+                            o++;
+                            l++;
+                            r++;
+                        }
+                    }
+
+                    if (l < left[lk])
+                    {
+                        o += left[lk] - l;
+                        l = left[lk];
+                    }
+                    else if (r < right[rk])
+                    {
+                        // right has trailing TxnId for this key that left lacks
+                        out = copy(left, o, left.length + right.length - r);
+                        break;
+                    }
+
+                    assert o == l && ok == lk && left[ok] == o;
+                    ok++;
+                    rk++;
+                    lk++;
+                }
+            }
+
+            if (out == null)
+                return this;
+        }
+        else if (remapRight == null && keys == rightKeys)
+        {
+            // mirror image of the branch above: right may already be the answer
+            noOp: while (lk < leftKeys.size() && rk < rightKeys.size())
+            {
+                int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+                if (ck < 0)
+                {
+                    // a key only in left is treated as impossible on this path (the union is rightKeys)
+                    throw new IllegalStateException();
+                }
+                else if (ck > 0)
+                {
+                    // key only in right: skip over its entries
+                    o += right[rk] - r;
+                    r = right[rk];
+                    assert o == r && ok == rk && right[ok] == o;
+                    ok++;
+                    rk++;
+                }
+                else
+                {
+                    while (l < left[lk] && r < right[rk])
+                    {
+                        int nextLeft = remap(left[l], remapLeft);
+                        int nextRight = right[r];
+
+                        if (nextLeft < nextRight)
+                        {
+                            // left has a TxnId right lacks: keep what has been scanned and merge the rest below
+                            out = copy(right, o, right.length + left.length - l);
+                            break noOp;
+                        }
+                        else if (nextRight < nextLeft)
+                        {
+                            o++;
+                            r++;
+                        }
+                        else
+                        {
+                            o++;
+                            l++;
+                            r++;
+                        }
+                    }
+
+                    if (l < left[lk])
+                    {
+                        // left has trailing TxnId for this key that right lacks
+                        out = copy(right, o, right.length + left.length - l);
+                        break;
+                    }
+                    else if (r < right[rk])
+                    {
+                        o += right[rk] - r;
+                        r = right[rk];
+                    }
+
+                    assert o == r && ok == rk && right[ok] == o;
+                    ok++;
+                    rk++;
+                    lk++;
+                }
+            }
+
+            if (out == null)
+                return that;
+        }
+        else
+        {
+            // neither side can be reused: allocate the worst-case size up front
+            out = new int[left.length + right.length];
+        }
+
+        // general merge; resumes from wherever the scan above stopped, possibly part-way through a key
+        while (lk < leftKeys.size() && rk < rightKeys.size())
+        {
+            int ck = leftKeys.get(lk).compareTo(rightKeys.get(rk));
+            if (ck < 0)
+            {
+                while (l < left[lk])
+                    out[o++] = remap(left[l++], remapLeft);
+                out[ok++] = o;
+                lk++;
+            }
+            else if (ck > 0)
+            {
+                while (r < right[rk])
+                    out[o++] = remap(right[r++], remapRight);
+                out[ok++] = o;
+                rk++;
+            }
+            else
+            {
+                // same key on both sides: two-way sorted merge, emitting equal TxnId once
+                while (l < left[lk] && r < right[rk])
+                {
+                    int nextLeft = remap(left[l], remapLeft);
+                    int nextRight = remap(right[r], remapRight);
+
+                    if (nextLeft <= nextRight)
+                    {
+                        out[o++] = nextLeft;
+                        l += 1;
+                        r += nextLeft == nextRight ? 1 : 0;
+                    }
+                    else
+                    {
+                        out[o++] = nextRight;
+                        ++r;
+                    }
+                }
+
+                while (l < left[lk])
+                    out[o++] = remap(left[l++], remapLeft);
+
+                while (r < right[rk])
+                    out[o++] = remap(right[r++], remapRight);
+
+                out[ok++] = o;
+                rk++;
+                lk++;
+            }
+        }
+
+        // drain whichever side still has keys
+        while (lk < leftKeys.size())
+        {
+            while (l < left[lk])
+                out[o++] = remap(left[l++], remapLeft);
+            out[ok++] = o;
+            lk++;
+        }
+
+        while (rk < rightKeys.size())
+        {
+            while (r < right[rk])
+                out[o++] = remap(right[r++], remapRight);
+            out[ok++] = o;
+            rk++;
+        }
+
+        // trim the over-allocated tail
+        if (o < out.length)
+            out = Arrays.copyOf(out, o);
+
+        return new Deps(keys, txnIds, out);
+    }
+
+    /**
+     * Allocates an array of {@code length} and copies the first {@code to} elements of {@code src} into it.
+     *
+     * @return the new array, or the shared NO_INTS when {@code length} is zero
+     */
+    private static int[] copy(int[] src, int to, int length)
+    {
+        if (length != 0)
+        {
+            int[] result = new int[length];
+            System.arraycopy(src, 0, result, 0, to);
+            return result;
+        }
+
+        return NO_INTS;
+    }
+
+ // TODO: optimise for case where none removed
+    /**
+     * Returns a Deps with every TxnId matching {@code remove} taken out.
+     * The key set is left unchanged, so a key may end up with no TxnId.
+     *
+     * @param remove tested once against each TxnId; true means drop it
+     * @return this if it is empty or nothing matches; NONE if everything matches; otherwise a new Deps
+     */
+    public Deps without(Predicate<TxnId> remove)
+    {
+        if (isEmpty())
+            return this;
+
+        // old TxnId index -> new index, or -1 if removed; retained TxnId are numbered from 0
+        int[] remapTxnIds = new int[this.txnIds.length];
+        TxnId[] txnIds; {
+            int count = 0;
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remove.test(this.txnIds[i])) remapTxnIds[i] = -1;
+                else remapTxnIds[i] = count++;
+            }
+
+            if (count == remapTxnIds.length)
+                return this;
+
+            if (count == 0)
+                return NONE;
+
+            txnIds = new TxnId[count];
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remapTxnIds[i] >= 0)
+                    txnIds[remapTxnIds[i]] = this.txnIds[i];
+            }
+        }
+
+        // k walks the key offsets, i the old entries, o the next free slot for a retained entry
+        int[] keyToTxnId = new int[this.keyToTxnId.length];
+        int k = 0, i = keys.size(), o = i;
+        while (i < this.keyToTxnId.length)
+        {
+            // close every key whose entries end at i
+            while (this.keyToTxnId[k] == i)
+                keyToTxnId[k++] = o;
+
+            int remapped = remapTxnIds[this.keyToTxnId[i]];
+            // 0 is a valid new index (the first retained TxnId); only -1 marks a removed entry
+            if (remapped >= 0)
+                keyToTxnId[o++] = remapped;
+            ++i;
+        }
+
+        while (k < keys.size())
+            keyToTxnId[k++] = o;
+
+        keyToTxnId = Arrays.copyOf(keyToTxnId, o);
+
+        return new Deps(keys, txnIds, keyToTxnId);
+    }
+
+ public boolean contains(TxnId txnId)
+ {
+ return Arrays.binarySearch(txnIds, txnId) >= 0;
+ }
+
+ // return true iff we map any keys to any txnId
+ // if the mapping is empty we return false, whether or not we have any
keys or txnId by themselves
+ public boolean isEmpty()
+ {
+ return keyToTxnId.length == keys.size();
+ }
+
+    /**
+     * Looks up the keys mapped to {@code txnId} via the lazily built TxnId -> Key index.
+     *
+     * @return the keys recorded against txnId, or Keys.EMPTY if txnId is unknown or has none
+     */
+    public Keys someKeys(TxnId txnId)
+    {
+        int txnIdIndex = Arrays.binarySearch(txnIds, txnId);
+        if (txnIdIndex < 0)
+            return Keys.EMPTY;
+
+        ensureTxnIdToKey();
+
+        // the first TxnId's run starts right after the offsets; every other run starts where the previous ended
+        int start = txnIdIndex == 0 ? txnIds.length : txnIdToKey[txnIdIndex - 1];
+        int end = txnIdToKey[txnIdIndex];
+        if (start == end)
+            return Keys.EMPTY;
+
+        int count = end - start;
+        Key[] result = new Key[count];
+        for (int n = 0 ; n < count ; ++n)
+            result[n] = keys.get(txnIdToKey[start + n]);
+        return new Keys(result);
+    }
+
+    /**
+     * Builds txnIdToKey, the inverse of keyToTxnId, on first use; does nothing if it is already set.
+     * The layout mirrors keyToTxnId: the first txnIds.length entries are per-TxnId end offsets and the
+     * remainder are key indexes.
+     *
+     * NOTE(review): the field is assigned before it is fully populated, and with zero TxnId the
+     * {@code trg[0]} accesses would be out of bounds; the visible caller (someKeys) only gets here after
+     * finding a TxnId. Confirm other callers.
+     */
+    private void ensureTxnIdToKey()
+    {
+        if (txnIdToKey != null)
+            return;
+
+        int[] src = keyToTxnId;
+        int[] trg = txnIdToKey = new int[txnIds.length - keys.size() + keyToTxnId.length];
+
+        // first pass, count number of keys per txnId
+        for (int i = keys.size() ; i < src.length ; ++i)
+            trg[src[i]]++;
+
+        // turn into offsets (i.e. add txnIds.size() and then sum them)
+        trg[0] += txnIds.length;
+        for (int i = 1; i < txnIds.length ; ++i)
+            trg[i] += trg[i - 1];
+
+        // shuffle forwards one, so we have the start index rather than end
+        System.arraycopy(trg, 0, trg, 1, txnIds.length - 1);
+        trg[0] = txnIds.length;
+
+        // second pass: k tracks the key owning entry i; write k at the TxnId's cursor and bump the cursor,
+        // so that each trg[txnId] has advanced from its start index to its end index when the loop finishes
+        int k = 0;
+        for (int i = keys.size() ; i < src.length ; ++i)
+        {
+            while (i == keyToTxnId[k])
+                ++k;
+
+            trg[trg[src[i]]++] = k;
+        }
+    }
+
+ public void forEachOn(KeyRanges ranges, Predicate<Key> include,
BiConsumer<Key, TxnId> forEach)
+ {
+ keys.foldl(ranges, (index, key, value) -> {
+ if (!include.test(key))
+ return null;
+
+ for (int t = index == 0 ? keys.size() : keyToTxnId[index - 1], end
= keyToTxnId[index]; t < end ; ++t)
+ {
+ TxnId txnId = txnIds[keyToTxnId[t]];
+ forEach.accept(key, txnId);
+ }
+ return null;
+ }, null);
+ }
+
+ public void forEachOn(CommandStore commandStore, Timestamp executeAt,
Consumer<TxnId> forEach)
Review Comment:
This is used in only one place, so I feel it would be best to drop this
public method and have the one caller do this logic; this would also mean
`Deps` doesn't depend on `CommandStore`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]