dcapwell commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1195687670
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -248,10 +407,113 @@ public void completeOperation(AccordSafeCommandStore
store,
Map<RoutableKey, AccordSafeCommandsForKey>
commandsForKeys)
{
Invariants.checkState(current == store);
+ maybeUpdateRangeIndex(commands);
current.complete();
current = null;
}
+
+ private static CommandTimeseriesHolder fromRangeSummary(Seekable
keyOrRange, List<RangeCommandSummary> matches)
+ {
+ return new CommandTimeseriesHolder()
+ {
+ @Override
+ public CommandTimeseries<?> byId()
+ {
+ CommandTimeseries.Update<RangeCommandSummary> builder = new
CommandTimeseries.Update<>(keyOrRange, RangeCommandSummaryLoader.INSTANCE);
+ for (RangeCommandSummary m : matches)
+ {
+ if (m.status == SaveStatus.Invalidated)
+ continue;
+ builder.add(m.txnId, m);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public CommandTimeseries<?> byExecuteAt()
+ {
+ CommandTimeseries.Update<RangeCommandSummary> builder = new
CommandTimeseries.Update<>(null, RangeCommandSummaryLoader.INSTANCE);
+ for (RangeCommandSummary m : matches)
+ {
+ if (m.status == SaveStatus.Invalidated)
+ continue;
+ builder.add(m.executeAt != null ? m.executeAt : m.txnId,
m);
+ }
+ return builder.build();
+ }
+ };
+ }
+
+ <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice,
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+ {
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ {
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
+ for (Key key : keys)
+ {
+ if (!slice.contains(key)) continue;
+
+ accumulate = map.apply(fromRangeSummary(key,
rangesToCommands.search(key)), accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ case Range:
+ {
+ AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+ ranges = ranges.slice(slice, Routables.Slice.Minimal);
+ for (Range range : ranges)
+ {
+ accumulate = map.apply(fromRangeSummary(range,
rangesToCommands.search(Interval.create(range.start(), range.end()))),
accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ default:
+ throw new AssertionError("Unknown domain: " +
keysOrRanges.domain());
+ }
+ return accumulate;
+ }
+
+ private void maybeUpdateRangeIndex(Map<TxnId, AccordSafeCommand> commands)
+ {
+ for (Map.Entry<TxnId, AccordSafeCommand> e : commands.entrySet())
+ {
+ TxnId txnId = e.getKey();
+ if (txnId.domain() != Routable.Domain.Range)
+ continue;
+ Command current = e.getValue().current();
+ if (current.saveStatus() == SaveStatus.NotWitnessed)
+ continue; // don't know the range/dependencies, so can't cache
+ PartialTxn txn = current.partialTxn();
+ Seekables<?, ?> keys = txn.keys();
+ if (keys.domain() != Routable.Domain.Range)
+ throw new AssertionError("Found a Range Transaction that had
non-Range keys: " + current);
+ if (keys.isEmpty())
+ throw new AssertionError("Found a Range Transaction that has
empty keys: " + current);
+ PartialDeps deps = current.partialDeps();
+ List<TxnId> dependsOn = deps == null ? Collections.emptyList() :
deps.txnIds();
+
+ RangeCommandSummary summary = new RangeCommandSummary(txnId,
current.saveStatus(), current.executeAt(), dependsOn);
+ Ranges ranges = (Ranges) keys;
+ put(ranges, summary);
+ }
+ }
+
+ private void put(Ranges ranges, RangeCommandSummary summary)
+ {
+ IntervalTree.Builder<RoutableKey, RangeCommandSummary,
Interval<RoutableKey, RangeCommandSummary>> builder =
rangesToCommands.unbuild();
+ //TODO double check this tree has same inclusive/exclusive semantics
as this range...
Review Comment:
this looks inclusive on both sides, and our range tend to be `(, ]`, so not
a perfect match... will look to improve
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]