ifesdjeen commented on code in PR #266: URL: https://github.com/apache/cassandra-accord/pull/266#discussion_r2690085017
########## accord-core/src/main/java/accord/impl/cfr/IdEntry.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.cfr; + +import accord.primitives.Ranges; +import accord.primitives.SaveStatus; +import accord.primitives.Status; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; + +public abstract class IdEntry extends TxnId +{ + static final int SAVE_SATUS_SHIFT = Status.Durability.ENCODING_BITS; Review Comment: nit: "satus" ########## accord-core/src/main/java/accord/impl/cfr/IdSingleEntry.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.cfr; + +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.TxnId; + +public class IdSingleEntry extends IdEntry implements RangeEntry +{ + final Range range; + + public IdSingleEntry(TxnId txnId, Range range) + { + super(txnId); + this.range = range; + } + + @Override + public Range range() + { + return range; + } + + @Override + public IdEntry id() + { + return this; + } + + @Override + IdEntry copy() + { + accord.impl.cfr.IdSingleEntry copy = new accord.impl.cfr.IdSingleEntry(this, range); Review Comment: nit: do not need to fully qualify here ########## accord-core/src/main/java/accord/impl/cfr/InMemoryRangeSummaryIndex.java: ########## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.cfr; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableList; + +import accord.api.RoutingKey; +import accord.local.Command; +import accord.local.CommandStore; +import accord.local.CommandSummaries.Summary; +import accord.local.CommandSummaries.SummaryLoader; +import accord.local.CommandSummaries.Relevance; +import accord.local.RedundantBefore; +import accord.primitives.AbstractRanges; +import accord.primitives.AbstractUnseekableKeys; +import accord.primitives.Participants; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.SaveStatus; +import accord.primitives.TxnId; +import accord.primitives.Unseekable; +import accord.primitives.Unseekables; +import accord.utils.Invariants; +import accord.utils.SemiSyncIntervalTree; +import accord.utils.UnhandledEnum; +import accord.utils.async.Cancellable; +import accord.utils.btree.BTree; +import accord.utils.btree.IntervalBTree; +import accord.utils.btree.IntervalBTree.FastIntervalTreeBuilder; + +import static accord.api.ProtocolModifiers.RangeSpec.isEndInclusive; +import static accord.impl.cfr.ListenerEntry.LISTENER_ENTRIES; +import static accord.impl.cfr.ListenerEntry.LISTENER_WITH_KEYS; +import static accord.impl.cfr.ListenerEntry.LISTENER_WITH_RANGES; +import static accord.impl.cfr.RangeEntry.ENTRIES; +import static accord.impl.cfr.RangeEntry.WITH_KEY; +import static accord.impl.cfr.RangeEntry.WITH_RANGE; +import static accord.local.RedundantStatus.Property.GC_BEFORE; +import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED; +import static accord.local.RedundantStatus.Property.SHARD_APPLIED; +import static accord.local.RedundantStatus.Property.UNREADY; + +/** + * An implementation filling the same niche as CommandsForKey, only we do not retain + * complete information, and may need to load a command to decide if it is relevant. + * We also don't consume this structure directly, instead building a set of summary records to work with. + */ +public class InMemoryRangeSummaryIndex extends SemiSyncIntervalTree<IdEntry> +{ + private final Map<TxnId, IdEntry> byId = new HashMap<>(); + private Object[] listeners = IntervalBTree.empty(); + + public InMemoryRangeSummaryIndex() + { + super(ENTRIES); + Invariants.require(isEndInclusive(), "Need to implement range-exclusive IntervalComparators"); + } + + private boolean update(Command command) + { + TxnId txnId = command.txnId(); + SaveStatus saveStatus = command.saveStatus(); + AbstractRanges participants = (AbstractRanges) command.participants().stillTouches(); + if (saveStatus.compareTo(SaveStatus.TruncatedUnapplied) >= 0 || participants.isEmpty()) + { + IdEntry entry = byId.remove(txnId); + if (entry != null) + pushEdit(entry, null, entry); + return false; + } + + IdEntry cur = byId.get(txnId); + IdEntry next = cur; + if (participants.size() == 1) + { + Range range = participants.get(0).asRange(); + if (cur == null || cur.getClass() != IdSingleEntry.class || !((IdSingleEntry) cur).range.equals(range)) + next = new IdSingleEntry(txnId, range); + } + else + { + if (cur == null || cur.getClass() != IdMultiEntry.class || !((IdMultiEntry) cur).ranges.hasSameRanges(participants)) + next = new IdMultiEntry(txnId, participants.toRanges()); + } + if (next != cur) + { + if (cur != null) + byId.remove(cur, cur); + byId.put(next, next); + pushEdit(next, next, cur); + } + return next.update(saveStatus, command.durability(), command.executeAtIfKnown()) || next != cur; + } + + private static Object[] toMultiTree(IdMultiEntry entry) + { + try (FastIntervalTreeBuilder<RangeEntry> builder = IntervalBTree.fastBuilder(ENTRIES)) + { + for (Range range : entry.ranges) + builder.add(new RangeMultiEntry(range, entry)); + return builder.build(); + } + } + + public <A> A foldl(Unseekables<?> participants, BiFunction<IdEntry, A, A> f, A accumulator) + { + Object[] tree = get(); + switch (participants.domain()) + { + case Key: + for (RoutingKey key : (AbstractUnseekableKeys)participants) + accumulator = IntervalBTree.accumulate(tree, WITH_KEY, key, InMemoryRangeSummaryIndex::foldl, f, participants, accumulator); + break; + case Range: + for (Range range : (AbstractRanges)participants) + accumulator = IntervalBTree.accumulate(tree, WITH_RANGE, range, InMemoryRangeSummaryIndex::foldl, f, participants, accumulator); + break; + } + return accumulator; + } + + public void populateMinFutureRx(SummaryLoader loader) + { + foldl(loader.participants(), (e, l) -> { + if (e.isSyncPoint() && l.shouldRecordFutureRx(e, e.saveStatus().summary)) + l.recordFutureRx(e.plainTxnId(), e.ranges()); + return l; + }, loader); + } + + /** + * If we have enough information, simply directly build the Summary object and return a map of these objects; + * otherwise invoke the provided Consumer so that the implementation may build the summary + */ + public void search(SummaryLoader loader, @Nullable BiConsumer<TxnId, Summary> found, @Nullable Consumer<TxnId> mustLoad) + { + foldl(loader.participants(), (e, l) -> { + Ranges ranges = e.ranges(); + Relevance relevance = l.relevance(e, e.saveStatus(), e.durability(), e.maybeExecuteAt(), ranges); + if (relevance == Relevance.IRRELEVANT) + return l; + + TxnId txnId = e.plainTxnId(); + switch (relevance) + { + default: throw new UnhandledEnum(relevance); + case ACTIVE: + if (found != null) + { + Summary summary = l.ifRelevant(e.plainTxnId(), e.maybeExecuteAt(), e.saveStatus(), e.durability(), ranges, null); + Invariants.nonNull(summary); + // TODO (expected): we can post-filter collected txnId after recording future rx to remove those that are no longer needed + // (or, we can retain summary information to permit evicting them as we collect) + found.accept(txnId, summary); + } + break; + case SUPERSEDING: + case BOTH: + if (mustLoad != null) + mustLoad.accept(txnId); + } + return l; + }, loader); + } + + static < A> A foldl(BiFunction<IdEntry, A, A> f, Unseekables<?> find, RangeEntry e, A accumulator) + { + if (e.getClass() == IdSingleEntry.class) + { + return f.apply((IdSingleEntry)e, accumulator); + } + else + { + IdEntry id = e.id(); + IdMultiEntry mid = (IdMultiEntry) id; + if (mid.ranges.size() > 1) + { + int i = (int) (find.findFirstIntersection(mid.ranges) >>> 32); + if (mid.ranges.get(i) != e.range()) + return accumulator; + } + return f.apply(id, accumulator); + } + } + + public void update(Command prev, Command updated, boolean force) + { + if (!force + && updated.saveStatus() == prev.saveStatus() + && (updated.saveStatus().known.executeAt().isDecided() && !prev.saveStatus().known.executeAt().isDecided()) Review Comment: how can isDecided be different if save statuses are identity-equal? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

