belliottsmith commented on code in PR #3679:
URL: https://github.com/apache/cassandra/pull/3679#discussion_r1843358952
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -161,32 +162,276 @@ private <P1, T> T mapReduce(@Nonnull Timestamp
testTimestamp, @Nullable TxnId te
}
// TODO (required): ensure we are excluding any ranges that are
now shard-redundant (not sure if this is enforced yet)
+ if (summary.ranges == null)
+ return;
+
for (Range range : summary.ranges)
{
- if (!this.ranges.intersects(range))
- continue;
- collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
+ if (keysOrRanges.intersects(range))
+ collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
}
}));
- for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e :
collect.entrySet())
+ for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
{
- for (CommandsForRangesLoader.Summary command : e.getValue())
+ for (Summary command : e.getValue())
accumulate = map.apply(p1, e.getKey(), command.txnId,
command.executeAt, accumulate);
}
return accumulate;
}
- public CommandsForRanges slice(Ranges slice)
+ public static class Summary
+ {
+ public final TxnId txnId;
+ @Nullable public final Timestamp executeAt;
+ @Nullable public final SaveStatus saveStatus;
+ @Nullable public final Ranges ranges;
+
+ // TODO (required): this logic is still broken (was already): needs to
consider exact range matches
+ public final TxnId findAsDep;
+ public final boolean hasAsDep;
+
+ @VisibleForTesting
+ Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus
saveStatus, Ranges ranges, TxnId findAsDep, boolean hasAsDep)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.saveStatus = saveStatus;
+ this.ranges = ranges;
+ this.findAsDep = findAsDep;
+ this.hasAsDep = hasAsDep;
+ }
+
+ public Summary slice(Ranges slice)
+ {
+ return new Summary(txnId, executeAt, saveStatus, ranges == null ?
null : ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Summary{" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", ranges=" + ranges +
+ ", findAsDep=" + findAsDep +
+ ", hasAsDep=" + hasAsDep +
+ '}';
+ }
+ }
+
+ public static class Manager implements AccordCache.Listener<TxnId, Command>
+ {
+ private final AccordCommandStore commandStore;
+ private final RoutesSearcher searcher = new RoutesSearcher();
+ private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
+
+ public Manager(AccordCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ try (AccordCommandStore.ExclusiveCaches caches =
commandStore.lockCaches())
+ {
+ caches.commands().register(this);
+ }
+ }
+
+ @Override
+ public void onAdd(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.add(txnId);
+ }
+
+ @Override
+ public void onEvict(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.remove(txnId);
+ }
Review Comment:
This is a cache listener, so it is only invoked by the cache (which must
itself be invoked while holding its lock). This code hasn't changed in this
patch, just been renamed perhaps. I'm surprised to see + against it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]