belliottsmith commented on code in PR #6:
URL: https://github.com/apache/cassandra-accord/pull/6#discussion_r942669144
##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -0,0 +1,978 @@
+package accord.primitives;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.utils.InlineHeap;
+import accord.utils.SortedArrays;
+
+import static accord.utils.SortedArrays.remap;
+import static accord.utils.SortedArrays.remapper;
+
+// TODO (now): switch to RoutingKey
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+ private static final TxnId[] NO_TXNIDS = new TxnId[0];
+ private static final int[] NO_INTS = new int[0];
+ public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+ public static class Builder
+ {
+ final Keys keys;
+ final Map<TxnId, Integer> txnIdLookup = new HashMap<>(); // TODO: primitive map
+ TxnId[] txnIds = new TxnId[4];
+ final int[][] keysToTxnId;
+ final int[] keysToTxnIdCounts;
+
+ public Builder(Keys keys)
+ {
+ this.keys = keys;
+ this.keysToTxnId = new int[keys.size()][4];
+ this.keysToTxnIdCounts = new int[keys.size()];
+ }
+
+ public boolean isEmpty()
+ {
+ return Arrays.stream(keysToTxnIdCounts).allMatch(i -> i == 0);
+ }
+
+ public void add(Command command)
+ {
+ int idx = ensureTxnIdx(command.txnId());
+ keys.foldlIntersect(command.txn().keys, (li, ri, k, p, v) -> {
+ if (keysToTxnId[li].length == keysToTxnIdCounts[li])
+ keysToTxnId[li] = Arrays.copyOf(keysToTxnId[li],
keysToTxnId[li].length * 2);
+ keysToTxnId[li][keysToTxnIdCounts[li]++] = idx;
+ return 0;
+ }, 0, 0, 1);
+ }
+
+ public void add(Key key, TxnId txnId)
+ {
+ int txnIdx = ensureTxnIdx(txnId);
+ int keyIdx = keys.indexOf(key);
+ if (keysToTxnIdCounts[keyIdx] == keysToTxnId[keyIdx].length)
+ keysToTxnId[keyIdx] = Arrays.copyOf(keysToTxnId[keyIdx],
Math.max(4, keysToTxnIdCounts[keyIdx] * 2));
+ keysToTxnId[keyIdx][keysToTxnIdCounts[keyIdx]++] = txnIdx;
+ }
+
+ public boolean contains(TxnId txnId)
+ {
+ return txnIdx(txnId) >= 0;
+ }
+
+ private int txnIdx(TxnId txnId)
+ {
+ return txnIdLookup.getOrDefault(txnId, -1);
+ }
+
+ private int ensureTxnIdx(TxnId txnId)
+ {
+ return txnIdLookup.computeIfAbsent(txnId, ignore -> {
+ if (txnIds.length == txnIdLookup.size())
+ txnIds = Arrays.copyOf(txnIds, txnIds.length * 2);
+ return txnIdLookup.size();
+ });
+ }
+
+ public Deps build()
+ {
+ TxnId[] txnIds = txnIdLookup.keySet().toArray(TxnId[]::new);
+ Arrays.sort(txnIds, TxnId::compareTo);
+ int[] txnIdMap = new int[txnIds.length];
+ for (int i = 0 ; i < txnIdMap.length ; i++)
+ txnIdMap[txnIdLookup.get(txnIds[i])] = i;
+
+ int keyCount = 0;
+ int[] result; {
+ int count = 0;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ keyCount += keysToTxnIdCounts[i] > 0 ? 1 : 0;
+ count += keysToTxnIdCounts[i];
+ }
+ result = new int[count + keyCount];
+ }
+
+ int keyIndex = 0;
+ int offset = keyCount;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ {
+ int count = keysToTxnIdCounts[i];
+ int[] src = keysToTxnId[i];
+ for (int j = 0 ; j < count ; ++j)
+ result[j + offset] = txnIdMap[src[j]];
+ Arrays.sort(result, offset, count + offset);
+ int dups = 0;
+ for (int j = offset + 1 ; j < offset + count ; ++j)
+ {
+ if (result[j] == result[j - 1]) ++dups;
+ else if (dups > 0) result[j - dups] = result[j];
+ }
+ result[keyIndex] = offset += count - dups;
+ ++keyIndex;
+ }
+ }
+ if (offset < result.length)
+ result = Arrays.copyOf(result, offset);
+
+ Keys keys = this.keys;
+ if (keyCount < keys.size())
+ {
+ keyIndex = 0;
+ Key[] newKeys = new Key[keyCount];
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ if (keysToTxnIdCounts[i] > 0)
+ newKeys[keyIndex++] = keys.get(i);
+ }
+ keys = new Keys(newKeys);
+ }
+
+ return new Deps(keys, txnIds, result);
+ }
+ }
+
+ public static Builder builder(Keys keys)
+ {
+ return new Builder(keys);
+ }
+
+ static class MergeStream
+ {
+ final Deps source;
+ // TODO: could share backing array for all of these if we want, with an additional offset
+ final int[] input;
+ final int keyCount;
+ int[] remap; // TODO: use cached backing array
+ int[] keys; // TODO: use cached backing array
+ int keyIndex;
+ int index;
+ int endIndex;
+
+ MergeStream(Deps source)
+ {
+ this.source = source;
+ this.input = source.keyToTxnId;
+ this.keyCount = source.keys.size();
+ }
+
+ private void init(Keys keys, TxnId[] txnIds)
+ {
+ this.remap = remapper(source.txnIds, txnIds, true);
+ this.keys = source.keys.remapper(keys, true);
+ while (input[keyIndex] == keyCount)
+ ++keyIndex;
+ this.index = keyCount;
+ this.endIndex = input[keyIndex];
+ }
+ }
+
+ public static <T> Deps merge(Keys keys, List<T> merge, Function<T, Deps>
getter)
Review Comment:
> This makes a lot of sense, if you happen to touch the same key vs you are
> the same txn just with slightly different inputs so all keys conflict.
It's more that a given transaction is _likely_ to arrive at all replicas at
around the same "point" (i.e. in the same order with respect to other
transactions) - and with our 1RT optimisation it is all but _guaranteed_ to do
so (excluding lost messages to some replicas). As a result, each key is likely
to have _precisely_ the same txnIds _most of the time_, and only sometimes to
vary a little. But each shard will cover only some subset of the keys (a
different subset for each replica), so we're normally going to be copying over
the txnIds for each key and validating that they're the same.
Now that I say this, it potentially makes more sense to perform the efficient
`with` variation, even if it is algorithmically worse, as most of the time it
will be very efficient. Given that we _aren't_ testing this kind of input and it
still performs adequately, the simplicity is probably well justified.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]