belliottsmith commented on code in PR #6:
URL: https://github.com/apache/cassandra-accord/pull/6#discussion_r942458676
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,978 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.utils.InlineHeap;
+import accord.utils.SortedArrays;
+
+import static accord.utils.SortedArrays.remap;
+import static accord.utils.SortedArrays.remapper;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+ private static final TxnId[] NO_TXNIDS = new TxnId[0];
+ private static final int[] NO_INTS = new int[0];
+ public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+ public static class Builder
+ {
+ final Keys keys;
+ final Map<TxnId, Integer> txnIdLookup = new HashMap<>(); // TODO:
primitive map
+ TxnId[] txnIds = new TxnId[4];
+ final int[][] keysToTxnId;
+ final int[] keysToTxnIdCounts;
+
+ public Builder(Keys keys)
+ {
+ this.keys = keys;
+ this.keysToTxnId = new int[keys.size()][4];
+ this.keysToTxnIdCounts = new int[keys.size()];
+ }
+
+ public boolean isEmpty()
+ {
+ return Arrays.stream(keysToTxnIdCounts).allMatch(i -> i == 0);
+ }
+
+ public void add(Command command)
+ {
+ int idx = ensureTxnIdx(command.txnId());
+ keys.foldlIntersect(command.txn().keys, (li, ri, k, p, v) -> {
+ if (keysToTxnId[li].length == keysToTxnIdCounts[li])
+ keysToTxnId[li] = Arrays.copyOf(keysToTxnId[li],
keysToTxnId[li].length * 2);
+ keysToTxnId[li][keysToTxnIdCounts[li]++] = idx;
+ return 0;
+ }, 0, 0, 1);
+ }
+
+ public void add(Key key, TxnId txnId)
+ {
+ int txnIdx = ensureTxnIdx(txnId);
+ int keyIdx = keys.indexOf(key);
+ if (keysToTxnIdCounts[keyIdx] == keysToTxnId[keyIdx].length)
+ keysToTxnId[keyIdx] = Arrays.copyOf(keysToTxnId[keyIdx],
Math.max(4, keysToTxnIdCounts[keyIdx] * 2));
+ keysToTxnId[keyIdx][keysToTxnIdCounts[keyIdx]++] = txnIdx;
+ }
+
+ public boolean contains(TxnId txnId)
+ {
+ return txnIdx(txnId) >= 0;
+ }
+
+ private int txnIdx(TxnId txnId)
+ {
+ return txnIdLookup.getOrDefault(txnId, -1);
+ }
+
+ private int ensureTxnIdx(TxnId txnId)
+ {
+ return txnIdLookup.computeIfAbsent(txnId, ignore -> {
+ if (txnIds.length == txnIdLookup.size())
+ txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+ return txnIdLookup.size();
+ });
+ }
+
+ public Deps build()
+ {
+ TxnId[] txnIds = txnIdLookup.keySet().toArray(TxnId[]::new);
+ Arrays.sort(txnIds, TxnId::compareTo);
+ int[] txnIdMap = new int[txnIds.length];
+ for (int i = 0 ; i < txnIdMap.length ; i++)
+ txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+ int keyCount = 0;
+ int[] result; {
+ int count = 0;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+ count += keysToTxnIdCounts[i];
+ }
+ result = new int[count + keyCount];
+ }
+
+ int keyIndex = 0;
+ int offset = keyCount;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ {
+ int count = keysToTxnIdCounts[i];
+ int[] src = keysToTxnId[i];
+ for (int j = 0 ; j < count ; ++j)
+ result[j + offset] = txnIdMap[src[j]];
+ Arrays.sort(result, offset, count + offset);
+ int dups = 0;
+ for (int j = offset + 1 ; j < offset + count ; ++j)
+ {
+ if (result[j] == result[j - 1]) ++dups;
+ else if (dups > 0) result[j - dups] = result[j];
+ }
+ result[keyIndex] = offset += count - dups;
+ ++keyIndex;
+ }
+ }
+ if (offset < result.length)
+ result = Arrays.copyOf(result, offset);
+
+ Keys keys = this.keys;
+ if (keyCount < keys.size())
+ {
+ keyIndex = 0;
+ Key[] newKeys = new Key[keyCount];
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ newKeys[keyIndex++] = keys.get(i);
+ }
+ keys = new Keys(newKeys);
+ }
+
+ return new Deps(keys, txnIds, result);
+ }
+ }
+
    /**
     * Create a {@link Builder} accumulating dependencies over the given key set.
     * Keys that receive no dependencies are pruned from the built {@link Deps}.
     */
    public static Builder builder(Keys keys)
    {
        return new Builder(keys);
    }
