dcapwell commented on code in PR #2256:
URL: https://github.com/apache/cassandra/pull/2256#discussion_r1183939906
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -44,6 +44,7 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
+import org.apache.cassandra.journal.AsyncWriteCallback;
Review Comment:
Mind reverting this file? With the move to CommandStores, this is all just
moving functions around and adding an unused import.
##########
test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java:
##########
@@ -215,12 +215,16 @@ public static void simulate(IIsolatedExecutor.SerializableRunnable[] runnables,
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableConsumer<ExecutorFactory>)
ExecutorFactory.Global::unsafeSet, classLoader)
.accept(factory);
+ IntSupplier intSupplier = () -> {
+ if (InterceptibleThread.isDeterministic())
+ throw failWithOOM();
+ return random.uniform(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ };
+
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableBiConsumer<InterceptorOfGlobalMethods, IntSupplier>) InterceptorOfGlobalMethods.Global::unsafeSet, classLoader)
- .accept(interceptorOfGlobalMethods, () -> {
- if (InterceptibleThread.isDeterministic())
- throw failWithOOM();
- return random.uniform(Integer.MIN_VALUE, Integer.MAX_VALUE);
- });
+ .accept(interceptorOfGlobalMethods, intSupplier);
+
+
InterceptorOfGlobalMethods.Global.unsafeSet(interceptorOfGlobalMethods, intSupplier);
Review Comment:
@belliottsmith I believe you said that this is not the correct behavior and is
unsafe... This was needed to avoid ClassLoader issues; I'm not sure if or how to
fix the test to avoid them.
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStores.java:
##########
@@ -15,28 +15,102 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.service.accord;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.local.CommandStores;
import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
+import accord.primitives.Routables;
import accord.topology.Topology;
+import accord.utils.MapReduceConsume;
import accord.utils.RandomSource;
+import org.apache.cassandra.concurrent.ExecutionFailure;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
+import org.apache.cassandra.journal.AsyncWriteCallback;
public class AccordCommandStores extends CommandStores<AccordCommandStore>
{
- private long cacheSize;
+ private final AccordJournal journal;
+
AccordCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random,
- ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
+ ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, AccordJournal journal)
{
super(time, agent, store, random, shardDistributor, progressLogFactory, AccordCommandStore::new);
+ this.journal = journal;
setCacheSize(maxCacheSize());
}
+ static Factory factory(AccordJournal journal)
+ {
+ return (time, agent, store, random, shardDistributor, progressLogFactory) ->
+ new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, journal);
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ super.shutdown();
+ journal.shutdown();
+ //TODO shutdown isn't useful by itself, we need a way to "wait" as well. Should be AutoCloseable or offer awaitTermination as well (think Shutdownable interface)
+ }
+
+ @Override
+ protected <O> void mapReduceConsume(
+ PreLoadContext context,
+ Routables<?, ?> keys,
+ long minEpoch,
+ long maxEpoch,
+ MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
+ {
+ // append PreAccept, Accept, Commit, and Apply messages durably to AccordJournal before processing
+ if (journal.mustMakeDurable(context))
+ mapReduceConsumeDurable(context, keys, minEpoch, maxEpoch, mapReduceConsume);
+ else
+ super.mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume);
+ }
+
+ private <O> void mapReduceConsumeDurable(
+ PreLoadContext context,
+ Routables<?, ?> keys,
+ long minEpoch,
+ long maxEpoch,
+ MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
+ {
+ journal.append(context, ImmediateExecutor.INSTANCE, new AsyncWriteCallback()
+ {
+ @Override
+ public void run()
+ {
+ // TODO (performance, expected): do not retain references to messages beyond a certain total
+ // cache threshold; in case of flush lagging behind, read the messages from journal and
+ // deserialize instead before processing, to prevent memory pressure buildup from messages
+ // pending flush to disk.
+ AccordCommandStores.super.mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume);
+ }
+
+ @Override
+ public void onFailure(Throwable error)
+ {
+ // should we invoke Agent#onUncaughtException() instead?
+ ExecutionFailure.handle(error);
Review Comment:
`mapReduceConsume` has an error handler; it would be good to notify it:
```
mapReduceConsume.accept(null, error);
```
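To make the suggestion concrete, here is a minimal sketch of the callback routing a journal write failure to the consumer's failure path instead of only to a global handler. It assumes `MapReduceConsume` exposes an `accept(result, failure)` callback; `WriteCallback`, `ResultConsumer` and `callbackFor` are hypothetical simplified stand-ins, not the real Cassandra or Accord types.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class DurableAppendSketch
{
    // Hypothetical stand-in for org.apache.cassandra.journal.AsyncWriteCallback:
    // run() fires once the record is durable, onFailure() if the write fails.
    interface WriteCallback extends Runnable
    {
        void onFailure(Throwable error);
    }

    // Hypothetical stand-in for the consumer half of accord.utils.MapReduceConsume:
    // receives either a result or a failure.
    interface ResultConsumer<O> extends BiConsumer<O, Throwable> {}

    // Builds the callback handed to journal.append(...): on success, process the
    // message; on failure, notify the caller's error handler with (null, error).
    static <O> WriteCallback callbackFor(Runnable process, ResultConsumer<O> consumer)
    {
        return new WriteCallback()
        {
            @Override
            public void run()
            {
                process.run();
            }

            @Override
            public void onFailure(Throwable error)
            {
                consumer.accept(null, error);
            }
        };
    }

    public static void main(String[] args)
    {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        ResultConsumer<String> consumer = (result, failure) -> seen.set(failure);
        WriteCallback callback = callbackFor(() -> {}, consumer);

        // Simulate the journal reporting a failed write.
        callback.onFailure(new RuntimeException("disk full"));
        System.out.println(seen.get().getMessage()); // prints "disk full"
    }
}
```

Whether `ExecutionFailure.handle(error)` should still be called in addition is a separate question; the point is that the waiting operation learns about the failure rather than hanging.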
##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -124,8 +124,9 @@ public AsyncOperation(AccordCommandStore commandStore, PreLoadContext preLoadCon
this.commandStore = commandStore;
this.preLoadContext = preLoadContext;
this.loader = createAsyncLoader(commandStore, preLoadContext);
- setLoggingIds();
Review Comment:
Mind reverting this class? Your logic is no longer here.
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.simulator.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.Utils;
+import accord.api.Data;
+import accord.api.RoutingKey;
+import accord.api.Update;
+import accord.api.Write;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.messages.PreAccept;
+import accord.messages.TxnRequest;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.Files;
+import org.apache.cassandra.journal.AsyncWriteCallback;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.service.accord.AccordJournal;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+public class AccordJournalTest extends SimulationTestBase
+{
+ @Test
+ public void test() throws IOException
+ {
+ simulate(arr(() -> run()),
+ () -> check());
+ }
+
+ private static void run()
+ {
+ for (int i = 0; i < State.events; i++)
+ {
+ int finalI = i;
+ State.executor.execute(() -> State.append(finalI));
+ }
+
+ try
+ {
+ State.eventsDurable.await();
+ State.logger.info("All events are durable done!");
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (!State.exceptions.isEmpty())
+ {
+ AssertionError error = new AssertionError("Exceptions found during test");
+ State.exceptions.forEach(error::addSuppressed);
+ throw error;
+ }
+
+ State.journal.shutdown();
+ State.logger.info("Run complete");
+ }
+
+ private static void check()
+ {
+ State.logger.info("Check starting");
+ State.journal.start(); // to avoid a while true deadlock
+ try
+ {
+ for (int i = 0; i < State.events; i++)
+ {
+ TxnRequest<?> event = State.journal.read(State.toTxnId(i), AccordJournal.Type.PREACCEPT_REQ);
+ State.logger.info("Event {} -> {}", i, event);
+ if (event == null)
+ throw new AssertionError(String.format("Unable to read event %d", i));
+ }
+ State.logger.info("Check complete");
+ }
+ finally
+ {
+ State.journal.shutdown();
+ }
+ }
+
+ @Isolated
+ public static class State
+ {
+ private static final Logger logger = LoggerFactory.getLogger(State.class);
+ private static final String KEYSPACE = "test";
+
+ static
+ {
+ Files.newInMemoryFileSystem();
Review Comment:
In https://issues.apache.org/jira/browse/CASSANDRA-18485 I renamed this to
`newGlobalInMemoryFileSystem` to denote that it affects the global file
system...
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStores.java:
##########
@@ -15,28 +15,102 @@
+ @Override
+ public synchronized void shutdown()
+ {
+ super.shutdown();
+ journal.shutdown();
+ //TODO shutdown isn't useful by itself, we need a way to "wait" as well. Should be AutoCloseable or offer awaitTermination as well (think Shutdownable interface)
Review Comment:
It would be good to have an interface similar to
`org.apache.cassandra.concurrent.Shutdownable` in Accord and get that into the
API. This has actually been an issue for jvm-dtest: since Caleb moved to shared
clusters we see it less, as we shut down once, but when tests spun up and tore
down clusters, this was a nightmare.
Not required as part of this patch...
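For illustration only, a rough sketch of what such an Accord-side interface could look like, loosely modeled on `org.apache.cassandra.concurrent.Shutdownable`. The names `AccordShutdownable`, `of` and `composite` are hypothetical, and the command stores and journal are simulated with plain JDK executors.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch
{
    // Hypothetical Accord-side interface: shutdown() only initiates termination,
    // awaitTermination() lets callers (e.g. jvm-dtest teardown) block until done.
    interface AccordShutdownable
    {
        void shutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    }

    // Adapts a JDK executor to the interface.
    static AccordShutdownable of(ExecutorService executor)
    {
        return new AccordShutdownable()
        {
            public void shutdown() { executor.shutdown(); }
            public boolean isTerminated() { return executor.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
            {
                return executor.awaitTermination(timeout, unit);
            }
        };
    }

    // Initiates shutdown of each component in list order (e.g. command stores,
    // then journal) and waits for all of them within one overall deadline.
    static AccordShutdownable composite(List<AccordShutdownable> parts)
    {
        return new AccordShutdownable()
        {
            public void shutdown() { parts.forEach(AccordShutdownable::shutdown); }
            public boolean isTerminated() { return parts.stream().allMatch(AccordShutdownable::isTerminated); }
            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
            {
                long deadline = System.nanoTime() + unit.toNanos(timeout);
                for (AccordShutdownable part : parts)
                {
                    long remaining = deadline - System.nanoTime();
                    if (!part.awaitTermination(Math.max(0, remaining), TimeUnit.NANOSECONDS))
                        return false;
                }
                return true;
            }
        };
    }

    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService commandStores = Executors.newSingleThreadExecutor();
        ExecutorService journal = Executors.newSingleThreadExecutor();
        AccordShutdownable all = composite(List.of(of(commandStores), of(journal)));

        all.shutdown();
        System.out.println(all.awaitTermination(5, TimeUnit.SECONDS)); // prints "true"
    }
}
```

Sharing one deadline across components keeps a test teardown bounded by the caller's timeout, rather than by the sum of per-component timeouts.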
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalTest.java:
##########
@@ -0,0 +1,267 @@
+public class AccordJournalTest extends SimulationTestBase
Review Comment:
This will fail because the simulator does not support mmap until
https://issues.apache.org/jira/browse/CASSANDRA-18485 is in; @belliottsmith, any
issues with that? A workaround would be to make this feature "pluggable" so we
can no-op it via config, and have the simulator disable it until
https://issues.apache.org/jira/browse/CASSANDRA-18485 is ready (that won't help
this test, but would help the Accord simulator test).
I assume that once https://issues.apache.org/jira/browse/CASSANDRA-18485 goes in
on trunk we can rebase shortly after, so the breakage shouldn't last
"long"?
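One possible shape for the "pluggable" workaround, sketched under assumptions: the `Journal` interface, `InMemoryJournal`, `NoopJournal` and the `accord.journal.enabled` system property are all hypothetical, not existing Cassandra classes or config. The no-op variant skips persistence but still invokes the durability callback, so request processing proceeds unchanged in the simulator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class PluggableJournalSketch
{
    // Hypothetical minimal journal contract: append a record, then run onDurable
    // on the given executor once it is persisted.
    interface Journal
    {
        void append(String record, Executor executor, Runnable onDurable);
        void shutdown();
    }

    // Stand-in for the real mmap-backed journal; here it only buffers in memory.
    static final class InMemoryJournal implements Journal
    {
        final List<String> records = new ArrayList<>();

        public synchronized void append(String record, Executor executor, Runnable onDurable)
        {
            records.add(record);
            executor.execute(onDurable);
        }

        public void shutdown() {}
    }

    // No-op variant for environments that cannot use the journal (e.g. the
    // simulator before mmap support lands): skips persistence, still runs the callback.
    static final class NoopJournal implements Journal
    {
        public void append(String record, Executor executor, Runnable onDurable)
        {
            executor.execute(onDurable);
        }

        public void shutdown() {}
    }

    // Selects an implementation from a hypothetical system property (default: enabled).
    static Journal fromConfig()
    {
        boolean enabled = Boolean.parseBoolean(System.getProperty("accord.journal.enabled", "true"));
        return enabled ? new InMemoryJournal() : new NoopJournal();
    }

    public static void main(String[] args)
    {
        System.setProperty("accord.journal.enabled", "false");
        Journal journal = fromConfig();
        journal.append("PreAccept txn-1", Runnable::run, () -> System.out.println("processed"));
        System.out.println(journal.getClass().getSimpleName()); // prints "NoopJournal"
        journal.shutdown();
    }
}
```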
##########
test/unit/org/apache/cassandra/io/util/Files.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.file.FileSystem;
+
+import com.google.common.jimfs.Jimfs;
+
+public class Files
+{
+ public static FileSystem newInMemoryFileSystem()
Review Comment:
Renamed to `newGlobalInMemoryFileSystem` in
https://issues.apache.org/jira/browse/CASSANDRA-18485; if we rename it here, the
rebase should avoid the test failing to set up the global FS (this method
exists, but doesn't update `File`).
--
This is an automated message from the Apache Git Service.