dcapwell commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r969941939
##########
accord-core/src/main/java/accord/coordinate/Execute.java:
##########
@@ -70,6 +71,12 @@ private void start()
{
Set<Id> readSet = readTracker.computeMinimalReadSetAndMarkInflight();
Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt,
deps, readSet, this);
+ // skip straight to persistence if there's no read
+ if (txn.read().keys().isEmpty())
Review Comment:
We are not good with nullability in Accord, so its not clear if read is
allowed to be null... Based off Caleb's branch
org.apache.cassandra.cql3.statements.TransactionStatement#createRead won't be
null in a write only txn, but is null allowed or not? If so then we need a null
check at "if (txn.read().keys().isEmpty())"
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -356,42 +315,110 @@ public synchronized void shutdown()
commandStore.shutdown();
}
- protected abstract <S> void forEach(Select<S> select, S scope, long
minEpoch, long maxEpoch, Consumer<? super CommandStore> forEach);
- protected abstract <S, T> T mapReduce(Select<S> select, S scope, long
minEpoch, long maxEpoch, Function<? super CommandStore, T> map, BiFunction<T,
T, T> reduce);
+ private static <T> Fold<TxnOperation, Void, List<Future<T>>>
mapReduceFold(Function<CommandStore, T> map)
+ {
+ return (store, op, i, t) -> { t.add(store.process(op, map)); return t;
};
+ }
+
+ private static <T> T reduce(List<Future<T>> futures, BiFunction<T, T, T>
reduce)
+ {
+ T result = null;
+ for (Future<T> future : futures)
+ {
+ try
+ {
+ T next = future.get();
+ result = result == null ? next : reduce.apply(result, next);
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ private <F, T> T setup(F f, Fold<F, Void, List<Future<T>>> fold,
BiFunction<T, T, T> reduce)
+ {
+ List<Future<T>> futures = foldl((s, i, mn, mx) -> s.all(), null,
Long.MIN_VALUE, Long.MAX_VALUE, fold, f, null, ArrayList::new);
+ return reduce(futures, reduce);
+ }
+
+ private <S, T> T mapReduce(TxnOperation operation, Select<S> select, S
scope, long minEpoch, long maxEpoch, Fold<TxnOperation, Void, List<Future<T>>>
fold, BiFunction<T, T, T> reduce)
+ {
+ List<Future<T>> futures = foldl(select, scope, minEpoch, maxEpoch,
fold, operation, null, ArrayList::new);
+ if (futures == null)
+ return null;
+ return reduce(futures, reduce);
+ }
+
+ public <T> T mapReduce(TxnOperation operation, Key key, long minEpoch,
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, ShardedRanges::shard, key, minEpoch,
maxEpoch, mapReduceFold(map), reduce);
+ }
+
+ public <T> T mapReduce(TxnOperation operation, Key key, long epoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, key, epoch, epoch, map, reduce);
+ }
+
+ public <T> T mapReduceSince(TxnOperation operation, Key key, long epoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, key, epoch, Long.MAX_VALUE, map, reduce);
+ }
- public void forEach(Consumer<CommandStore> forEach)
+ public <T> T mapReduce(TxnOperation operation, Keys keys, long minEpoch,
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- forEach((s, i, min, max) -> s.all(), null, 0, 0, forEach);
+ // probably need to split txnOperation and scope stuff here
+ return mapReduce(operation, ShardedRanges::shards, keys, minEpoch,
maxEpoch, mapReduceFold(map), reduce);
}
- public void forEach(Keys keys, long epoch, Consumer<CommandStore> forEach)
+ public void setup(Consumer<CommandStore> forEach)
{
- forEach(keys, epoch, epoch, forEach);
+ setup(forEach, (store, f, i, t) -> { t.add(store.processSetup(f));
return t; }, (Void i1, Void i2) -> null);
}
- public void forEach(Keys keys, long minEpoch, long maxEpoch,
Consumer<CommandStore> forEach)
+ public <T> T setup(Function<CommandStore, T> map, BiFunction<T, T, T>
reduce)
{
- forEach(ShardedRanges::shards, keys, minEpoch, maxEpoch, forEach);
+ return setup(map, (store, f, i, t) -> { t.add(store.processSetup(f));
return t; }, reduce);
}
- public <T> T mapReduce(Keys keys, long epoch, Function<CommandStore, T>
map, BiFunction<T, T, T> reduce)
+ private static Fold<TxnOperation, Void, List<Future<Void>>>
forEachFold(Consumer<CommandStore> forEach)
{
- return mapReduce(keys, epoch, epoch, map, reduce);
+ return (store, op, i, t) -> { t.add(store.process(op, forEach));
return t; };
}
- public <T> T mapReduce(Keys keys, long minEpoch, long maxEpoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ public void forEach(TxnOperation operation, Keys keys, long minEpoch, long
maxEpoch, Consumer<CommandStore> forEach)
Review Comment:
"forEach" takes both a "TxnOperation" and "Keys", but "TxnOperation" holds
ref to "Keys", so shouldn't we drop "Keys"? Looking at the usage, it seems like
we could...
```
node.forEachLocalSince(TxnOperation.scopeFor(txnId), full.txn.keys(), ...
```
```
TxnOperation scope = TxnOperation.scopeFor(txnId, max.txn.keys());
...
node.forEachLocalSince(scope, max.txn.keys(), ...
```
This change trickled throughout a lot of different places, so would be good
to grasp the expected semantics... if the TxnOperation has keys, and you
provide keys... "should" they be released or "must" they be related? or
something else?
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
Review Comment:
I still need to look closer at our epoch logic but if we have multiple
epochs that are not full acknowledged then wouldn't forEpochCommands have the
wrong min epoch? This is only used in tests and not in the C* patch... if this
is only for tests then I am cool with this, but it would be good to start
annotating what methods are not meant to be used in `src/main`
Same comment for forCommittedInEpoch
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
+ {
+ public Synchronized(int generation,
+ int index,
+ int numShards,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public synchronized Future<Void> processSetup(Consumer<? super
CommandStore> function)
+ {
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> processSetup(Function<? super
CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized Future<Void> process(TxnOperation unused,
Consumer<? super CommandStore> consumer)
+ {
+ Promise<Void> promise = new AsyncPromise<>();
+ processInternal(consumer, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> process(TxnOperation unused,
Function<? super CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized void shutdown() {}
+ }
+
+ public static class SingleThread extends InMemoryCommandStore
+ {
+ private final ExecutorService executor;
+
+ private class ConsumerWrapper extends AsyncPromise<Void> implements
Runnable
+ {
+ private final Consumer<? super CommandStore> consumer;
+
+ public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(consumer, this);
+ }
+ }
+
+ private class FunctionWrapper<T> extends AsyncPromise<T> implements
Runnable
+ {
+ private final Function<? super CommandStore, T> function;
+
+ public FunctionWrapper(Function<? super CommandStore, T> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ public SingleThread(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ executor = Executors.newSingleThreadExecutor(r -> {
Review Comment:
SingleThread should expose ability to control thread name... in C* we have a
"global prefix" for jvm-dtest
##########
accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandsForKey;
+import accord.primitives.Timestamp;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+public class InMemoryCommandsForKey extends CommandsForKey
+{
+ static class InMemoryCommandTimeseries implements CommandTimeseries
+ {
+ private final NavigableMap<Timestamp, Command> commands = new
TreeMap<>();
+
+ @Override
+ public Command get(Timestamp timestamp)
+ {
+ return commands.get(timestamp);
+ }
+
+ @Override
+ public void add(Timestamp timestamp, Command command)
Review Comment:
should check to see if a conflict was detected
##########
accord-core/build.gradle:
##########
@@ -29,11 +29,13 @@ java {
withSourcesJar()
}
+compileJava {
Review Comment:
can you move the 1.8 logic to
buildSrc/src/main/groovy/accord.java-conventions.gradle
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
Review Comment:
Reply from Blake in JIRA
I'd tried that, but you also need to pass in the min and max timestamps,
which makes 3 arguments so you need a special interface, and the duplicated
code isn't too complex, so I figured it was a net negative for readability in
exchange for a few less lines of code
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
+ {
+ public Synchronized(int generation,
+ int index,
+ int numShards,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public synchronized Future<Void> processSetup(Consumer<? super
CommandStore> function)
+ {
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> processSetup(Function<? super
CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized Future<Void> process(TxnOperation unused,
Consumer<? super CommandStore> consumer)
+ {
+ Promise<Void> promise = new AsyncPromise<>();
+ processInternal(consumer, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> process(TxnOperation unused,
Function<? super CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized void shutdown() {}
+ }
+
+ public static class SingleThread extends InMemoryCommandStore
+ {
+ private final ExecutorService executor;
+
+ private class ConsumerWrapper extends AsyncPromise<Void> implements
Runnable
+ {
+ private final Consumer<? super CommandStore> consumer;
+
+ public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(consumer, this);
+ }
+ }
+
+ private class FunctionWrapper<T> extends AsyncPromise<T> implements
Runnable
+ {
+ private final Function<? super CommandStore, T> function;
+
+ public FunctionWrapper(Function<? super CommandStore, T> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ public SingleThread(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ executor = Executors.newSingleThreadExecutor(r -> {
Review Comment:
Reply from Blake in JIRA
This is just for testing, I don't think we need anything more sophisticated
here unless it's serving an immediate purpose.
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStores.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.Node;
+import accord.primitives.Keys;
+
+import java.util.function.Consumer;
+
+import static java.lang.Boolean.FALSE;
+
+public abstract class InMemoryCommandStores extends CommandStores
+{
+ public InMemoryCommandStores(int num, Node node, Agent agent, DataStore
store,
+ ProgressLog.Factory progressLogFactory)
+ {
+ super(num, node, agent, store, progressLogFactory);
+ }
+
+ public static InMemoryCommandStores inMemory(Node node)
+ {
+ return (InMemoryCommandStores) node.commandStores();
+ }
+
+ public void forEachLocal(Consumer<? super CommandStore> forEach)
Review Comment:
Reply from Blake in JIRA
these are only there for testing and assume in-memory implementation details
(since none of them require a TxnScope)
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
Review Comment:
Nit: forCommittedInEpoch and forEpochCommands are mostly copy/paste of each
other, can we create a new util function such as
```
public void forEach(KeyRanges ranges, long epoch, Consumer<CommandsForKey>
consumer)
{
Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
for (KeyRange range : ranges)
{
Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
range.startInclusive(),
range.end(),
range.endInclusive()).values();
rangeCommands.forEach(consumer);
}
}
```
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -274,99 +289,110 @@ public boolean commit(Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt
});
}
- if (waitingOnCommit.isEmpty())
- {
- waitingOnCommit = null;
- if (waitingOnApply.isEmpty())
- waitingOnApply = null;
- }
- boolean isProgressShard = progressKey != null && handles(txnId.epoch,
progressKey);
- commandStore.progressLog().commit(txnId, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
+ commandStore().progressLog().commit(txnId(), isProgressShard,
isProgressShard && progressKey.equals(homeKey));
- maybeExecute(false);
- listeners.forEach(this);
return true;
}
+ public Future<Void> commitAndBeginExecution(Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt, Deps deps)
+ {
+ if (!commit(txn, homeKey, progressKey, executeAt, deps))
+ return Write.SUCCESS;
+
+ return maybeExecute(true);
+ }
+
+ // TODO (now): commitInvalidate may need to update cfks _if_ possible
public boolean commitInvalidate()
{
if (hasBeen(Committed))
{
+ logger.trace("{}: skipping commit invalidated - already committed
({})", txnId(), status());
if (!hasBeen(Invalidated))
- commandStore.agent().onInconsistentTimestamp(this,
Timestamp.NONE, executeAt);
+ commandStore().agent().onInconsistentTimestamp(this,
Timestamp.NONE, executeAt());
return false;
}
- status = Invalidated;
+ status(Invalidated);
- boolean isProgressShard = progressKey != null && handles(txnId.epoch,
progressKey);
- commandStore.progressLog().invalidate(txnId, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ boolean isProgressShard = progressKey() != null &&
handles(txnId().epoch, progressKey());
+ commandStore().progressLog().invalidate(txnId(), isProgressShard,
isProgressShard && progressKey().equals(homeKey()));
+ logger.trace("{}: committed invalidated", txnId());
- listeners.forEach(this);
+ notifyListeners();
return true;
}
- public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp
executeAt, Deps deps, Writes writes, Result result)
+ public Future<Void> apply(Txn txn, Key homeKey, Key progressKey, Timestamp
executeAt, Deps deps, Writes writes, Result result)
Review Comment:
Reply from Blake in JIRA
So I don't think your example timeline is possible, since Command methods
like commit are idempotent and the command store is single threaded. T2 would
be a noop. However, I don't think there's anything to prevent maybeExecute to
fire off duplicate writes in the time between Command#apply dispatches writes
and the time Command#postApply sets the status to Applied in
Command#postApply., and the same goes for Command#read. I'm going to think
about this a bit, since it depends on the concurrency of the implementation I'm
not sure if it would be better to handle this in the library, or in the
implementation....
##########
accord-core/src/main/java/accord/impl/InMemoryCommand.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.Result;
+import accord.local.*;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.*;
+
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import static accord.local.Status.NotWitnessed;
+
+public class InMemoryCommand extends Command
+{
+ public final CommandStore commandStore;
+ private final TxnId txnId;
+
+ private Key homeKey, progressKey;
+ private Txn txn;
+ private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
+ private Timestamp executeAt;
+ private Deps deps = Deps.NONE;
+ private Writes writes;
+ private Result result;
+
+ private Status status = NotWitnessed;
+
+ private boolean isGloballyPersistent; // only set on home shard
+
+ private NavigableMap<TxnId, Command> waitingOnCommit;
+ private NavigableMap<TxnId, Command> waitingOnApply;
+
+ private final Listeners listeners = new Listeners();
+
+ public InMemoryCommand(CommandStore commandStore, TxnId txnId)
+ {
+ this.commandStore = commandStore;
+ this.txnId = txnId;
+ }
+
+ @Override
+ public boolean equals(Object o)
Review Comment:
eq is missing homeKey, progressKey, isGloballyPersistent, is this
intentional?
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
Review Comment:
nit: "protected void processInternal(Consumer<? super CommandStore>
consumer, Promise<Void> promise)" could be the following to avoid copy/paste
```
protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
{
processInternal(cs -> {
consumer.accept(cs);
return null;
}, promise);
}
```
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
Review Comment:
Synchronized doesn't use "synchronized" in every method... so name feels
off...
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
+ {
+ public Synchronized(int generation,
+ int index,
+ int numShards,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public synchronized Future<Void> processSetup(Consumer<? super
CommandStore> function)
+ {
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> processSetup(Function<? super
CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized Future<Void> process(TxnOperation unused,
Consumer<? super CommandStore> consumer)
+ {
+ Promise<Void> promise = new AsyncPromise<>();
+ processInternal(consumer, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> process(TxnOperation unused,
Function<? super CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized void shutdown() {}
+ }
+
+ public static class SingleThread extends InMemoryCommandStore
+ {
+ private final ExecutorService executor;
+
+ private class ConsumerWrapper extends AsyncPromise<Void> implements
Runnable
+ {
+ private final Consumer<? super CommandStore> consumer;
+
+ public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(consumer, this);
+ }
+ }
+
+ private class FunctionWrapper<T> extends AsyncPromise<T> implements
Runnable
+ {
+ private final Function<? super CommandStore, T> function;
+
+ public FunctionWrapper(Function<? super CommandStore, T> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ public SingleThread(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ executor = Executors.newSingleThreadExecutor(r -> {
+ Thread thread = new Thread(r);
+ thread.setName(CommandStore.class.getSimpleName() + '[' +
nodeId + ':' + index + ']');
+ return thread;
+ });
+ }
+
+ @Override
+ public Future<Void> processSetup(Consumer<? super CommandStore>
function)
+ {
+ ConsumerWrapper future = new ConsumerWrapper(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public <T> Future<T> processSetup(Function<? super CommandStore, T>
function)
+ {
+ FunctionWrapper<T> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public Future<Void> process(TxnOperation unused, Consumer<? super
CommandStore> consumer)
+ {
+ ConsumerWrapper future = new ConsumerWrapper(consumer);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public <T> Future<T> process(TxnOperation unused, Function<? super
CommandStore, T> function)
+ {
+ FunctionWrapper<T> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ executor.shutdown();
+ }
+ }
+
+ public static class SingleThreadDebug extends SingleThread
+ {
+ private final AtomicReference<Thread> expectedThread = new
AtomicReference<>();
+
+ public SingleThreadDebug(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, nodeId, uniqueNow,
currentEpoch, agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ private void assertThread()
+ {
+ Thread current = Thread.currentThread();
+ Thread expected;
+ while (true)
+ {
+ expected = expectedThread.get();
+ if (expected != null)
+ break;
+ expectedThread.compareAndSet(null, Thread.currentThread());
+ }
+ if (expected != current)
+ throw new IllegalStateException(String.format("Command store
called from the wrong thread. Expected %s, got %s", expected, current));
Review Comment:
nit: can we add a useful message? When something like this fails its too
hard to figure out what's going on, so we should be nice and provide a useful
error.
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,129 +18,117 @@
package accord.local;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-
-import com.google.common.base.Preconditions;
-
import accord.api.Key;
+import accord.api.Read;
import accord.api.Result;
+import accord.api.Write;
import accord.local.Node.Id;
-import accord.primitives.KeyRanges;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
+import accord.primitives.*;
import accord.txn.Txn;
-import accord.primitives.TxnId;
import accord.txn.Writes;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.local.Status.*;
+import static accord.utils.Utils.listOf;
-import static accord.local.Status.Accepted;
-import static accord.local.Status.AcceptedInvalidate;
-import static accord.local.Status.Applied;
-import static accord.local.Status.Committed;
-import static accord.local.Status.Executed;
-import static accord.local.Status.Invalidated;
-import static accord.local.Status.NotWitnessed;
-import static accord.local.Status.PreAccepted;
-import static accord.local.Status.ReadyToExecute;
-
-// TODO: this needs to be backed by persistent storage
-public class Command implements Listener, Consumer<Listener>
+public abstract class Command implements Listener, Consumer<Listener>,
TxnOperation
Review Comment:
nit: given the refactor to make all state abstract wouldn't it be best if
this became an Interface?
##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -812,18 +812,26 @@ public void onCallbackFailure(Id from, Throwable failure)
public static class ApplyAndCheck extends Apply
{
- final Set<Id> notPersisted;
+ public final Set<Id> notPersisted;
Review Comment:
For consistency shouldn't notPersisted be private with a getter? this patch
switches many things to getters yet makes this public field access.
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStores.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.ProgressLog;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.Node;
+import accord.primitives.Keys;
+
+import java.util.function.Consumer;
+
+import static java.lang.Boolean.FALSE;
+
+public abstract class InMemoryCommandStores extends CommandStores
+{
+ public InMemoryCommandStores(int num, Node node, Agent agent, DataStore
store,
+ ProgressLog.Factory progressLogFactory)
+ {
+ super(num, node, agent, store, progressLogFactory);
+ }
+
+ public static InMemoryCommandStores inMemory(Node node)
+ {
+ return (InMemoryCommandStores) node.commandStores();
+ }
+
+ public void forEachLocal(Consumer<? super CommandStore> forEach)
Review Comment:
It would be nice to move "forEachLocal" to lower level... seems that we try
to overload fold in cases where forEach would be simpler.
##########
accord-maelstrom/build.gradle:
##########
@@ -20,8 +20,13 @@ plugins {
id 'accord.java-conventions'
}
+compileJava {
Review Comment:
can you move the 1.8 logic to
buildSrc/src/main/groovy/accord.java-conventions.gradle
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
Review Comment:
Reply from Blake in JIRA
No I don't think so, these 2 commands just return witnessed/committed
commands belonging to the given epoch. These (and all InMemory*
implementations) are only meant to be used in tests. These methods in
particular are placeholders to range movements in the library can be validated
and should be superseded by the work to "officially" support bootstrap, low
bounds, etc.
##########
accord-core/src/main/java/accord/coordinate/Execute.java:
##########
@@ -70,6 +71,12 @@ private void start()
{
Set<Id> readSet = readTracker.computeMinimalReadSetAndMarkInflight();
Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt,
deps, readSet, this);
+ // skip straight to persistence if there's no read
+ if (txn.read().keys().isEmpty())
Review Comment:
Reply from Blake in JIRA
I've added some annotations based on how things are implemented now
##########
accord-core/src/main/java/accord/impl/InMemoryCommand.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.Result;
+import accord.local.*;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.*;
+
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import static accord.local.Status.NotWitnessed;
+
+public class InMemoryCommand extends Command
+{
+ public final CommandStore commandStore;
+ private final TxnId txnId;
+
+ private Key homeKey, progressKey;
+ private Txn txn;
+ private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
+ private Timestamp executeAt;
+ private Deps deps = Deps.NONE;
+ private Writes writes;
+ private Result result;
+
+ private Status status = NotWitnessed;
+
+ private boolean isGloballyPersistent; // only set on home shard
+
+ private NavigableMap<TxnId, Command> waitingOnCommit;
+ private NavigableMap<TxnId, Command> waitingOnApply;
+
+ private final Listeners listeners = new Listeners();
+
+ public InMemoryCommand(CommandStore commandStore, TxnId txnId)
+ {
+ this.commandStore = commandStore;
+ this.txnId = txnId;
+ }
+
+ @Override
+ public boolean equals(Object o)
Review Comment:
This was fixed
##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -356,42 +315,110 @@ public synchronized void shutdown()
commandStore.shutdown();
}
- protected abstract <S> void forEach(Select<S> select, S scope, long
minEpoch, long maxEpoch, Consumer<? super CommandStore> forEach);
- protected abstract <S, T> T mapReduce(Select<S> select, S scope, long
minEpoch, long maxEpoch, Function<? super CommandStore, T> map, BiFunction<T,
T, T> reduce);
+ private static <T> Fold<TxnOperation, Void, List<Future<T>>>
mapReduceFold(Function<CommandStore, T> map)
+ {
+ return (store, op, i, t) -> { t.add(store.process(op, map)); return t;
};
+ }
+
+ private static <T> T reduce(List<Future<T>> futures, BiFunction<T, T, T>
reduce)
+ {
+ T result = null;
+ for (Future<T> future : futures)
+ {
+ try
+ {
+ T next = future.get();
+ result = result == null ? next : reduce.apply(result, next);
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ private <F, T> T setup(F f, Fold<F, Void, List<Future<T>>> fold,
BiFunction<T, T, T> reduce)
+ {
+ List<Future<T>> futures = foldl((s, i, mn, mx) -> s.all(), null,
Long.MIN_VALUE, Long.MAX_VALUE, fold, f, null, ArrayList::new);
+ return reduce(futures, reduce);
+ }
+
+ private <S, T> T mapReduce(TxnOperation operation, Select<S> select, S
scope, long minEpoch, long maxEpoch, Fold<TxnOperation, Void, List<Future<T>>>
fold, BiFunction<T, T, T> reduce)
+ {
+ List<Future<T>> futures = foldl(select, scope, minEpoch, maxEpoch,
fold, operation, null, ArrayList::new);
+ if (futures == null)
+ return null;
+ return reduce(futures, reduce);
+ }
+
+ public <T> T mapReduce(TxnOperation operation, Key key, long minEpoch,
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, ShardedRanges::shard, key, minEpoch,
maxEpoch, mapReduceFold(map), reduce);
+ }
+
+ public <T> T mapReduce(TxnOperation operation, Key key, long epoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, key, epoch, epoch, map, reduce);
+ }
+
+ public <T> T mapReduceSince(TxnOperation operation, Key key, long epoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ {
+ return mapReduce(operation, key, epoch, Long.MAX_VALUE, map, reduce);
+ }
- public void forEach(Consumer<CommandStore> forEach)
+ public <T> T mapReduce(TxnOperation operation, Keys keys, long minEpoch,
long maxEpoch, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- forEach((s, i, min, max) -> s.all(), null, 0, 0, forEach);
+ // probably need to split txnOperation and scope stuff here
+ return mapReduce(operation, ShardedRanges::shards, keys, minEpoch,
maxEpoch, mapReduceFold(map), reduce);
}
- public void forEach(Keys keys, long epoch, Consumer<CommandStore> forEach)
+ public void setup(Consumer<CommandStore> forEach)
{
- forEach(keys, epoch, epoch, forEach);
+ setup(forEach, (store, f, i, t) -> { t.add(store.processSetup(f));
return t; }, (Void i1, Void i2) -> null);
}
- public void forEach(Keys keys, long minEpoch, long maxEpoch,
Consumer<CommandStore> forEach)
+ public <T> T setup(Function<CommandStore, T> map, BiFunction<T, T, T>
reduce)
{
- forEach(ShardedRanges::shards, keys, minEpoch, maxEpoch, forEach);
+ return setup(map, (store, f, i, t) -> { t.add(store.processSetup(f));
return t; }, reduce);
}
- public <T> T mapReduce(Keys keys, long epoch, Function<CommandStore, T>
map, BiFunction<T, T, T> reduce)
+ private static Fold<TxnOperation, Void, List<Future<Void>>>
forEachFold(Consumer<CommandStore> forEach)
{
- return mapReduce(keys, epoch, epoch, map, reduce);
+ return (store, op, i, t) -> { t.add(store.process(op, forEach));
return t; };
}
- public <T> T mapReduce(Keys keys, long minEpoch, long maxEpoch,
Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
+ public void forEach(TxnOperation operation, Keys keys, long minEpoch, long
maxEpoch, Consumer<CommandStore> forEach)
Review Comment:
Reply from Blake in JIRA
The keys in each perform different functions. The keys in the TxnOperation
indicate which CommandsForKey we need access to, this let's C* know which ones
it needs to load. The Keys passed into forEach determine which command stores
we need to talk to, since we need to talk to "replicas" of the given keys,
whether we need the commands for key to be in memory or not. Of course, this
isn't obvious without more complete comments on TxnOperation
> This change trickled throughout a lot of different places, so would be
good to grasp the expected semantics... if the TxnOperation has keys, and you
provide keys... "should" they be released or "must" they be related? or
something else?
I added some comments. The name TxnOperation is a little unclear. Let me
know if you have any suggestions for better names. Maybe TxnOperationScope?
Maybe it could extend Function<CommandStore, T> and bundle the operation and
the metadata into the same interface? Calls would be a bit more cumbersome, but
would make the interdependency of the 3 parts more obvious
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
Review Comment:
resolved
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -274,99 +289,110 @@ public boolean commit(Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt
});
}
- if (waitingOnCommit.isEmpty())
- {
- waitingOnCommit = null;
- if (waitingOnApply.isEmpty())
- waitingOnApply = null;
- }
- boolean isProgressShard = progressKey != null && handles(txnId.epoch,
progressKey);
- commandStore.progressLog().commit(txnId, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ boolean isProgressShard = progressKey != null &&
handles(txnId().epoch, progressKey);
+ commandStore().progressLog().commit(txnId(), isProgressShard,
isProgressShard && progressKey.equals(homeKey));
- maybeExecute(false);
- listeners.forEach(this);
return true;
}
+ public Future<Void> commitAndBeginExecution(Txn txn, Key homeKey, Key
progressKey, Timestamp executeAt, Deps deps)
+ {
+ if (!commit(txn, homeKey, progressKey, executeAt, deps))
+ return Write.SUCCESS;
+
+ return maybeExecute(true);
+ }
+
+ // TODO (now): commitInvalidate may need to update cfks _if_ possible
public boolean commitInvalidate()
{
if (hasBeen(Committed))
{
+ logger.trace("{}: skipping commit invalidated - already committed
({})", txnId(), status());
if (!hasBeen(Invalidated))
- commandStore.agent().onInconsistentTimestamp(this,
Timestamp.NONE, executeAt);
+ commandStore().agent().onInconsistentTimestamp(this,
Timestamp.NONE, executeAt());
return false;
}
- status = Invalidated;
+ status(Invalidated);
- boolean isProgressShard = progressKey != null && handles(txnId.epoch,
progressKey);
- commandStore.progressLog().invalidate(txnId, isProgressShard,
isProgressShard && progressKey.equals(homeKey));
+ boolean isProgressShard = progressKey() != null &&
handles(txnId().epoch, progressKey());
+ commandStore().progressLog().invalidate(txnId(), isProgressShard,
isProgressShard && progressKey().equals(homeKey()));
+ logger.trace("{}: committed invalidated", txnId());
- listeners.forEach(this);
+ notifyListeners();
return true;
}
- public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp
executeAt, Deps deps, Writes writes, Result result)
+ public Future<Void> apply(Txn txn, Key homeKey, Key progressKey, Timestamp
executeAt, Deps deps, Writes writes, Result result)
Review Comment:
apply now returns a Future but some code paths ignore the output, so if
apply fails or is async, there may be unexpected results? Examples:
accord.coordinate.CheckOnCommitted#onSuccessCriteriaOrExhaustion(accord.messages.CheckStatus.CheckStatusOkFull),
and accord.impl.SimpleProgressLog.ApplyAndCheck#process
If apply is still being "applied" then the call to "commit" may cause a race
condition as it could be called while the writes are happening, leading to the
following timeline:
```
T1 apply -> sets status to "Executed", send writes to be written in the
background
T2 commit -> set status to "Committed"
T3 apply complete, call "postApply" -> set status "Applied"
```
specifically this comment applies to
"accord.coordinate.CheckOnCommitted#onSuccessCriteriaOrExhaustion(accord.messages.CheckStatus.CheckStatusOkFull)"...
I might not be grasping the "epoch - 1" part
##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,129 +18,117 @@
package accord.local;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-
-import com.google.common.base.Preconditions;
-
import accord.api.Key;
+import accord.api.Read;
import accord.api.Result;
+import accord.api.Write;
import accord.local.Node.Id;
-import accord.primitives.KeyRanges;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
+import accord.primitives.*;
import accord.txn.Txn;
-import accord.primitives.TxnId;
import accord.txn.Writes;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.local.Status.*;
+import static accord.utils.Utils.listOf;
-import static accord.local.Status.Accepted;
-import static accord.local.Status.AcceptedInvalidate;
-import static accord.local.Status.Applied;
-import static accord.local.Status.Committed;
-import static accord.local.Status.Executed;
-import static accord.local.Status.Invalidated;
-import static accord.local.Status.NotWitnessed;
-import static accord.local.Status.PreAccepted;
-import static accord.local.Status.ReadyToExecute;
-
-// TODO: this needs to be backed by persistent storage
-public class Command implements Listener, Consumer<Listener>
+public abstract class Command implements Listener, Consumer<Listener>,
TxnOperation
Review Comment:
Reply from Blake in JIRA
I don't think so. There's a lot of functionality in there, some of which
needs to be overridden in the C* implementation
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
Review Comment:
resolved
##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -812,18 +812,26 @@ public void onCallbackFailure(Id from, Throwable failure)
public static class ApplyAndCheck extends Apply
{
- final Set<Id> notPersisted;
+ public final Set<Id> notPersisted;
Review Comment:
Reply from Blake in JIRA
The integration of the progress log hasn't started yet. I'd expect the
interface and reference implementation to be refined as that happens, but the
objective for this patch to do the bare minimum to prevent build issues and
test failures.
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandsForKey;
+import accord.local.Node;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.local.*;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+
+public abstract class InMemoryCommandStore extends CommandStore
+{
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new
TreeMap<>();
+
+ public static InMemoryCommandStore inMemory(CommandStore commandStore)
+ {
+ return (InMemoryCommandStore) commandStore;
+ }
+
+ public InMemoryCommandStore(int generation, int index, int numShards,
Function<Timestamp, Timestamp> uniqueNow, LongSupplier currentEpoch, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpoch
rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch, agent,
store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public Command ifPresent(TxnId txnId)
+ {
+ return commands.get(txnId);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new InMemoryCommand(this,
id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key,
InMemoryCommandsForKey::new);
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ @Override
+ public CommandsForKey maybeCommandsForKey(Key key)
+ {
+ return commandsForKey.get(key);
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forEpochCommands(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+ commands.forWitnessed(minTimestamp, maxTimestamp, consumer);
+ }
+ }
+ }
+
+ // TODO: command store api will need to support something like this for
repair/streaming
+ public void forCommittedInEpoch(KeyRanges ranges, long epoch,
Consumer<Command> consumer)
+ {
+ Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE,
Integer.MIN_VALUE, Node.Id.NONE);
+ Timestamp maxTimestamp = new Timestamp(epoch, Long.MAX_VALUE,
Integer.MAX_VALUE, Node.Id.MAX);
+ for (KeyRange range : ranges)
+ {
+ Iterable<CommandsForKey> rangeCommands =
commandsForKey.subMap(range.start(),
+
range.startInclusive(),
+
range.end(),
+
range.endInclusive()).values();
+ for (CommandsForKey commands : rangeCommands)
+ {
+
+ Collection<Command> committed = commands.committedByExecuteAt()
+ .between(minTimestamp,
maxTimestamp).collect(Collectors.toList());
+ committed.forEach(consumer);
+ }
+ }
+ }
+
+ protected void processInternal(Consumer<? super CommandStore> consumer,
Promise<Void> promise)
+ {
+ processInternal(cs -> {
+ consumer.accept(cs);
+ return null;
+ }, promise);
+ }
+
+ protected <T> void processInternal(Function<? super CommandStore, T>
function, Promise<T> promise)
+ {
+ try
+ {
+ T result = function.apply(this);
+ promise.setSuccess(result);
+ }
+ catch (Throwable e)
+ {
+ promise.tryFailure(e);
+ }
+ }
+
+ public static class Synchronized extends InMemoryCommandStore
+ {
+ public Synchronized(int generation,
+ int index,
+ int numShards,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ @Override
+ public synchronized Future<Void> processSetup(Consumer<? super
CommandStore> function)
+ {
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> processSetup(Function<? super
CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized Future<Void> process(TxnOperation unused,
Consumer<? super CommandStore> consumer)
+ {
+ Promise<Void> promise = new AsyncPromise<>();
+ processInternal(consumer, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized <T> Future<T> process(TxnOperation unused,
Function<? super CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
+ @Override
+ public synchronized void shutdown() {}
+ }
+
+ public static class SingleThread extends InMemoryCommandStore
+ {
+ private final ExecutorService executor;
+
+ private class ConsumerWrapper extends AsyncPromise<Void> implements
Runnable
+ {
+ private final Consumer<? super CommandStore> consumer;
+
+ public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(consumer, this);
+ }
+ }
+
+ private class FunctionWrapper<T> extends AsyncPromise<T> implements
Runnable
+ {
+ private final Function<? super CommandStore, T> function;
+
+ public FunctionWrapper(Function<? super CommandStore, T> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ public SingleThread(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, uniqueNow, currentEpoch,
agent, store, progressLogFactory, rangesForEpoch);
+ executor = Executors.newSingleThreadExecutor(r -> {
+ Thread thread = new Thread(r);
+ thread.setName(CommandStore.class.getSimpleName() + '[' +
nodeId + ':' + index + ']');
+ return thread;
+ });
+ }
+
+ @Override
+ public Future<Void> processSetup(Consumer<? super CommandStore>
function)
+ {
+ ConsumerWrapper future = new ConsumerWrapper(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public <T> Future<T> processSetup(Function<? super CommandStore, T>
function)
+ {
+ FunctionWrapper<T> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public Future<Void> process(TxnOperation unused, Consumer<? super
CommandStore> consumer)
+ {
+ ConsumerWrapper future = new ConsumerWrapper(consumer);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public <T> Future<T> process(TxnOperation unused, Function<? super
CommandStore, T> function)
+ {
+ FunctionWrapper<T> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ executor.shutdown();
+ }
+ }
+
+ public static class SingleThreadDebug extends SingleThread
+ {
+ private final AtomicReference<Thread> expectedThread = new
AtomicReference<>();
+
+ public SingleThreadDebug(int generation,
+ int index,
+ int numShards,
+ Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ LongSupplier currentEpoch,
+ Agent agent,
+ DataStore store,
+ ProgressLog.Factory progressLogFactory,
+ RangesForEpoch rangesForEpoch)
+ {
+ super(generation, index, numShards, nodeId, uniqueNow,
currentEpoch, agent, store, progressLogFactory, rangesForEpoch);
+ }
+
+ private void assertThread()
+ {
+ Thread current = Thread.currentThread();
+ Thread expected;
+ while (true)
+ {
+ expected = expectedThread.get();
+ if (expected != null)
+ break;
+ expectedThread.compareAndSet(null, Thread.currentThread());
+ }
+ if (expected != current)
+ throw new IllegalStateException(String.format("Command store
called from the wrong thread. Expected %s, got %s", expected, current));
Review Comment:
Resolved
##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -54,15 +63,59 @@ public Apply(Node.Id to, Topologies topologies, TxnId
txnId, Txn txn, Key homeKe
this.result = result;
}
+ @VisibleForImplementation
+ public Apply(Keys scope, long waitForEpoch, TxnId txnId, Txn txn, Key
homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
+ {
+ super(scope, waitForEpoch);
+ this.txnId = txnId;
+ this.txn = txn;
+ this.homeKey = homeKey;
+ this.executeAt = executeAt;
+ this.deps = deps;
+ this.writes = writes;
+ this.result = result;
+ }
+
+ static Future<Void> waitAndReduce(Future<Void> left, Future<Void> right)
Review Comment:
Reply from Blake in JIRA
Good point. Changed return type to Future<Void>
##########
accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.CommandsForKey;
+import accord.primitives.Timestamp;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+public class InMemoryCommandsForKey extends CommandsForKey
+{
+ static class InMemoryCommandTimeseries implements CommandTimeseries
+ {
+ private final NavigableMap<Timestamp, Command> commands = new
TreeMap<>();
+
+ @Override
+ public Command get(Timestamp timestamp)
+ {
+ return commands.get(timestamp);
+ }
+
+ @Override
+ public void add(Timestamp timestamp, Command command)
Review Comment:
Resolved
##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -54,15 +63,59 @@ public Apply(Node.Id to, Topologies topologies, TxnId
txnId, Txn txn, Key homeKe
this.result = result;
}
+ @VisibleForImplementation
+ public Apply(Keys scope, long waitForEpoch, TxnId txnId, Txn txn, Key
homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
+ {
+ super(scope, waitForEpoch);
+ this.txnId = txnId;
+ this.txn = txn;
+ this.homeKey = homeKey;
+ this.executeAt = executeAt;
+ this.deps = deps;
+ this.writes = writes;
+ this.result = result;
+ }
+
+ static Future<Void> waitAndReduce(Future<Void> left, Future<Void> right)
Review Comment:
nit: "waitAndReduce" throws on future error rather than return a failed
future, this feels off to me but looks to be because "mapReduceLocal" doesn't
allow changing the return type; we want Void in this context but that API
doesn't allow... would it be better to allow a different return type? Caller
never checks return so Future<?> is brittle...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]