+
    /**
     * Cursor over one {@code Deps} input during a multi-way merge: tracks the current
     * key slice (via {@code keyIndex}/{@code index}/{@code endIndex}) within the flat
     * keyToTxnId array, plus remap tables translating this source's local key/txnId
     * indices into the merged output's index space.
     */
    static class MergeStream
    {
        final Deps source;
        // TODO: could share backing array for all of these if we want, with an additional offset
        final int[] input;    // source.keyToTxnId: per-key end offsets header, then concatenated txnId index lists
        final int keyCount;   // header length == number of keys in source; also the offset of the first list entry
        int[] remap;          // source-local txnId index -> merged txnId index; TODO: use cached backing array
        int[] keys;           // source-local key index -> merged key index; TODO: use cached backing array
        int keyIndex;         // current key within source
        int index;            // current position within input (starts at keyCount, the first list entry)
        int endIndex;         // end of the current key's slice, i.e. input[keyIndex]

        MergeStream(Deps source)
        {
            this.source = source;
            this.input = source.keyToTxnId;
            this.keyCount = source.keys.size();
        }

        /**
         * Position the stream at its first non-empty key and compute the remap tables
         * into the merged {@code keys}/{@code txnIds} space.
         * NOTE(review): assumes at least one key has a non-empty txnId list — if all are
         * empty the while loop would run off the end of input; presumably callers never
         * merge an empty Deps here — confirm.
         */
        private void init(Keys keys, TxnId[] txnIds)
        {
            this.remap = remapper(source.txnIds, txnIds, true);
            this.keys = source.keys.remapper(keys, true);
            // input[k] == keyCount means every key up to and including k has an empty
            // list (offsets are monotonic), so skip leading empty keys
            while (input[keyIndex] == keyCount)
                ++keyIndex;
            this.index = keyCount;
            this.endIndex = input[keyIndex];
        }
    }
+
+ public static <T> Deps merge(Keys keys, List<T> merge, Function<T, Deps>
getter)
Review Comment:
For comparison, I included running `with` in a loop, and to my surprise this
is often faster. It allocates a lot more garbage, but this could readily be
optimised by separating `with` into a function that operates on provided in/out
arrays, and re-using pairs of buffers.
    Given `merge` has lower algorithmic complexity than `with`, this suggests we're
leaving some constant factors on the table. Since `merge` is slower even with only 4
streams to merge, the cause is not as simple as cache associativity; however, we might
in any case prefer to buffer the input streams in a cache-sympathetic manner. It's
possible that a sequence of linear merges simply has better pipelining /
branch predictability.
It's worth noting that in the real world, we expect to mostly merge either
disjoint keys or keys with `txnId` lists that are mostly the same. So, we
should run some tests specifically with this in mind to verify approaches. We
don't anticipate lots of keys, but probably 9-16 copies for each key, and
dozens to hundreds of dependencies per transaction.
Either way, perhaps it would be fine to simplify `merge` into a variant of
recursive `with` that produces less garbage, and later perhaps introduce a
variant that balances algorithmic complexity (which is very poor with this
approach) and constant factors.
```
13:47:29.482 [main] INFO accord.txn.DepsTest - Benchmark[name=builder
forEach(No.=4, keys=21, size=1646)] avg 168.35ms, 0.22ms GC, allocated 6901 MiB
13:47:31.980 [main] INFO accord.txn.DepsTest - Benchmark[name=merge(No.=4,
keys=21, size=1646)] avg 59.03ms, 0.24ms GC, allocated 1763 MiB
13:47:33.485 [main] INFO accord.txn.DepsTest - Benchmark[name=with(No.=4,
keys=21, size=1646)] avg 35.4ms, 0.22ms GC, allocated 2308 MiB
13:48:15.296 [main] INFO accord.txn.DepsTest - Benchmark[name=builder
forEach(No.=15, keys=27, size=8449)] avg 951.11ms, 0.23ms GC, allocated 30617
MiB
13:48:48.707 [main] INFO accord.txn.DepsTest - Benchmark[name=merge(No.=15,
keys=27, size=8449)] avg 795.14ms, 0.22ms GC, allocated 13242 MiB
13:49:16.317 [main] INFO accord.txn.DepsTest - Benchmark[name=with(No.=15,
keys=27, size=8449)] avg 656.92ms, 0.23ms GC, allocated 35741 MiB
13:49:44.247 [main] INFO accord.txn.DepsTest - Benchmark[name=builder
forEach(No.=9, keys=22, size=5812)] avg 638.99ms, 0.24ms GC, allocated 22966 MiB
13:50:01.518 [main] INFO accord.txn.DepsTest - Benchmark[name=merge(No.=9,
keys=22, size=5812)] avg 410.77ms, 0.23ms GC, allocated 8037 MiB
13:50:14.952 [main] INFO accord.txn.DepsTest - Benchmark[name=with(No.=9,
keys=22, size=5812)] avg 319.43ms, 0.23ms GC, allocated 16794 MiB
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]