dcapwell commented on code in PR #3679:
URL: https://github.com/apache/cassandra/pull/3679#discussion_r1842735120


##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -350,25 +352,6 @@ private <O> O mapReduceForRange(Routables<?> keysOrRanges, BiFunction<CommandsSu
         if (commandsForRanges == null)
             return accumulate;
 
-        switch (keysOrRanges.domain())

Review Comment:
   https://github.com/apache/cassandra-accord/pull/137 does `Invariants.paranoid(keysOrRanges.contains(key));` in CFK, so I think removing this switch is meant to push the check down into CFK/CFR, right?



##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -48,56 +64,41 @@
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 
-public class CommandsForRanges implements CommandsSummary
+public class CommandsForRanges extends TreeMap<Timestamp, CommandsForRanges.Summary> implements CommandsSummary
 {
-    public final Ranges ranges;
-    private final NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map;
-
-    private CommandsForRanges(Ranges ranges, NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map)
-    {
-        this.ranges = ranges;
-        this.map = map;
-    }
-
-    public static CommandsForRanges create(Ranges ranges, NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map)
-    {
-        return new CommandsForRanges(ranges, map);
-    }
-
-    @VisibleForTesting
-    public int size()
+    public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
     {
-        return map.size();
+        super(m);
     }
 
     @Override
-    public <P1, T> T mapReduceFull(TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
+    public <P1, T> T mapReduceFull(Routables<?> keysOrRanges, TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)

Review Comment:
   https://github.com/apache/cassandra-accord/pull/137 does 
`Invariants.paranoid(keysOrRanges.contains(key));`, and this patch removed the 
checks for non-intersecting input... but CFR doesn't add any similar checks?  
Why is that?
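
   To illustrate the kind of check I would expect on the CFR side, here is a minimal sketch. Everything in it is hypothetical (`checkParanoid`, `paranoidEnabled`, and the integer `intersects` helper are stand-ins I made up, not the Accord `Invariants` or `Ranges` API); it only shows a lazily evaluated invariant that is gated behind a flag and verifies that a summary range overlaps the requested input.

```java
import java.util.function.BooleanSupplier;

public class ParanoidCheckSketch
{
    // Hypothetical toggle: expensive checks only run when this is enabled.
    static boolean paranoidEnabled = true;

    /** Evaluates the condition lazily, and only when paranoid checks are on. */
    static void checkParanoid(BooleanSupplier condition, String message)
    {
        if (paranoidEnabled && !condition.getAsBoolean())
            throw new IllegalStateException(message);
    }

    /** True if the half-open intervals [aStart, aEnd) and [bStart, bEnd) overlap. */
    static boolean intersects(int aStart, int aEnd, int bStart, int bEnd)
    {
        return aStart < bEnd && bStart < aEnd;
    }

    public static void main(String[] args)
    {
        // The summary range [5, 15) overlaps the requested range [0, 10), so this passes.
        checkParanoid(() -> intersects(5, 15, 0, 10), "summary does not intersect input");
        System.out.println("check passed");
    }
}
```

   Taking the condition as a supplier means that nothing is computed on the hot path when the flag is off.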



##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -48,56 +64,41 @@
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 
-public class CommandsForRanges implements CommandsSummary
+public class CommandsForRanges extends TreeMap<Timestamp, CommandsForRanges.Summary> implements CommandsSummary
 {
-    public final Ranges ranges;
-    private final NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map;
-
-    private CommandsForRanges(Ranges ranges, NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map)
-    {
-        this.ranges = ranges;
-        this.map = map;
-    }
-
-    public static CommandsForRanges create(Ranges ranges, NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map)
-    {
-        return new CommandsForRanges(ranges, map);
-    }
-
-    @VisibleForTesting
-    public int size()
+    public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
     {
-        return map.size();
+        super(m);
     }
 
     @Override
-    public <P1, T> T mapReduceFull(TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
+    public <P1, T> T mapReduceFull(Routables<?> keysOrRanges, TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)

Review Comment:
   High-level question: before, `ranges` were the *known* ranges requested in the `PreLoadContext`, and this patch drops them in favor of whatever ranges the caller passes in. Why is that? If you ask for `[0, 10)` but actually work with `[20, 42)`, is that not a logic bug? We can't find anything for those ranges because you never asked to load them, so aren't the load-time ranges the more accurate ones?
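
   To make the concern concrete, here is a minimal sketch of the guard I have in mind. `Span`, `isCovered`, and `checkCovered` are hypothetical names for illustration only, not the Accord `Range`/`Ranges` API, and the coverage test is deliberately simplified (a queried span must sit inside a single loaded span). It rejects a query for a range that was never part of what was loaded up front.

```java
import java.util.List;

public class LoadedRangesSketch
{
    /** Hypothetical half-open range [start, end); not accord.primitives.Range. */
    record Span(int start, int end)
    {
        boolean contains(Span other)
        {
            return start <= other.start && other.end <= end;
        }
    }

    /** True if a single loaded span fully contains the queried span (simplified coverage). */
    static boolean isCovered(List<Span> loaded, Span queried)
    {
        for (Span span : loaded)
            if (span.contains(queried))
                return true;
        return false;
    }

    /** Fails fast when the caller queries a range it never asked to load. */
    static void checkCovered(List<Span> loaded, Span queried)
    {
        if (!isCovered(loaded, queried))
            throw new IllegalStateException("Queried " + queried + " but only loaded " + loaded);
    }

    public static void main(String[] args)
    {
        List<Span> loaded = List.of(new Span(0, 10));
        checkCovered(loaded, new Span(2, 5)); // inside [0, 10), accepted
        try
        {
            checkCovered(loaded, new Span(20, 42)); // never loaded, rejected
        }
        catch (IllegalStateException e)
        {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   With the load-time ranges kept around, this kind of mismatch surfaces as an error instead of silently returning nothing.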



##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -161,32 +162,276 @@ private <P1, T> T mapReduce(@Nonnull Timestamp testTimestamp, @Nullable TxnId te
             }
 
             // TODO (required): ensure we are excluding any ranges that are now shard-redundant (not sure if this is enforced yet)
+            if (summary.ranges == null)

Review Comment:
   Moments like this, I always regret not documenting why something is nullable. I don't see a code path that actually produces null here, but the field is flagged as `@Nullable`, so this change is the correct behavior.



##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -161,32 +162,276 @@ private <P1, T> T mapReduce(@Nonnull Timestamp testTimestamp, @Nullable TxnId te
             }
 
             // TODO (required): ensure we are excluding any ranges that are now shard-redundant (not sure if this is enforced yet)
+            if (summary.ranges == null)
+                return;
+
             for (Range range : summary.ranges)
             {
-                if (!this.ranges.intersects(range))
-                    continue;
-                collect.computeIfAbsent(range, ignore -> new ArrayList<>()).add(summary);
+                if (keysOrRanges.intersects(range))
+                    collect.computeIfAbsent(range, ignore -> new ArrayList<>()).add(summary);
             }
         }));
 
-        for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e : collect.entrySet())
+        for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
         {
-            for (CommandsForRangesLoader.Summary command : e.getValue())
+            for (Summary command : e.getValue())
                 accumulate = map.apply(p1, e.getKey(), command.txnId, command.executeAt, accumulate);
         }
 
         return accumulate;
     }
 
-    public CommandsForRanges slice(Ranges slice)
+    public static class Summary
+    {
+        public final TxnId txnId;
+        @Nullable public final Timestamp executeAt;
+        @Nullable public final SaveStatus saveStatus;
+        @Nullable public final Ranges ranges;
+
+        // TODO (required): this logic is still broken (was already): needs to consider exact range matches
+        public final TxnId findAsDep;
+        public final boolean hasAsDep;
+
+        @VisibleForTesting
+        Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus saveStatus, Ranges ranges, TxnId findAsDep, boolean hasAsDep)
+        {
+            this.txnId = txnId;
+            this.executeAt = executeAt;
+            this.saveStatus = saveStatus;
+            this.ranges = ranges;
+            this.findAsDep = findAsDep;
+            this.hasAsDep = hasAsDep;
+        }
+
+        public Summary slice(Ranges slice)
+        {
+            return new Summary(txnId, executeAt, saveStatus, ranges == null ? null : ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Summary{" +
+                   "txnId=" + txnId +
+                   ", executeAt=" + executeAt +
+                   ", saveStatus=" + saveStatus +
+                   ", ranges=" + ranges +
+                   ", findAsDep=" + findAsDep +
+                   ", hasAsDep=" + hasAsDep +
+                   '}';
+        }
+    }
+
+    public static class Manager implements AccordCache.Listener<TxnId, Command>
+    {
+        private final AccordCommandStore commandStore;
+        private final RoutesSearcher searcher = new RoutesSearcher();
+        private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+        private final ObjectHashSet<TxnId> cachedRangeTxns = new ObjectHashSet<>();
+
+        public Manager(AccordCommandStore commandStore)
+        {
+            this.commandStore = commandStore;
+            try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches())
+            {
+                caches.commands().register(this);
+            }
+        }
+
+        @Override
+        public void onAdd(AccordCacheEntry<TxnId, Command> state)
+        {
+            TxnId txnId = state.key();
+            if (txnId.is(Routable.Domain.Range))
+                cachedRangeTxns.add(txnId);
+        }
+
+        @Override
+        public void onEvict(AccordCacheEntry<TxnId, Command> state)
+        {
+            TxnId txnId = state.key();
+            if (txnId.is(Routable.Domain.Range))
+                cachedRangeTxns.remove(txnId);
+        }

Review Comment:
   Which threads can this be called from? It looks like we now allow access from more places. Is this callback always invoked while holding `commandStore.lockCaches()`?
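
   If the answer is "always under the lock", one option is to assert that in the callbacks themselves. A minimal sketch, assuming the cache lock is (or wraps) a `java.util.concurrent.locks.ReentrantLock`; the class, the `String` transaction ids, and `addUnderLock` are hypothetical stand-ins, not the actual `AccordCommandStore` API:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

public class LockedListenerSketch
{
    // Hypothetical stand-in for whatever lock commandStore.lockCaches() acquires.
    private final ReentrantLock cacheLock = new ReentrantLock();
    // Plain HashSet: safe only because every access happens under cacheLock.
    private final Set<String> cachedRangeTxns = new HashSet<>();

    /** Listener callback: must only run while the cache lock is held. */
    void onAdd(String txnId)
    {
        checkLocked();
        cachedRangeTxns.add(txnId);
    }

    /** Listener callback: must only run while the cache lock is held. */
    void onEvict(String txnId)
    {
        checkLocked();
        cachedRangeTxns.remove(txnId);
    }

    private void checkLocked()
    {
        if (!cacheLock.isHeldByCurrentThread())
            throw new IllegalStateException("listener invoked without holding the cache lock");
    }

    /** Simulates the cache notifying the listener from inside the locked section. */
    void addUnderLock(String txnId)
    {
        cacheLock.lock();
        try
        {
            onAdd(txnId);
        }
        finally
        {
            cacheLock.unlock();
        }
    }

    int size()
    {
        return cachedRangeTxns.size();
    }
}
```

   That would document the threading contract in code and catch any new caller that reaches the listener without the lock.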



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
