dcapwell commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1195685962
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -248,10 +407,113 @@ public void completeOperation(AccordSafeCommandStore
store,
Map<RoutableKey, AccordSafeCommandsForKey>
commandsForKeys)
{
Invariants.checkState(current == store);
+ maybeUpdateRangeIndex(commands);
current.complete();
current = null;
}
+
+ private static CommandTimeseriesHolder fromRangeSummary(Seekable
keyOrRange, List<RangeCommandSummary> matches)
+ {
+ return new CommandTimeseriesHolder()
+ {
+ @Override
+ public CommandTimeseries<?> byId()
+ {
+ CommandTimeseries.Update<RangeCommandSummary> builder = new
CommandTimeseries.Update<>(keyOrRange, RangeCommandSummaryLoader.INSTANCE);
+ for (RangeCommandSummary m : matches)
+ {
+ if (m.status == SaveStatus.Invalidated)
+ continue;
+ builder.add(m.txnId, m);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public CommandTimeseries<?> byExecuteAt()
+ {
+ CommandTimeseries.Update<RangeCommandSummary> builder = new
CommandTimeseries.Update<>(null, RangeCommandSummaryLoader.INSTANCE);
+ for (RangeCommandSummary m : matches)
+ {
+ if (m.status == SaveStatus.Invalidated)
+ continue;
+ builder.add(m.executeAt != null ? m.executeAt : m.txnId,
m);
+ }
+ return builder.build();
+ }
+ };
+ }
+
+ <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice,
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+ {
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ {
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
+ for (Key key : keys)
+ {
+ if (!slice.contains(key)) continue;
+
+ accumulate = map.apply(fromRangeSummary(key,
rangesToCommands.search(key)), accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ case Range:
+ {
+ AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+ ranges = ranges.slice(slice, Routables.Slice.Minimal);
+ for (Range range : ranges)
+ {
+ accumulate = map.apply(fromRangeSummary(range,
rangesToCommands.search(Interval.create(range.start(), range.end()))),
accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ default:
+ throw new AssertionError("Unknown domain: " +
keysOrRanges.domain());
+ }
+ return accumulate;
+ }
+
+ private void maybeUpdateRangeIndex(Map<TxnId, AccordSafeCommand> commands)
+ {
+ for (Map.Entry<TxnId, AccordSafeCommand> e : commands.entrySet())
+ {
+ TxnId txnId = e.getKey();
+ if (txnId.domain() != Routable.Domain.Range)
+ continue;
+ Command current = e.getValue().current();
+ if (current.saveStatus() == SaveStatus.NotWitnessed)
+ continue; // don't know the range/dependencies, so can't cache
+ PartialTxn txn = current.partialTxn();
+ Seekables<?, ?> keys = txn.keys();
+ if (keys.domain() != Routable.Domain.Range)
+ throw new AssertionError("Found a Range Transaction that had
non-Range keys: " + current);
+ if (keys.isEmpty())
+ throw new AssertionError("Found a Range Transaction that has
empty keys: " + current);
+ PartialDeps deps = current.partialDeps();
+ List<TxnId> dependsOn = deps == null ? Collections.emptyList() :
deps.txnIds();
+
+ RangeCommandSummary summary = new RangeCommandSummary(txnId,
current.saveStatus(), current.executeAt(), dependsOn);
+ Ranges ranges = (Ranges) keys;
+ put(ranges, summary);
+ }
+ }
+
+ private void put(Ranges ranges, RangeCommandSummary summary)
+ {
+ IntervalTree.Builder<RoutableKey, RangeCommandSummary,
Interval<RoutableKey, RangeCommandSummary>> builder =
rangesToCommands.unbuild();
+ //TODO double check this tree has same inclusive/exclusive semantics
as this range...
+ for (Range range : ranges)
+ builder.add(new Interval<>(range.start(), range.end(), summary));
Review Comment:
I believe this is not correct: this method gets called on state change, and
`IntervalTree` will just add the update alongside the existing entry (so we
see both new and old); I will look into how to filter this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]