dcapwell commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1195685962


##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -248,10 +407,113 @@ public void completeOperation(AccordSafeCommandStore 
store,
                                   Map<RoutableKey, AccordSafeCommandsForKey> 
commandsForKeys)
     {
         Invariants.checkState(current == store);
+        maybeUpdateRangeIndex(commands);
         current.complete();
         current = null;
     }
 
+
+    private static CommandTimeseriesHolder fromRangeSummary(Seekable 
keyOrRange, List<RangeCommandSummary> matches)
+    {
+        return new CommandTimeseriesHolder()
+        {
+            @Override
+            public CommandTimeseries<?> byId()
+            {
+                CommandTimeseries.Update<RangeCommandSummary> builder = new 
CommandTimeseries.Update<>(keyOrRange, RangeCommandSummaryLoader.INSTANCE);
+                for (RangeCommandSummary m : matches)
+                {
+                    if (m.status == SaveStatus.Invalidated)
+                        continue;
+                    builder.add(m.txnId, m);
+                }
+                return builder.build();
+            }
+
+            @Override
+            public CommandTimeseries<?> byExecuteAt()
+            {
+                CommandTimeseries.Update<RangeCommandSummary> builder = new 
CommandTimeseries.Update<>(null, RangeCommandSummaryLoader.INSTANCE);
+                for (RangeCommandSummary m : matches)
+                {
+                    if (m.status == SaveStatus.Invalidated)
+                        continue;
+                    builder.add(m.executeAt != null ? m.executeAt : m.txnId, 
m);
+                }
+                return builder.build();
+            }
+        };
+    }
+
+    <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+    {
+        switch (keysOrRanges.domain())
+        {
+            case Key:
+            {
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                for (Key key : keys)
+                {
+                    if (!slice.contains(key)) continue;
+
+                    accumulate = map.apply(fromRangeSummary(key, 
rangesToCommands.search(key)), accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+            }
+            break;
+            case Range:
+            {
+                AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+                ranges = ranges.slice(slice, Routables.Slice.Minimal);
+                for (Range range : ranges)
+                {
+                    accumulate = map.apply(fromRangeSummary(range, 
rangesToCommands.search(Interval.create(range.start(), range.end()))), 
accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+            }
+            break;
+            default:
+                throw new AssertionError("Unknown domain: " + 
keysOrRanges.domain());
+        }
+        return accumulate;
+    }
+
+    private void maybeUpdateRangeIndex(Map<TxnId, AccordSafeCommand> commands)
+    {
+        for (Map.Entry<TxnId, AccordSafeCommand> e : commands.entrySet())
+        {
+            TxnId txnId = e.getKey();
+            if (txnId.domain() != Routable.Domain.Range)
+                continue;
+            Command current = e.getValue().current();
+            if (current.saveStatus() == SaveStatus.NotWitnessed)
+                continue; // don't know the range/dependencies, so can't cache
+            PartialTxn txn = current.partialTxn();
+            Seekables<?, ?> keys = txn.keys();
+            if (keys.domain() != Routable.Domain.Range)
+                throw new AssertionError("Found a Range Transaction that had 
non-Range keys: " + current);
+            if (keys.isEmpty())
+                throw new AssertionError("Found a Range Transaction that has 
empty keys: " + current);
+            PartialDeps deps = current.partialDeps();
+            List<TxnId> dependsOn = deps == null ? Collections.emptyList() : 
deps.txnIds();
+
+            RangeCommandSummary summary = new RangeCommandSummary(txnId, 
current.saveStatus(), current.executeAt(), dependsOn);
+            Ranges ranges = (Ranges) keys;
+            put(ranges, summary);
+        }
+    }
+
+    private void put(Ranges ranges, RangeCommandSummary summary)
+    {
+        IntervalTree.Builder<RoutableKey, RangeCommandSummary, 
Interval<RoutableKey, RangeCommandSummary>> builder = 
rangesToCommands.unbuild();
+        //TODO double check this tree has same inclusive/exclusive semantics 
as this range...
+        for (Range range : ranges)
+            builder.add(new Interval<>(range.start(), range.end(), summary));

Review Comment:
   I believe this is not correct: this method gets called on state change, and
   `IntervalTree` will just add the update rather than replace the existing
   entry (so we see both new and old); I will look into how to filter this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to