dcapwell commented on code in PR #2144: URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103126636
########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) Review Comment: I question if this should be in C*... I feel this really should be in accord and we shouldn't have to define it unless C* has a different logic (which we don't as far as I see) ########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + 
protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keysOrRanges).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return keys.stream() + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue) Review Comment: personal preference: functions should be at the end of a method for readability, we are not consistent and know the JDK doesn't always do 
this, but I personally find the code cleaner to read. The only caller splits the inputs before/after the lambda which I find is harder to read than ``` return mapReduceForKey(keysOrRanges, slice, accumulate, terminalValue, (forKey, prev) -> { ... }); ``` ########## src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java: ########## @@ -57,131 +59,94 @@ private State state = State.INITIALIZED; private final AccordCommandStore commandStore; - private final Iterable<TxnId> txnIds; - private final Iterable<PartitionKey> keys; + private final List<TxnId> txnIds; + private final List<RoutableKey> keys; - protected Future<?> readFuture; + protected AsyncResult<?> readResult; - public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys) + public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keys) { this.commandStore = commandStore; - this.txnIds = txnIds; - this.keys = keys; + this.txnIds = Lists.newArrayList(txnIds); + this.keys = Lists.newArrayList(keys); } - private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key, - AccordStateCache.Instance<K, V> cache, - Map<K, V> context, - Function<V, Future<?>> readFunction, - Object callback) - { - V item; - Future<?> future = cache.getLoadFuture(key); - if (future != null) - { - // if a load future exists for this, it must be present in the cache - item = cache.getOrNull(key); - Preconditions.checkState(item != null); - context.put(key, item); - if (logger.isTraceEnabled()) - logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item); - return future; - } - - item = cache.getOrCreate(key); - context.put(key, item); - if (item.isLoaded()) - { - if (logger.isTraceEnabled()) - logger.trace("Cached item found for {} while loading for {}. 
({})", item.key(), callback, item); - return null; - } - - future = readFunction.apply(item); - cache.setLoadFuture(item.key(), future); - if (logger.isTraceEnabled()) - logger.trace("Loading new item for {} while loading for {}. ({})", item.key(), callback, item); - return future; - } - - - private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys, + private <K, V extends ImmutableState> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys, AccordStateCache.Instance<K, V> cache, - Map<K, V> context, - Function<V, Future<?>> readFunction, - List<Future<?>> futures, + LoadFunction<K, V> loadFunction, + List<AsyncChain<Void>> results, Object callback) { for (K key : keys) { - Future<?> future = referenceAndDispatch(key, cache, context, readFunction, callback); - if (future == null) + AsyncResult<Void> result = cache.referenceAndLoad(key, loadFunction); + if (result == null) continue; - if (futures == null) - futures = new ArrayList<>(); + if (results == null) + results = new ArrayList<>(); - futures.add(future); + results.add(result); } - return futures; + return results; } @VisibleForTesting - Function<AccordCommand, Future<?>> loadCommandFunction(Object callback) Review Comment: I question if we actually want to have this `callback` and log it... at the moment there is a single caller ``` loader.load(this::callback) ``` so our logs would just say a lambda exists in `org.apache.cassandra.service.accord.async.AsyncOperation`, which I don't feel is that helpful. It also adds confusion to readers as we are making an async chain that takes a callback, so why are we ignoring this callback? Removing the callback also means we don't allocate a new lambda for every request, so we can save the allocation and keep reusing the same lambda. 
########## src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java: ########## @@ -57,131 +59,94 @@ private State state = State.INITIALIZED; private final AccordCommandStore commandStore; - private final Iterable<TxnId> txnIds; - private final Iterable<PartitionKey> keys; + private final List<TxnId> txnIds; + private final List<RoutableKey> keys; - protected Future<?> readFuture; + protected AsyncResult<?> readResult; - public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys) + public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keys) { this.commandStore = commandStore; - this.txnIds = txnIds; - this.keys = keys; + this.txnIds = Lists.newArrayList(txnIds); + this.keys = Lists.newArrayList(keys); } - private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key, - AccordStateCache.Instance<K, V> cache, - Map<K, V> context, - Function<V, Future<?>> readFunction, - Object callback) - { - V item; - Future<?> future = cache.getLoadFuture(key); - if (future != null) - { - // if a load future exists for this, it must be present in the cache - item = cache.getOrNull(key); - Preconditions.checkState(item != null); - context.put(key, item); - if (logger.isTraceEnabled()) - logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item); - return future; - } - - item = cache.getOrCreate(key); - context.put(key, item); - if (item.isLoaded()) - { - if (logger.isTraceEnabled()) - logger.trace("Cached item found for {} while loading for {}. ({})", item.key(), callback, item); - return null; - } - - future = readFunction.apply(item); - cache.setLoadFuture(item.key(), future); - if (logger.isTraceEnabled()) - logger.trace("Loading new item for {} while loading for {}. 
({})", item.key(), callback, item); - return future; - } - - - private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys, + private <K, V extends ImmutableState> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys, AccordStateCache.Instance<K, V> cache, - Map<K, V> context, - Function<V, Future<?>> readFunction, - List<Future<?>> futures, + LoadFunction<K, V> loadFunction, + List<AsyncChain<Void>> results, Object callback) { for (K key : keys) { - Future<?> future = referenceAndDispatch(key, cache, context, readFunction, callback); - if (future == null) + AsyncResult<Void> result = cache.referenceAndLoad(key, loadFunction); + if (result == null) continue; - if (futures == null) - futures = new ArrayList<>(); + if (results == null) + results = new ArrayList<>(); - futures.add(future); + results.add(result); } - return futures; + return results; } @VisibleForTesting - Function<AccordCommand, Future<?>> loadCommandFunction(Object callback) + LoadFunction<TxnId, Command> loadCommandFunction(Object callback) { - return command -> Stage.READ.submit(() -> { + return (txnId, consumer) -> ofRunnable(Stage.READ.executor(), () -> { try { - logger.trace("Starting load of {} for {}", command.txnId(), callback); - AccordKeyspace.loadCommand(commandStore, command); - logger.trace("Completed load of {} for {}", command.txnId(), callback); + logger.trace("Starting load of {} for {}", txnId, callback); + Command command = AccordKeyspace.loadCommand(commandStore, txnId); + logger.trace("Completed load of {} for {}", txnId, callback); + consumer.accept(command); } catch (Throwable t) { - logger.error("Exception loading {} for {}", command.txnId(), callback, t); + logger.error("Exception loading {} for {}", txnId, callback, t); throw t; } }); } @VisibleForTesting - Function<AccordCommandsForKey, Future<?>> loadCommandsPerKeyFunction(Object callback) + LoadFunction<RoutableKey, CommandsForKey> loadCommandsPerKeyFunction(Object 
callback) Review Comment: same as above comment, should we remove `callback`? we currently only use it for logging and all it does is say there exists a lambda from AsyncOperation, so it's not something we can really use for debugging. Removing it also means we can avoid creating a new lambda for each request and reuse the same lambda. ########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return 
commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keysOrRanges).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) Review Comment: feel this should be in accord and not C*, this should be generic ########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return 
commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keysOrRanges).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return keys.stream() + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + // TODO: efficiency + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + for (Key key : keys) + { + if (!slice.contains(key)) continue; + CommandsForKey forKey = commandsForKey(key); + accumulate = map.apply(forKey, accumulate); + if (accumulate.equals(terminalValue)) + return accumulate; + } + break; + case Range: + // TODO (required): implement + throw new UnsupportedOperationException(); + } + return accumulate; + } + + @Override + public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status 
minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue) Review Comment: similar feedback to the other methods, do we need this to live in C*? I don't see anything specific to us, so feel this should be pushed to accord. ########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return 
commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keysOrRanges).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return keys.stream() + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + // TODO: efficiency + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + for (Key key : keys) + { + if (!slice.contains(key)) continue; + CommandsForKey forKey = commandsForKey(key); + accumulate = map.apply(forKey, accumulate); + if (accumulate.equals(terminalValue)) + return accumulate; + } + break; + case Range: + // TODO (required): implement + throw new UnsupportedOperationException(); + } + return accumulate; + } + + @Override + public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status 
minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue) + { + accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> { + CommandsForKey.CommandTimeseries<?> timeseries; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case STARTED_BEFORE: + timeseries = forKey.byId(); + break; + case EXECUTES_AFTER: + case MAY_EXECUTE_BEFORE: + timeseries = forKey.byExecuteAt(); + } + CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case EXECUTES_AFTER: + remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.AFTER; + break; + case STARTED_BEFORE: + case MAY_EXECUTE_BEFORE: + remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE; + } + return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue); + }, accumulate, terminalValue); + + return accumulate; Review Comment: nit, rather than override `accumulate` can we do `return` instead? 
```suggestion return mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> { CommandsForKey.CommandTimeseries<?> timeseries; switch (testTimestamp) { default: throw new AssertionError(); case STARTED_AFTER: case STARTED_BEFORE: timeseries = forKey.byId(); break; case EXECUTES_AFTER: case MAY_EXECUTE_BEFORE: timeseries = forKey.byExecuteAt(); } CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp; switch (testTimestamp) { default: throw new AssertionError(); case STARTED_AFTER: case EXECUTES_AFTER: remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.AFTER; break; case STARTED_BEFORE: case MAY_EXECUTE_BEFORE: remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE; } return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue); }, accumulate, terminalValue); ``` ########## src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.Comparator; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import accord.api.Agent; +import accord.api.DataStore; +import accord.api.Key; +import accord.api.ProgressLog; +import accord.impl.CommandsForKey; +import accord.impl.CommandsForKeys; +import accord.local.Command; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.NodeTimeService; +import accord.local.PreExecuteContext; +import accord.local.SafeCommandStores; +import accord.local.Status; +import accord.primitives.AbstractKeys; +import accord.primitives.Keys; +import accord.primitives.Ranges; +import accord.primitives.RoutableKey; +import accord.primitives.Routables; +import accord.primitives.Seekable; +import accord.primitives.Seekables; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; + +public class AccordSafeCommandStore extends SafeCommandStores.AbstractSafeCommandStore +{ + private final AccordCommandStore commandStore; + + public AccordSafeCommandStore(PreExecuteContext context, AccordCommandStore commandStore) + { + super(context); + this.commandStore = commandStore; + } + + @Override + protected Command getIfLoaded(TxnId txnId) + { + return commandStore.commandCache().referenceAndGetIfLoaded(txnId); + } + + @Override + protected CommandsForKey getIfLoaded(RoutableKey key) + { + return commandStore.commandsForKeyCache().referenceAndGetIfLoaded(key); + } + + @Override + public AccordCommandStore commandStore() + { + return commandStore; + } + + @Override + public DataStore dataStore() + { + return commandStore().dataStore(); + } + + @Override + public Agent agent() + { + return commandStore.agent(); + } + + @Override + public ProgressLog progressLog() + { + return 
commandStore().progressLog(); + } + + @Override + public NodeTimeService time() + { + return commandStore.time(); + } + + @Override + public RangesForEpoch ranges() + { + return commandStore().ranges(); + } + + @Override + public long latestEpoch() + { + return commandStore().time().epoch(); + } + + @Override + protected Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keysOrRanges).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return keys.stream() + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue) + { + switch (keysOrRanges.domain()) { + default: + throw new AssertionError(); + case Key: + // TODO: efficiency + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + for (Key key : keys) + { + if (!slice.contains(key)) continue; + CommandsForKey forKey = commandsForKey(key); + accumulate = map.apply(forKey, accumulate); + if (accumulate.equals(terminalValue)) + return accumulate; + } + break; + case Range: + // TODO (required): implement + throw new UnsupportedOperationException(); + } + return accumulate; + } + + @Override + public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status 
minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue) + { + accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> { + CommandsForKey.CommandTimeseries<?> timeseries; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case STARTED_BEFORE: + timeseries = forKey.byId(); + break; + case EXECUTES_AFTER: + case MAY_EXECUTE_BEFORE: + timeseries = forKey.byExecuteAt(); + } + CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case EXECUTES_AFTER: + remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.AFTER; + break; + case STARTED_BEFORE: + case MAY_EXECUTE_BEFORE: + remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE; + } + return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue); + }, accumulate, terminalValue); + + return accumulate; + } + + @Override + public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command) + { + // TODO (required): support ranges + Routables.foldl((Keys)keysOrRanges, slice, (k, v, i) -> { CommandsForKeys.register(this, command, k, slice); return v; }, null); + } + + @Override + public void register(Seekable keyOrRange, Ranges slice, Command command) + { + // TODO (required): support ranges + Key key = (Key) keyOrRange; + if (slice.contains(key)) + CommandsForKeys.register(this, command, key, slice); + } Review Comment: can we push these to accord? 
########## src/java/org/apache/cassandra/service/accord/AccordStateCache.java: ########## @@ -51,71 +55,107 @@ { private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class); - private static class WriteOnlyGroup<K, V extends AccordState<K>> + public interface LoadFunction<K, V> { - private boolean locked = false; - private List<AccordState.WriteOnly<K, V>> items = new ArrayList<>(); + AsyncResult<Void> apply(K key, Consumer<V> valueConsumer); Review Comment: in the spirit of chains vs result, I think this should be ``` AsyncChain<V> apply(K key); ``` Looking at `PendingLoad`, it looks like we are bad about handling the exception case... For example, look at `org.apache.cassandra.service.accord.AccordStateCache.Node#maybeFinishLoad`: it just sets `state = null` when we see an error! This puts us in a confusing state where `isLoaded() == true`, but we are not loaded! Which I guess is why `value()` has to have this check ``` Invariants.checkState(isLoaded() && state != null); ``` I tried working on a patch to clean this up and found issues with `AsyncResult` (need `getCauseNow`, `getNow` methods), and `AsyncResults` (`reduce` doesn't handle types properly, causing `List<AsyncChain<?>>` to not work). For `accord.utils.async.AsyncResults#reduce` it should be ``` <V> AsyncChain<V> reduce(Collection<? extends AsyncChain<? extends V>> results, BinaryOperator<V> reducer) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

