dcapwell commented on code in PR #3408:
URL: https://github.com/apache/cassandra/pull/3408#discussion_r1670810624
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
Review Comment:
dead code? `seed` doesn't appear to be used anywhere.
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
Review Comment:
having the check act as the test itself, rather than as post-hoc validation,
is much cleaner — good idea
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
Review Comment:
dead code? `sut` doesn't appear to be used after construction.
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
Review Comment:
please use a Logger instead of `System.out.println`
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
+ }
+ }
+ catch (Throwable e)
{
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
+ e.printStackTrace();
}
+ finally
+ {
+ State.journal.shutdown();
+ }
+ }
- try
+ public static class TestCallbacks implements AsyncCallbacks<String, String>
+ {
+
+ @Override
+ public void onWrite(long segment, int position, int size, String key,
String value, Object writeContext)
{
- State.eventsDurable.await();
- State.logger.info("All events are durable done!");
+ State.latch.decrement();
}
- catch (InterruptedException e)
+
+ @Override
+ public void onWriteFailed(String key, String value, Object
writeContext, Throwable cause)
{
- throw new AssertionError(e);
+ State.latch.decrement();
Review Comment:
this is a big issue: the test could pass while this failure is silently
ignored. You should save the error to `State` and assert on it in the test.
Any failure here is unexpected and should cause the test to fail.
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
+ }
+ }
+ catch (Throwable e)
{
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
+ e.printStackTrace();
}
+ finally
+ {
+ State.journal.shutdown();
+ }
+ }
- try
+ public static class TestCallbacks implements AsyncCallbacks<String, String>
+ {
+
+ @Override
+ public void onWrite(long segment, int position, int size, String key,
String value, Object writeContext)
{
- State.eventsDurable.await();
- State.logger.info("All events are durable done!");
+ State.latch.decrement();
}
- catch (InterruptedException e)
+
+ @Override
+ public void onWriteFailed(String key, String value, Object
writeContext, Throwable cause)
{
- throw new AssertionError(e);
+ State.latch.decrement();
}
- if (!State.exceptions.isEmpty())
+ @Override
+ public void onFlush(long segment, int position)
{
- AssertionError error = new AssertionError("Exceptions found during
test");
- State.exceptions.forEach(error::addSuppressed);
- throw error;
}
- State.journal.shutdown();
- State.logger.info("Run complete");
+ @Override
+ public void onFlushFailed(Throwable cause)
+ {
+ new RuntimeException("Could not flush", cause).printStackTrace();
+ }
}
- private static void check()
+ @Isolated
+ public static class IdentityValueSerializer implements
ValueSerializer<String, String>
{
- State.logger.info("Check starting");
- State.journal.start(null); // to avoid a while true deadlock
- try
+
+ @Override
+ public int serializedSize(String key, String value, int userVersion)
{
- for (int i = 0; i < State.events; i++)
- {
- TxnRequest<?> event =
State.journal.readMessage(State.toTxnId(i), MessageType.PRE_ACCEPT_REQ,
PreAccept.class);
- State.logger.info("Event {} -> {}", i, event);
- if (event == null)
- throw new AssertionError(String.format("Unable to read
event %d", i));
- }
- State.logger.info("Check complete");
+ return TypeSizes.INT_SIZE + key.length();
}
- finally
+
+ @Override
+ public void serialize(String key, String value, DataOutputPlus out,
int userVersion) throws IOException
{
- State.journal.shutdown();
+ out.writeInt(key.length());
+ out.writeBytes(key);
+ }
+
+ @Override
+ public String deserialize(String key, DataInputPlus in, int
userVersion) throws IOException
+ {
+ int size = in.readInt();
+ byte[] value = new byte[size];
+ for (int i = 0; i < size; i++)
+ value[i] = in.readByte();
+
+ return new String(value);
+ }
+ }
+
+ @Isolated
+ public static class IdentityKeySerializer implements KeySupport<String>
+ {
+
+ @Override
+ public int serializedSize(int userVersion)
+ {
+ return 16;
+ }
+
+ @Override
+ public void serialize(String key, DataOutputPlus out, int userVersion)
throws IOException
+ {
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ if (key.length() > maxSize)
+ throw new IllegalStateException();
+
+ out.writeInt(key.length());
+ out.writeBytes(key);
+ int remaining = maxSize - key.length();
+ for (int i = 0; i < remaining; i++)
+ out.writeByte(0);
+ }
+
+ @Override
+ public String deserialize(DataInputPlus in, int userVersion) throws
IOException
+ {
+ int size = in.readInt();
+ byte[] key = new byte[size];
+ for (int i = 0; i < size; i++)
+ key[i] = in.readByte();
+
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ int remaining = maxSize - size;
+ for (int i = 0; i < remaining; i++)
+ in.readByte(); // todo assert 0
Review Comment:
want to fix the todo? Also, `0` is a common value, so it could be good to use
another value like `4` (randomly selected from XKCD)
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
+ }
+ }
+ catch (Throwable e)
{
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
+ e.printStackTrace();
}
+ finally
+ {
+ State.journal.shutdown();
+ }
+ }
- try
+ public static class TestCallbacks implements AsyncCallbacks<String, String>
+ {
+
+ @Override
+ public void onWrite(long segment, int position, int size, String key,
String value, Object writeContext)
{
- State.eventsDurable.await();
- State.logger.info("All events are durable done!");
+ State.latch.decrement();
}
- catch (InterruptedException e)
+
+ @Override
+ public void onWriteFailed(String key, String value, Object
writeContext, Throwable cause)
{
- throw new AssertionError(e);
+ State.latch.decrement();
}
- if (!State.exceptions.isEmpty())
+ @Override
+ public void onFlush(long segment, int position)
{
- AssertionError error = new AssertionError("Exceptions found during
test");
- State.exceptions.forEach(error::addSuppressed);
- throw error;
}
- State.journal.shutdown();
- State.logger.info("Run complete");
+ @Override
+ public void onFlushFailed(Throwable cause)
+ {
+ new RuntimeException("Could not flush", cause).printStackTrace();
Review Comment:
this is a big issue as the test could pass and this *will* be ignored. You
should save the failure to `State` and assert on it in the test. Any failure is
unexpected and should lead to a test fault
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
Review Comment:
Logger please
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
+ }
+ }
+ catch (Throwable e)
{
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
+ e.printStackTrace();
}
+ finally
+ {
+ State.journal.shutdown();
+ }
+ }
- try
+ public static class TestCallbacks implements AsyncCallbacks<String, String>
+ {
+
+ @Override
+ public void onWrite(long segment, int position, int size, String key,
String value, Object writeContext)
{
- State.eventsDurable.await();
- State.logger.info("All events are durable done!");
+ State.latch.decrement();
}
- catch (InterruptedException e)
+
+ @Override
+ public void onWriteFailed(String key, String value, Object
writeContext, Throwable cause)
{
- throw new AssertionError(e);
+ State.latch.decrement();
}
- if (!State.exceptions.isEmpty())
+ @Override
+ public void onFlush(long segment, int position)
{
- AssertionError error = new AssertionError("Exceptions found during
test");
- State.exceptions.forEach(error::addSuppressed);
- throw error;
}
- State.journal.shutdown();
- State.logger.info("Run complete");
+ @Override
+ public void onFlushFailed(Throwable cause)
+ {
+ new RuntimeException("Could not flush", cause).printStackTrace();
+ }
}
- private static void check()
+ @Isolated
+ public static class IdentityValueSerializer implements
ValueSerializer<String, String>
{
- State.logger.info("Check starting");
- State.journal.start(null); // to avoid a while true deadlock
- try
+
+ @Override
+ public int serializedSize(String key, String value, int userVersion)
{
- for (int i = 0; i < State.events; i++)
- {
- TxnRequest<?> event =
State.journal.readMessage(State.toTxnId(i), MessageType.PRE_ACCEPT_REQ,
PreAccept.class);
- State.logger.info("Event {} -> {}", i, event);
- if (event == null)
- throw new AssertionError(String.format("Unable to read
event %d", i));
- }
- State.logger.info("Check complete");
+ return TypeSizes.INT_SIZE + key.length();
}
- finally
+
+ @Override
+ public void serialize(String key, String value, DataOutputPlus out,
int userVersion) throws IOException
{
- State.journal.shutdown();
+ out.writeInt(key.length());
+ out.writeBytes(key);
+ }
+
+ @Override
+ public String deserialize(String key, DataInputPlus in, int
userVersion) throws IOException
+ {
+ int size = in.readInt();
+ byte[] value = new byte[size];
+ for (int i = 0; i < size; i++)
+ value[i] = in.readByte();
+
+ return new String(value);
+ }
+ }
+
+ @Isolated
+ public static class IdentityKeySerializer implements KeySupport<String>
+ {
+
+ @Override
+ public int serializedSize(int userVersion)
+ {
+ return 16;
+ }
+
+ @Override
+ public void serialize(String key, DataOutputPlus out, int userVersion)
throws IOException
+ {
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ if (key.length() > maxSize)
+ throw new IllegalStateException();
+
+ out.writeInt(key.length());
+ out.writeBytes(key);
+ int remaining = maxSize - key.length();
+ for (int i = 0; i < remaining; i++)
+ out.writeByte(0);
+ }
+
+ @Override
+ public String deserialize(DataInputPlus in, int userVersion) throws
IOException
+ {
+ int size = in.readInt();
+ byte[] key = new byte[size];
+ for (int i = 0; i < size; i++)
+ key[i] = in.readByte();
+
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ int remaining = maxSize - size;
+ for (int i = 0; i < remaining; i++)
+ in.readByte(); // todo assert 0
+
+ return new String(key);
+ }
+
+ @Override
+ public String deserialize(ByteBuffer buffer, int position, int
userVersion)
+ {
+ int size = buffer.getInt();
+ byte[] key = new byte[size];
+ for (int i = 0; i < size; i++)
+ key[i] = buffer.get();
+
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ int remaining = maxSize - size;
+ for (int i = 0; i < remaining; i++)
+ buffer.get(); // todo assert 0
Review Comment:
want to fix the todo? Also, `0` is a common value, so it could be good to use
another value like `4` (randomly selected from XKCD)
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -18,250 +18,359 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.zip.Checksum;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
+ public void simpleRWTest() throws IOException
{
- simulate(arr(() -> run()),
- () -> check());
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
System.out.println("DatabaseDescriptor.getCommitLogWriteDiskAccessMode() = " +
DatabaseDescriptor.getCommitLogWriteDiskAccessMode());
+
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ long seed = 1L;
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
+
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
+
+ Keyspace.setInitialized();
+
+ CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new,
+ false,
+ new FakeSchema(),
+ new
TokenPlacementModel.SimpleReplicationFactor(1));
+
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void run()
+ public static void check()
{
- for (int i = 0; i < State.events; i++)
+ State.journal.start();
+ try
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < 100; i++)
+ {
+ System.out.println("Reading " + State.journal.readFirst("test"
+ i));
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
+ }
+ }
+ catch (Throwable e)
{
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
+ e.printStackTrace();
Review Comment:
write to logger please
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]