dcapwell commented on code in PR #3679:
URL: https://github.com/apache/cassandra/pull/3679#discussion_r1844660507
##########
src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java:
##########
@@ -158,14 +160,28 @@ public synchronized NavigableSet<ByteBuffer> search(int
storeId, TableId tableId
return pks;
}
- private TreeMap<Range, Set<DecoratedKey>> search(RangeTree<byte[], Range,
DecoratedKey> tokensToPks, byte[] start, byte[] end)
+ public TreeMap<Range, Set<DecoratedKey>> search(RangeTree<byte[], Range,
DecoratedKey> tokensToPks, byte[] start, byte[] end)
{
TreeMap<Range, Set<DecoratedKey>> matches = new TreeMap<>();
tokensToPks.search(new Range(start, end), e ->
matches.computeIfAbsent(e.getKey(), ignore -> new
HashSet<>()).add(e.getValue()));
return matches;
}
+ public synchronized NavigableSet<ByteBuffer> search(int storeId, TableId
tableId, byte[] key)
+ {
+ var rangesToPks = map.get(new Group(storeId, tableId));
Review Comment:
```suggestion
RangeTree<byte[], Range, DecoratedKey> rangesToPks = map.get(new
Group(storeId, tableId));
```
Since there is a desire to ban `var`, this should be declared as
`RangeTree<byte[], Range, DecoratedKey>`.
##########
src/java/org/apache/cassandra/index/accord/RoutesSearcher.java:
##########
@@ -97,22 +98,80 @@ public Entry next()
}
}
- public void intersects(int store, TokenRange range, TxnId minTxnId,
Timestamp maxTxnId, Consumer<TxnId> forEach)
+ private CloseableIterator<Entry> searchKey(int store, AccordRoutingKey key)
{
- intersects(store, range.start(), range.end(), minTxnId, maxTxnId,
forEach);
+ RowFilter rowFilter = RowFilter.create(false);
+ rowFilter.add(participants, Operator.GTE,
OrderedRouteSerializer.serializeRoutingKey(key));
+ rowFilter.add(participants, Operator.LTE,
OrderedRouteSerializer.serializeRoutingKey(key));
+ rowFilter.add(store_id, Operator.EQ,
Int32Type.instance.decompose(store));
+
+ var cmd = PartitionRangeReadCommand.create(cfs.metadata(),
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange);
+ Index.Searcher s = index.searcherFor(cmd);
+ try (var controller = cmd.executionController())
Review Comment:
```suggestion
try (ReadExecutionController controller = cmd.executionController())
```
since we want to ban `var`
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -160,33 +158,279 @@ private <P1, T> T mapReduce(@Nonnull Timestamp
testTimestamp, @Nullable TxnId te
Invariants.checkState(testTxnId.equals(summary.findAsDep));
}
- // TODO (required): ensure we are excluding any ranges that are
now shard-redundant (not sure if this is enforced yet)
for (Range range : summary.ranges)
{
- if (!this.ranges.intersects(range))
- continue;
- collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
+ if (keysOrRanges.intersects(range))
+ collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
}
}));
- for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e :
collect.entrySet())
+ for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
{
- for (CommandsForRangesLoader.Summary command : e.getValue())
+ for (Summary command : e.getValue())
accumulate = map.apply(p1, e.getKey(), command.txnId,
command.executeAt, accumulate);
}
return accumulate;
}
- public CommandsForRanges slice(Ranges slice)
+ public static class Summary
+ {
+ public final @Nonnull TxnId txnId;
+ public final @Nonnull Timestamp executeAt;
+ public final @Nonnull SaveStatus saveStatus;
+ public final @Nonnull Ranges ranges;
+
+ public final TxnId findAsDep;
+ public final boolean hasAsDep;
+
+ @VisibleForTesting
+ Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull
SaveStatus saveStatus, @Nonnull Ranges ranges, TxnId findAsDep, boolean
hasAsDep)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.saveStatus = saveStatus;
+ this.ranges = ranges;
+ this.findAsDep = findAsDep;
+ this.hasAsDep = hasAsDep;
+ }
+
+ public Summary slice(Ranges slice)
+ {
+ return new Summary(txnId, executeAt, saveStatus,
ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Summary{" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", ranges=" + ranges +
+ ", findAsDep=" + findAsDep +
+ ", hasAsDep=" + hasAsDep +
+ '}';
+ }
+ }
+
+ public static class Manager implements AccordCache.Listener<TxnId, Command>
+ {
+ private final AccordCommandStore commandStore;
+ private final RoutesSearcher searcher = new RoutesSearcher();
+ private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
+
+ public Manager(AccordCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ try (AccordCommandStore.ExclusiveCaches caches =
commandStore.lockCaches())
+ {
+ caches.commands().register(this);
+ }
+ }
+
+ @Override
+ public void onAdd(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.add(txnId);
+ }
+
+ @Override
+ public void onEvict(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.remove(txnId);
+ }
+
+ public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId,
KeyHistory keyHistory, Unseekables<?> keysOrRanges)
+ {
+ RedundantBefore redundantBefore =
commandStore.unsafeGetRedundantBefore();
+ TxnId minTxnId = redundantBefore.min(keysOrRanges, e ->
e.gcBefore);
+ Timestamp maxTxnId = primaryTxnId == null || keyHistory ==
KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX :
primaryTxnId;
+ TxnId findAsDep = primaryTxnId != null && keyHistory ==
KeyHistory.RECOVER ? primaryTxnId : null;
+ return new CommandsForRanges.Loader(this, keysOrRanges,
redundantBefore, minTxnId, maxTxnId, findAsDep);
+ }
+
+ public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<?
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+ {
+ transitive.merge(txnId, ranges, remappingFunction);
+ }
+
+ public void gcBefore(TxnId gcBefore, Ranges ranges)
+ {
+ Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<TxnId, Ranges> e = iterator.next();
+ Ranges newRanges = e.getValue().without(ranges);
+ if (newRanges.isEmpty())
+ iterator.remove();
+ e.setValue(newRanges);
+ }
+ }
+ }
+
+ public static class Loader
{
- Ranges ranges = this.ranges.slice(slice, Minimal);
- NavigableMap<Timestamp, CommandsForRangesLoader.Summary> copy = new
TreeMap<>();
- for (Map.Entry<Timestamp, CommandsForRangesLoader.Summary> e :
map.entrySet())
+ private final Manager manager;
+ final Unseekables<?> searchKeysOrRanges;
+ final RedundantBefore redundantBefore;
+ final TxnId minTxnId;
+ final Timestamp maxTxnId;
+ @Nullable final TxnId findAsDep;
+
+ public Loader(Manager manager, Unseekables<?> searchKeysOrRanges,
RedundantBefore redundantBefore, TxnId minTxnId, Timestamp maxTxnId, @Nullable
TxnId findAsDep)
+ {
+ this.manager = manager;
+ this.searchKeysOrRanges = searchKeysOrRanges;
+ this.redundantBefore = redundantBefore;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.findAsDep = findAsDep;
+ }
+
+ public void intersects(Consumer<TxnId> forEach)
{
- if (!e.getValue().ranges.intersects(slice)) continue;
- copy.put(e.getKey(), e.getValue().slice(slice));
+ switch (searchKeysOrRanges.domain())
+ {
+ case Range:
+ for (Unseekable range : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(TokenRange) range, minTxnId, maxTxnId, forEach);
+ break;
+ case Key:
+ for (Unseekable key : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(AccordRoutingKey) key, minTxnId, maxTxnId, forEach);
+ }
+
+ if (!manager.transitive.isEmpty())
+ {
+ for (var e : manager.transitive.tailMap(minTxnId,
true).entrySet())
+ {
+ if (e.getValue().intersects(searchKeysOrRanges))
+ forEach.accept(e.getKey());
+ }
+ }
+ }
+
+ public void forEachInCache(Consumer<Summary> forEach,
AccordCommandStore.Caches caches)
+ {
+ for (TxnId txnId : manager.cachedRangeTxns)
+ {
+ AccordCacheEntry<TxnId, Command> state =
caches.commands().getUnsafe(txnId);
+ Summary summary = from(state);
+ if (summary != null)
+ forEach.accept(summary);
+ }
+ }
+
+ public Summary load(TxnId txnId)
+ {
+ if (findAsDep == null)
+ {
+ SavedCommand.MinimalCommand cmd =
manager.commandStore.loadMinimal(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+ else
+ {
+ Command cmd = manager.commandStore.loadCommand(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+
+ Ranges ranges = manager.transitive.get(txnId);
+ if (ranges == null)
+ return null;
+
+ ranges = ranges.intersecting(searchKeysOrRanges);
+ if (ranges.isEmpty())
+ return null;
+
+ return new Summary(txnId, txnId, SaveStatus.NotDefined, ranges,
null, false);
Review Comment:
good catch
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -160,33 +158,279 @@ private <P1, T> T mapReduce(@Nonnull Timestamp
testTimestamp, @Nullable TxnId te
Invariants.checkState(testTxnId.equals(summary.findAsDep));
}
- // TODO (required): ensure we are excluding any ranges that are
now shard-redundant (not sure if this is enforced yet)
for (Range range : summary.ranges)
{
- if (!this.ranges.intersects(range))
- continue;
- collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
+ if (keysOrRanges.intersects(range))
+ collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
}
}));
- for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e :
collect.entrySet())
+ for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
{
- for (CommandsForRangesLoader.Summary command : e.getValue())
+ for (Summary command : e.getValue())
accumulate = map.apply(p1, e.getKey(), command.txnId,
command.executeAt, accumulate);
}
return accumulate;
}
- public CommandsForRanges slice(Ranges slice)
+ public static class Summary
+ {
+ public final @Nonnull TxnId txnId;
+ public final @Nonnull Timestamp executeAt;
+ public final @Nonnull SaveStatus saveStatus;
+ public final @Nonnull Ranges ranges;
+
+ public final TxnId findAsDep;
+ public final boolean hasAsDep;
+
+ @VisibleForTesting
+ Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull
SaveStatus saveStatus, @Nonnull Ranges ranges, TxnId findAsDep, boolean
hasAsDep)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.saveStatus = saveStatus;
+ this.ranges = ranges;
+ this.findAsDep = findAsDep;
+ this.hasAsDep = hasAsDep;
+ }
+
+ public Summary slice(Ranges slice)
+ {
+ return new Summary(txnId, executeAt, saveStatus,
ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Summary{" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", ranges=" + ranges +
+ ", findAsDep=" + findAsDep +
+ ", hasAsDep=" + hasAsDep +
+ '}';
+ }
+ }
+
+ public static class Manager implements AccordCache.Listener<TxnId, Command>
+ {
+ private final AccordCommandStore commandStore;
+ private final RoutesSearcher searcher = new RoutesSearcher();
+ private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
+
+ public Manager(AccordCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ try (AccordCommandStore.ExclusiveCaches caches =
commandStore.lockCaches())
+ {
+ caches.commands().register(this);
+ }
+ }
+
+ @Override
+ public void onAdd(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.add(txnId);
+ }
+
+ @Override
+ public void onEvict(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.remove(txnId);
+ }
+
+ public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId,
KeyHistory keyHistory, Unseekables<?> keysOrRanges)
+ {
+ RedundantBefore redundantBefore =
commandStore.unsafeGetRedundantBefore();
+ TxnId minTxnId = redundantBefore.min(keysOrRanges, e ->
e.gcBefore);
+ Timestamp maxTxnId = primaryTxnId == null || keyHistory ==
KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX :
primaryTxnId;
+ TxnId findAsDep = primaryTxnId != null && keyHistory ==
KeyHistory.RECOVER ? primaryTxnId : null;
+ return new CommandsForRanges.Loader(this, keysOrRanges,
redundantBefore, minTxnId, maxTxnId, findAsDep);
+ }
+
+ public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<?
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+ {
+ transitive.merge(txnId, ranges, remappingFunction);
+ }
+
+ public void gcBefore(TxnId gcBefore, Ranges ranges)
+ {
+ Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<TxnId, Ranges> e = iterator.next();
+ Ranges newRanges = e.getValue().without(ranges);
+ if (newRanges.isEmpty())
+ iterator.remove();
+ e.setValue(newRanges);
+ }
+ }
+ }
+
+ public static class Loader
{
- Ranges ranges = this.ranges.slice(slice, Minimal);
- NavigableMap<Timestamp, CommandsForRangesLoader.Summary> copy = new
TreeMap<>();
- for (Map.Entry<Timestamp, CommandsForRangesLoader.Summary> e :
map.entrySet())
+ private final Manager manager;
+ final Unseekables<?> searchKeysOrRanges;
+ final RedundantBefore redundantBefore;
+ final TxnId minTxnId;
+ final Timestamp maxTxnId;
+ @Nullable final TxnId findAsDep;
+
+ public Loader(Manager manager, Unseekables<?> searchKeysOrRanges,
RedundantBefore redundantBefore, TxnId minTxnId, Timestamp maxTxnId, @Nullable
TxnId findAsDep)
+ {
+ this.manager = manager;
+ this.searchKeysOrRanges = searchKeysOrRanges;
+ this.redundantBefore = redundantBefore;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.findAsDep = findAsDep;
+ }
+
+ public void intersects(Consumer<TxnId> forEach)
{
- if (!e.getValue().ranges.intersects(slice)) continue;
- copy.put(e.getKey(), e.getValue().slice(slice));
+ switch (searchKeysOrRanges.domain())
+ {
+ case Range:
+ for (Unseekable range : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(TokenRange) range, minTxnId, maxTxnId, forEach);
+ break;
+ case Key:
+ for (Unseekable key : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(AccordRoutingKey) key, minTxnId, maxTxnId, forEach);
+ }
+
+ if (!manager.transitive.isEmpty())
+ {
+ for (var e : manager.transitive.tailMap(minTxnId,
true).entrySet())
+ {
+ if (e.getValue().intersects(searchKeysOrRanges))
+ forEach.accept(e.getKey());
+ }
+ }
+ }
+
+ public void forEachInCache(Consumer<Summary> forEach,
AccordCommandStore.Caches caches)
+ {
+ for (TxnId txnId : manager.cachedRangeTxns)
+ {
+ AccordCacheEntry<TxnId, Command> state =
caches.commands().getUnsafe(txnId);
+ Summary summary = from(state);
+ if (summary != null)
+ forEach.accept(summary);
+ }
+ }
+
+ public Summary load(TxnId txnId)
+ {
+ if (findAsDep == null)
+ {
+ SavedCommand.MinimalCommand cmd =
manager.commandStore.loadMinimal(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+ else
+ {
+ Command cmd = manager.commandStore.loadCommand(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+
+ Ranges ranges = manager.transitive.get(txnId);
+ if (ranges == null)
+ return null;
+
+ ranges = ranges.intersecting(searchKeysOrRanges);
+ if (ranges.isEmpty())
+ return null;
+
+ return new Summary(txnId, txnId, SaveStatus.NotDefined, ranges,
null, false);
+ }
+
+ public Summary from(AccordCacheEntry<TxnId, Command> state)
+ {
+ if (state.key().domain() != Routable.Domain.Range)
+ return null;
+
+ switch (state.status())
+ {
+ default: throw new AssertionError("Unhandled status: " +
state.status());
+ case LOADING:
+ case WAITING_TO_LOAD:
+ case UNINITIALIZED:
+ return null;
+
+ case LOADED:
+ case MODIFIED:
+ case SAVING:
+ case FAILED_TO_SAVE:
+ }
+
+ TxnId txnId = state.key();
+ if (!txnId.isVisible() || txnId.compareTo(minTxnId) < 0 ||
txnId.compareTo(maxTxnId) >= 0)
+ return null;
+
+ Command command = state.getExclusive();
+ if (command == null)
+ return null;
+ return from(command);
+ }
+
+ public Summary from(Command cmd)
+ {
+ return from(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(),
cmd.participants(), cmd.partialDeps());
+ }
+
+ public Summary from(SavedCommand.MinimalCommand cmd)
+ {
+ Invariants.checkState(findAsDep == null);
+ return from(cmd.txnId, cmd.executeAt, cmd.saveStatus,
cmd.participants, null);
+ }
+
+ private Summary from(TxnId txnId, Timestamp executeAt, SaveStatus
saveStatus, StoreParticipants participants, @Nullable PartialDeps partialDeps)
+ {
+ if (participants == null)
+ return null;
+
+ Ranges keysOrRanges = participants.touches().toRanges();
+ if (keysOrRanges.domain() != Routable.Domain.Range)
Review Comment:
Is this needed? The type is `Ranges`.
##########
src/java/org/apache/cassandra/index/accord/RoutesSearcher.java:
##########
@@ -97,22 +98,80 @@ public Entry next()
}
}
- public void intersects(int store, TokenRange range, TxnId minTxnId,
Timestamp maxTxnId, Consumer<TxnId> forEach)
+ private CloseableIterator<Entry> searchKey(int store, AccordRoutingKey key)
{
- intersects(store, range.start(), range.end(), minTxnId, maxTxnId,
forEach);
+ RowFilter rowFilter = RowFilter.create(false);
+ rowFilter.add(participants, Operator.GTE,
OrderedRouteSerializer.serializeRoutingKey(key));
+ rowFilter.add(participants, Operator.LTE,
OrderedRouteSerializer.serializeRoutingKey(key));
+ rowFilter.add(store_id, Operator.EQ,
Int32Type.instance.decompose(store));
+
+ var cmd = PartitionRangeReadCommand.create(cfs.metadata(),
+ FBUtilities.nowInSeconds(),
+ columnFilter,
+ rowFilter,
+ limits,
+ dataRange);
+ Index.Searcher s = index.searcherFor(cmd);
+ try (var controller = cmd.executionController())
+ {
+ UnfilteredPartitionIterator partitionIterator =
s.search(controller);
+ return new CloseableIterator<>()
+ {
+ private final Entry entry = new Entry();
+ @Override
+ public void close()
+ {
+ partitionIterator.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return partitionIterator.hasNext();
+ }
+
+ @Override
+ public Entry next()
+ {
+ UnfilteredRowIterator next = partitionIterator.next();
+ var partitionKeyComponents =
AccordKeyspace.CommandRows.splitPartitionKey(next.partitionKey());
Review Comment:
```suggestion
ByteBuffer[] partitionKeyComponents =
AccordKeyspace.CommandRows.splitPartitionKey(next.partitionKey());
```
since we want to ban `var`
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -48,56 +64,41 @@
import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
-public class CommandsForRanges implements CommandsSummary
+public class CommandsForRanges extends TreeMap<Timestamp,
CommandsForRanges.Summary> implements CommandsSummary
{
- public final Ranges ranges;
- private final NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map;
-
- private CommandsForRanges(Ranges ranges, NavigableMap<Timestamp,
CommandsForRangesLoader.Summary> map)
- {
- this.ranges = ranges;
- this.map = map;
- }
-
- public static CommandsForRanges create(Ranges ranges,
NavigableMap<Timestamp, CommandsForRangesLoader.Summary> map)
- {
- return new CommandsForRanges(ranges, map);
- }
-
- @VisibleForTesting
- public int size()
+ public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
{
- return map.size();
+ super(m);
}
@Override
- public <P1, T> T mapReduceFull(TxnId testTxnId, Txn.Kind.Kinds testKind,
TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus,
CommandFunction<P1, T, T> map, P1 p1, T accumulate)
+ public <P1, T> T mapReduceFull(Routables<?> keysOrRanges, TxnId testTxnId,
Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, TestDep testDep,
TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
Review Comment:
I'll try to get you an example of what I am seeing. The topology mixup tests
have been hitting cases where we load different ranges from the ones we are
mapReducing over, so we can't actually say the ranges don't exist, as we never
tried to load them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]