belliottsmith commented on code in PR #6:
URL: https://github.com/apache/cassandra-accord/pull/6#discussion_r941093448
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,978 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.utils.InlineHeap;
+import accord.utils.SortedArrays;
+
+import static accord.utils.SortedArrays.remap;
+import static accord.utils.SortedArrays.remapper;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+ private static final TxnId[] NO_TXNIDS = new TxnId[0];
+ private static final int[] NO_INTS = new int[0];
+ public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+ /**
+  * Mutable accumulator of (key, txnId) dependency pairs over a fixed {@link Keys} collection,
+  * packed into a {@link Deps} by {@link #build()}.
+  *
+  * Each distinct TxnId is assigned a dense index in first-seen order (see txnIdLookup), and
+  * each key position holds a growable int[] of those indexes. Duplicate pairs are tolerated
+  * while adding; they are removed by build().
+  */
+ public static class Builder
+ {
+     final Keys keys;
+     // TxnId -> dense index, assigned in first-seen order
+     final Map<TxnId, Integer> txnIdLookup = new HashMap<>(); // TODO: primitive map
+     // NOTE(review): this array is grown by ensureTxnIdx but never written to or read from in
+     // the visible code; build() derives the sorted ids from txnIdLookup instead. Confirm
+     // whether it is still needed.
+     TxnId[] txnIds = new TxnId[4];
+     // keysToTxnId[k] holds the dense txn indexes recorded against keys.get(k);
+     // only the first keysToTxnIdCounts[k] entries are valid
+     final int[][] keysToTxnId;
+     final int[] keysToTxnIdCounts;
+
+     /**
+      * @param keys the keys dependencies may be recorded against; positions in this
+      *             collection index the per-key arrays
+      */
+     public Builder(Keys keys)
+     {
+         this.keys = keys;
+         this.keysToTxnId = new int[keys.size()][4];
+         this.keysToTxnIdCounts = new int[keys.size()];
+     }
+
+     /**
+      * @return true if no (key, txnId) pair has been recorded against any key
+      */
+     public boolean isEmpty()
+     {
+         return Arrays.stream(keysToTxnIdCounts).allMatch(i -> i == 0);
+     }
+
+     /**
+      * Records the command's TxnId against every key of this builder that foldlIntersect
+      * reports as shared with the command's transaction keys.
+      */
+     public void add(Command command)
+     {
+         int idx = ensureTxnIdx(command.txnId());
+         // li is used as the position of the matching key within this builder's keys
+         keys.foldlIntersect(command.txn().keys, (li, ri, k, p, v) -> {
+             append(li, idx);
+             return 0;
+         }, 0, 0, 1);
+     }
+
+     /**
+      * Records a single (key, txnId) dependency.
+      *
+      * @throws IllegalArgumentException if the key lookup yields a negative position
+      */
+     public void add(Key key, TxnId txnId)
+     {
+         // Validate the key before registering the txnId, so that a rejected call leaves the
+         // builder untouched. Presumably indexOf returns a negative value for an absent key
+         // - confirm against Keys.indexOf; the guard is a no-op otherwise.
+         int keyIdx = keys.indexOf(key);
+         if (keyIdx < 0)
+             throw new IllegalArgumentException("Key " + key + " is not one of the keys this builder was created with");
+         append(keyIdx, ensureTxnIdx(txnId));
+     }
+
+     /**
+      * @return true if the TxnId has already been assigned an index by this builder
+      */
+     public boolean contains(TxnId txnId)
+     {
+         return txnIdx(txnId) >= 0;
+     }
+
+     // dense index of txnId, or -1 if it has not been seen
+     private int txnIdx(TxnId txnId)
+     {
+         return txnIdLookup.getOrDefault(txnId, -1);
+     }
+
+     // dense index of txnId, assigning the next free index if it has not been seen
+     private int ensureTxnIdx(TxnId txnId)
+     {
+         return txnIdLookup.computeIfAbsent(txnId, ignore -> {
+             if (txnIds.length == txnIdLookup.size())
+                 txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+             // the new entry is not yet in the map, so size() is the next free index
+             return txnIdLookup.size();
+         });
+     }
+
+     // appends txnIdx to the index list of the key at keyIdx, doubling the backing array when full
+     private void append(int keyIdx, int txnIdx)
+     {
+         int count = keysToTxnIdCounts[keyIdx];
+         int[] indexes = keysToTxnId[keyIdx];
+         if (count == indexes.length)
+             keysToTxnId[keyIdx] = indexes = Arrays.copyOf(indexes, Math.max(4, count * 2));
+         indexes[count] = txnIdx;
+         keysToTxnIdCounts[keyIdx] = count + 1;
+     }
+
+     /**
+      * Packs the recorded pairs into a Deps.
+      *
+      * The int[] handed to Deps is laid out as: one slot per retained key holding the
+      * exclusive end offset of that key's run, followed by the runs themselves. Each run is
+      * the key's txn ids expressed as positions in the sorted txnIds array, sorted and
+      * de-duplicated. Keys with no recorded dependency are dropped.
+      */
+     public Deps build()
+     {
+         // sort the distinct txn ids, then map each first-seen index to its sorted position
+         TxnId[] txnIds = txnIdLookup.keySet().toArray(TxnId[]::new);
+         Arrays.sort(txnIds, TxnId::compareTo);
+         int[] txnIdMap = new int[txnIds.length];
+         for (int i = 0 ; i < txnIdMap.length ; i++)
+             txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+         // size the output for the worst case (no duplicates): one header slot per non-empty key
+         // plus every recorded entry
+         int keyCount = 0;
+         int[] result; {
+             int count = 0;
+             for (int i = 0 ; i < keys.size() ; ++i)
+             {
+                 keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+                 count += keysToTxnIdCounts[i];
+             }
+             result = new int[count + keyCount];
+         }
+
+         int keyIndex = 0;
+         int offset = keyCount; // runs start immediately after the header slots
+         for (int i = 0 ; i < keys.size() ; ++i)
+         {
+             if (keysToTxnIdCounts[i] > 0)
+             {
+                 int count = keysToTxnIdCounts[i];
+                 int[] src = keysToTxnId[i];
+                 for (int j = 0 ; j < count ; ++j)
+                     result[j + offset] = txnIdMap[src[j]];
+                 Arrays.sort(result, offset, count + offset);
+                 // compact duplicates in place; reads stay ahead of writes, so comparing
+                 // against result[j - 1] always sees the original sorted neighbour
+                 int dups = 0;
+                 for (int j = offset + 1 ; j < offset + count ; ++j)
+                 {
+                     if (result[j] == result[j - 1]) ++dups;
+                     else if (dups > 0) result[j - dups] = result[j];
+                 }
+                 // advance past the de-duplicated run and record its end in the key's header slot
+                 result[keyIndex] = offset += count - dups;
+                 ++keyIndex;
+             }
+         }
+         // trim the space reserved for entries that turned out to be duplicates
+         if (offset < result.length)
+             result = Arrays.copyOf(result, offset);
+
+         // drop keys that received no dependencies so key positions line up with header slots
+         Keys keys = this.keys;
+         if (keyCount < keys.size())
+         {
+             keyIndex = 0;
+             Key[] newKeys = new Key[keyCount];
+             for (int i = 0 ; i < keys.size() ; ++i)
+             {
+                 if (keysToTxnIdCounts[i] > 0)
+                     newKeys[keyIndex++] = keys.get(i);
+             }
+             keys = new Keys(newKeys);
+         }
+
+         return new Deps(keys, txnIds, result);
+     }
+ }
+
+ /**
+  * Static factory for a {@link Builder} over the given keys.
+  *
+  * @param keys the keys that dependencies may be recorded against
+  * @return a new, empty Builder
+  */
+ public static Builder builder(Keys keys)
+ {
+     final Builder created = new Builder(keys);
+     return created;
+ }
+
+ /**
+  * Cursor state over a single source Deps; presumably consumed by the merge routine that
+  * follows in this file (not fully visible here - confirm).
+  */
+ static class MergeStream
+ {
+     final Deps source;
+     // TODO: could share backing array for all of these if we want, with an additional offset
+     final int[] input;  // the source's keyToTxnId array
+     final int keyCount; // number of keys in the source
+     int[] remap; // TODO: use cached backing array
+     int[] keys; // TODO: use cached backing array
+     int keyIndex;
+     int index;
+     int endIndex;
+
+     MergeStream(Deps source)
+     {
+         this.source = source;
+         input = source.keyToTxnId;
+         keyCount = source.keys.size();
+     }
+
+     /**
+      * Builds the txn id and key remappers from the source to the supplied targets, then
+      * positions the cursor on the first key whose entry in input differs from keyCount.
+      *
+      * NOTE(review): the skip loop has no bounds check, so it relies on input containing
+      * such an entry - confirm the caller never initialises a stream over an empty source.
+      */
+     private void init(Keys keys, TxnId[] txnIds)
+     {
+         remap = remapper(source.txnIds, txnIds, true);
+         this.keys = source.keys.remapper(keys, true);
+
+         final int firstEntry = keyCount;
+         // skip leading keys whose stored offset still equals the first entry position
+         while (input[keyIndex] == firstEntry)
+             keyIndex++;
+
+         index = firstEntry;
+         endIndex = input[keyIndex];
+     }
+ }
+
+ public static <T> Deps merge(Keys keys, List<T> merge, Function<T, Deps>
getter)
Review Comment:
merge currently has a handful of trivially cacheable large allocations, so
it will be much faster and allocate far less once those simple TODOs are
completed. It was more important to me to do the difficult work first, but
you’re welcome to tackle the TODOs as part of review to make it faster if you
like! (It will probably be easier once we have imported the C* utils.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]