dcapwell commented on code in PR #1962:
URL: https://github.com/apache/cassandra/pull/1962#discussion_r1038360502
##########
src/antlr/Cql.g:
##########
@@ -43,6 +43,7 @@ import Parser,Lexer;
import org.apache.cassandra.cql3.selection.*;
import org.apache.cassandra.cql3.statements.*;
import org.apache.cassandra.cql3.statements.schema.*;
+ import org.apache.cassandra.cql3.transactions.*;
Review Comment:
unused import?
##########
src/java/org/apache/cassandra/cql3/Constants.java:
##########
@@ -518,19 +517,39 @@ public Substracter(ColumnMetadata column, Term t)
super(column, t);
}
+ public boolean requiresRead()
Review Comment:
why does this say you changed this?
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+ protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ protected static Cluster sharedCluster;
+
+ protected String currentTable;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ sharedCluster = createCluster();
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (sharedCluster != null)
+ sharedCluster.close();
+ }
+
+ @Before
+ public void setup()
+ {
+ currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
+ }
+
+ protected static void assertRow(Cluster cluster, String query, int k, int
c, int v)
+ {
+ Object[][] result = cluster.coordinator(1).execute(query,
ConsistencyLevel.QUORUM);
+ assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+ }
+
+ protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws
Exception
+ {
+ sharedCluster.schemaChange(tableDDL);
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().createEpochFromConfigUnsafe()));
+
+ // Evict commands from the cache immediately to expose problems
loading from disk.
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
+
+ fn.accept(sharedCluster);
+
+ // Reset any messaging filters.
+ sharedCluster.filters().reset();
Review Comment:
should this be try/finally?
##########
test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java:
##########
@@ -154,20 +154,34 @@ public static void
processCommandResult(AccordCommandStore commandStore, Command
throw new RuntimeException(e);
}
})
- .reduce(null, AccordData::merge);
+ .reduce(null, TxnData::merge);
Write write = txn.update().apply(readData);
((AccordCommand)command).setWrites(new Writes(command.executeAt(),
txn.keys(), write));
((AccordCommand)command).setResult(txn.query().compute(command.txnId(),
readData, txn.read(), txn.update()));
}).get();
}
+ public static Txn createTxn(String query)
Review Comment:
this is very useful for cases like Simulator which benefits from bypassing
TransactionStatement to directly access `TxnData` and all "hidden" state
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+ protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ protected static Cluster sharedCluster;
+
+ protected String currentTable;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ sharedCluster = createCluster();
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (sharedCluster != null)
+ sharedCluster.close();
+ }
+
+ @Before
+ public void setup()
+ {
+ currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
+ }
+
+ protected static void assertRow(Cluster cluster, String query, int k, int
c, int v)
+ {
+ Object[][] result = cluster.coordinator(1).execute(query,
ConsistencyLevel.QUORUM);
+ assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+ }
+
+ protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws
Exception
+ {
+ sharedCluster.schemaChange(tableDDL);
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().createEpochFromConfigUnsafe()));
+
+ // Evict commands from the cache immediately to expose problems
loading from disk.
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
+
+ fn.accept(sharedCluster);
+
+ // Reset any messaging filters.
+ sharedCluster.filters().reset();
+ }
+
+ protected void test(FailingConsumer<Cluster> fn) throws Exception
+ {
+ test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary
key (k, c))", fn);
+ }
+
+ private static Cluster createCluster() throws IOException
+ {
+ // need to up the timeout else tests get flaky
+ // disable vnode for now, but should enable before trunk
+ return init(Cluster.build(2)
+ .withoutVNodes()
+ .withConfig(c ->
c.with(Feature.NETWORK).set("write_request_timeout",
"10s").set("transaction_timeout", "15s"))
+ .withInstanceInitializer(ByteBuddyHelper::install)
+ .start());
+ }
+
+ // TODO: Retry on preemption may become unnecessary after the Unified Log
is integrated.
+ protected static SimpleQueryResult
assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check,
Object... boundValues)
+ {
+ SimpleQueryResult result = executeWithRetry(cluster, check,
boundValues);
+ Assertions.assertThat(result.toObjectArrays()).isEqualTo(row == null ?
new Object[0] : new Object[] { row });
+ return result;
+ }
+
+ protected static SimpleQueryResult executeWithRetry(Cluster cluster,
String check, Object... boundValues)
+ {
+ try
+ {
+ return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
+ }
+ catch (Throwable t)
+ {
+ if
(Throwables.getRootCause(t).toString().contains(Preempted.class.getName()))
+ return executeWithRetry(cluster, check, boundValues);
Review Comment:
should we add delays? If so can use
`org.apache.cassandra.utils.Retry#retryWithBackoffBlocking(int,
java.util.function.Supplier<A>)`
##########
src/java/org/apache/cassandra/utils/ByteBufferUtil.java:
##########
@@ -533,6 +551,35 @@ else if (obj instanceof byte[])
return ByteBuffer.wrap((byte[]) obj);
else if (obj instanceof ByteBuffer)
return (ByteBuffer) obj;
+ else if (obj instanceof List)
+ {
+ List<?> list = (List<?>) obj;
+ // convert subtypes to BB
+ List<ByteBuffer> bbs =
list.stream().map(ByteBufferUtil::objectToBytes).collect(Collectors.toList());
+ // decompose/serializer doesn't use the isMultiCell, so safe to do
this
+ return ListType.getInstance(BytesType.instance,
false).decompose(bbs);
+ }
+ else if (obj instanceof Map)
+ {
+ Map<?, ?> map = (Map<?, ?>) obj;
+ // convert subtypes to BB
+ Map<ByteBuffer, ByteBuffer> bbs = new LinkedHashMap<>();
+ for (Map.Entry<?, ?> e : map.entrySet())
+ bbs.put(objectToBytes(e.getKey()),
objectToBytes(e.getValue()));
Review Comment:
for safety should we validate `put` returns `null`?
##########
src/java/org/apache/cassandra/utils/ArraySerializers.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.function.IntFunction;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
+public class ArraySerializers
+{
+ public static <T> void serializeArray(T[] items, DataOutputPlus out, int
version, IVersionedSerializer<T> serializer) throws IOException
+ {
+ out.writeUnsignedVInt(items.length);
+ for (T item : items)
+ serializer.serialize(item, out, version);
+ }
+
+ public static <T> T[] deserializeArray(DataInputPlus in, int version,
IVersionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws
IOException
+ {
+ int size = checkedCast(in.readUnsignedVInt());
Review Comment:
since `checkedCast(in.readUnsignedVInt())` is the common case, should we add
this to `DataInputPlus`? `readCheckedUnsignedVInt`?
##########
src/java/org/apache/cassandra/utils/ByteBufferUtil.java:
##########
@@ -533,6 +551,35 @@ else if (obj instanceof byte[])
return ByteBuffer.wrap((byte[]) obj);
else if (obj instanceof ByteBuffer)
return (ByteBuffer) obj;
+ else if (obj instanceof List)
+ {
+ List<?> list = (List<?>) obj;
+ // convert subtypes to BB
+ List<ByteBuffer> bbs =
list.stream().map(ByteBufferUtil::objectToBytes).collect(Collectors.toList());
+ // decompose/serializer doesn't use the isMultiCell, so safe to do
this
+ return ListType.getInstance(BytesType.instance,
false).decompose(bbs);
+ }
+ else if (obj instanceof Map)
+ {
+ Map<?, ?> map = (Map<?, ?>) obj;
+ // convert subtypes to BB
+ Map<ByteBuffer, ByteBuffer> bbs = new LinkedHashMap<>();
+ for (Map.Entry<?, ?> e : map.entrySet())
+ bbs.put(objectToBytes(e.getKey()),
objectToBytes(e.getValue()));
+ // decompose/serializer doesn't use the isMultiCell, so safe to do
this
+ return MapType.getInstance(BytesType.instance, BytesType.instance,
false).decompose(bbs);
+ }
+ else if (obj instanceof Set)
+ {
+ Set<?> set = (Set<?>) obj;
+ // convert subtypes to BB
+ Set<ByteBuffer> bbs = new LinkedHashSet<>();
+ set.forEach(o -> bbs.add(objectToBytes(o)));
Review Comment:
for safety should we validate `add` returns `true`?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -99,6 +110,39 @@ public static long nowInMicros()
return
TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
}
+ public TxnData coordinate(Txn txn)
+ {
+ try
+ {
+ Future<Result> future = node.coordinate(txn);
+ Result result =
future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.SECONDS),
TimeUnit.SECONDS);
+ return (TxnData) result;
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause instanceof Timeout)
+ throw throwTimeout(txn);
+ throw new RuntimeException(e);
Review Comment:
this should throw `cause`?
##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -1171,8 +1173,9 @@ public static void log(Config config)
String value;
try
{
- // Field.get() can throw NPE if the value of the field is null
- value = field.get(config).toString();
+ // don't use exceptions for normal control flow!
+ Object obj = field.get(config);
+ value = obj != null ? obj.toString() : "null";
Review Comment:
comment said that `field.get` throws NPE; is that true? it should return
null... should we also remove the `NPE` catch?
##########
src/java/org/apache/cassandra/audit/AuditLogEntryType.java:
##########
@@ -60,6 +60,9 @@
CREATE_ROLE(AuditLogEntryCategory.DCL),
USE_KEYSPACE(AuditLogEntryCategory.OTHER),
DESCRIBE(AuditLogEntryCategory.OTHER),
+
+ // TODO: Is DML the most appropriate classification, given a transaction
can read, write, or both?
+ TRANSACTION(AuditLogEntryCategory.DML),
Review Comment:
Maybe best to add a new one? Maybe `TRANSACTION` like we did for
`WriteType`?
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1855,6 +1855,11 @@ public static long getCasContentionTimeout(TimeUnit unit)
return conf.cas_contention_timeout.to(unit);
}
+ public static long getTransactionTimeout(TimeUnit unit)
Review Comment:
no setter?
##########
src/antlr/Parser.g:
##########
@@ -73,6 +80,19 @@ options {
return marker;
}
+ public RowDataReference.Raw newRowDataReference(Selectable.RawIdentifier
tuple, Selectable.Raw selectable)
+ {
+ if (!isParsingTxn)
+ throw new IllegalStateException();
Review Comment:
can we get a useful error msg?
##########
src/java/org/apache/cassandra/utils/CollectionSerializers.java:
##########
@@ -19,41 +19,43 @@
package org.apache.cassandra.utils;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.RandomAccess;
import java.util.Set;
import java.util.function.IntFunction;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-public class CollectionSerializer
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
+public class CollectionSerializers
{
- public static <V> void serializeCollection(IVersionedSerializer<V>
valueSerializer, Collection<V> values, DataOutputPlus out, int version) throws
IOException
+ public static <V> void serializeCollection(Collection<V> values,
DataOutputPlus out, int version, IVersionedSerializer<V> valueSerializer)
throws IOException
{
out.writeUnsignedVInt(values.size());
for (V value : values)
valueSerializer.serialize(value, out, version);
}
- public static <V, L extends List<V> & RandomAccess> void
serializeList(IVersionedSerializer<V> valueSerializer, L values, DataOutputPlus
out, int version) throws IOException
Review Comment:
if you remove `& RandomAccess` shouldn't you just call `serializeCollection`
then to avoid copy/paste? `& RandomAccess` looks to be there to know it's safe
to do `.get` to avoid the memory overhead of iterators
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+ protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ protected static Cluster sharedCluster;
Review Comment:
nit: all caps
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+ protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ protected static Cluster sharedCluster;
+
+ protected String currentTable;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ sharedCluster = createCluster();
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (sharedCluster != null)
+ sharedCluster.close();
+ }
+
+ @Before
+ public void setup()
+ {
+ currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
+ }
+
+ protected static void assertRow(Cluster cluster, String query, int k, int
c, int v)
+ {
+ Object[][] result = cluster.coordinator(1).execute(query,
ConsistencyLevel.QUORUM);
+ assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+ }
+
+ protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws
Exception
+ {
+ sharedCluster.schemaChange(tableDDL);
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().createEpochFromConfigUnsafe()));
+
+ // Evict commands from the cache immediately to expose problems
loading from disk.
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
+
+ fn.accept(sharedCluster);
+
+ // Reset any messaging filters.
+ sharedCluster.filters().reset();
+ }
+
+ protected void test(FailingConsumer<Cluster> fn) throws Exception
+ {
+ test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary
key (k, c))", fn);
+ }
+
+ private static Cluster createCluster() throws IOException
+ {
+ // need to up the timeout else tests get flaky
+ // disable vnode for now, but should enable before trunk
+ return init(Cluster.build(2)
+ .withoutVNodes()
+ .withConfig(c ->
c.with(Feature.NETWORK).set("write_request_timeout",
"10s").set("transaction_timeout", "15s"))
+ .withInstanceInitializer(ByteBuddyHelper::install)
+ .start());
+ }
+
+ // TODO: Retry on preemption may become unnecessary after the Unified Log
is integrated.
+ protected static SimpleQueryResult
assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check,
Object... boundValues)
+ {
+ SimpleQueryResult result = executeWithRetry(cluster, check,
boundValues);
+ Assertions.assertThat(result.toObjectArrays()).isEqualTo(row == null ?
new Object[0] : new Object[] { row });
+ return result;
+ }
+
+ protected static SimpleQueryResult executeWithRetry(Cluster cluster,
String check, Object... boundValues)
+ {
+ try
+ {
+ return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
+ }
+ catch (Throwable t)
+ {
+ if
(Throwables.getRootCause(t).toString().contains(Preempted.class.getName()))
+ return executeWithRetry(cluster, check, boundValues);
+
+ throw t;
+ }
+ }
+
+ public static class ByteBuddyHelper
Review Comment:
should this be renamed to be more explicit? Maybe something like
`EnforceUpdateDoesNotPerformRead`?
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+ protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ protected static Cluster sharedCluster;
+
+ protected String currentTable;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ sharedCluster = createCluster();
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (sharedCluster != null)
+ sharedCluster.close();
+ }
+
+ @Before
+ public void setup()
+ {
+ currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
+ }
+
+ protected static void assertRow(Cluster cluster, String query, int k, int
c, int v)
+ {
+ Object[][] result = cluster.coordinator(1).execute(query,
ConsistencyLevel.QUORUM);
+ assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+ }
+
+ protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws
Exception
+ {
+ sharedCluster.schemaChange(tableDDL);
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().createEpochFromConfigUnsafe()));
+
+ // Evict commands from the cache immediately to expose problems
loading from disk.
+ sharedCluster.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
+
+ fn.accept(sharedCluster);
+
+ // Reset any messaging filters.
+ sharedCluster.filters().reset();
+ }
+
+ protected void test(FailingConsumer<Cluster> fn) throws Exception
+ {
+ test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary
key (k, c))", fn);
+ }
+
+ private static Cluster createCluster() throws IOException
+ {
+ // need to up the timeout else tests get flaky
+ // disable vnode for now, but should enable before trunk
+ return init(Cluster.build(2)
+ .withoutVNodes()
+ .withConfig(c ->
c.with(Feature.NETWORK).set("write_request_timeout",
"10s").set("transaction_timeout", "15s"))
+ .withInstanceInitializer(ByteBuddyHelper::install)
+ .start());
+ }
+
+ // TODO: Retry on preemption may become unnecessary after the Unified Log
is integrated.
+ protected static SimpleQueryResult
assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check,
Object... boundValues)
+ {
+ SimpleQueryResult result = executeWithRetry(cluster, check,
boundValues);
+ Assertions.assertThat(result.toObjectArrays()).isEqualTo(row == null ?
new Object[0] : new Object[] { row });
+ return result;
+ }
+
+ protected static SimpleQueryResult executeWithRetry(Cluster cluster,
String check, Object... boundValues)
+ {
+ try
+ {
+ return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
+ }
+ catch (Throwable t)
+ {
+ if
(Throwables.getRootCause(t).toString().contains(Preempted.class.getName()))
Review Comment:
FYI there is an `AssertionUtils` class to make working with this easier...
```
if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
```
This was written to work with AssertJ, but you can also call `matches` directly.
The main reason to prefer these methods is that they try to do the type
checks and not just the name check, so `class FluffyKitten extends Preempted`
will be detected as well.
##########
test/unit/org/apache/cassandra/utils/Generators.java:
##########
@@ -60,6 +60,23 @@
private static final Constraint DNS_DOMAIN_PART_CONSTRAINT =
Constraint.between(0, DNS_DOMAIN_PART_DOMAIN.length - 1).withNoShrinkPoint();
public static final Gen<String> IDENTIFIER_GEN =
Generators.regexWord(SourceDSL.integers().between(1, 50));
+ public static final Gen<String> SYMBOL_GEN =
filter(symbolGen(SourceDSL.integers().between(1, 48)),
Generators::thisBugIsBroughtToYouByTheLetterP);
+ private static boolean thisBugIsBroughtToYouByTheLetterP(String value)
+ {
+ // In Lexer.g DURATION is before IDENT and Duration allows the
following to parsse: P, and PT
+ // This causes an issue for cases that use IDENT as P and PT will not
match as they matched DURATION already
+ // to avoid these cases, this function will be used to filter them out
so only "valid" symbols are returned
+ // see CASSANDRA-17919
+ return !("P".equals(value) || "PT".equals(value));
+ }
+ private static final char CHAR_UNDERSCORE = 95;
+ public static Gen<String> symbolGen(Gen<Integer> size)
+ {
+ char[] domain = new char[LETTER_OR_DIGIT_DOMAIN.length + 1];
+ System.arraycopy(LETTER_OR_DIGIT_DOMAIN, 0, domain, 0,
LETTER_OR_DIGIT_DOMAIN.length);
+ domain[domain.length - 1] = CHAR_UNDERSCORE;
Review Comment:
nit: it would be good to build this `domain` array once, outside this method,
as it never changes and doesn't need to be reallocated on each call
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
+ {
+ return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+ }
+
+ private static String bytesToString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ private static ByteBuffer stringToBytes(String string)
+ {
+ return ByteBufferUtil.hexToBytes(string);
+ }
+
+ public Kind getKind()
+ {
+ return kind;
+ }
+
+ public List<String> getParts()
+ {
+ return Collections.unmodifiableList(Arrays.asList(parts));
+ }
+
+ public boolean isFor(TableMetadata metadata)
+ {
+ if (kind != Kind.AUTO_READ)
+ return false;
+ return metadata.keyspace.equals(parts[0])
+ && metadata.name.equals(parts[1]);
+ }
+
+ public DecoratedKey getDecoratedKey(TableMetadata metadata)
+ {
+ checkKind(Kind.AUTO_READ);
Review Comment:
should we also check `isFor`?
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
+ {
+ return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+ }
+
+ private static String bytesToString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ private static ByteBuffer stringToBytes(String string)
+ {
+ return ByteBufferUtil.hexToBytes(string);
+ }
+
+ public Kind getKind()
+ {
+ return kind;
+ }
+
+ public List<String> getParts()
+ {
+ return Collections.unmodifiableList(Arrays.asList(parts));
+ }
+
+ public boolean isFor(TableMetadata metadata)
+ {
+ if (kind != Kind.AUTO_READ)
+ return false;
+ return metadata.keyspace.equals(parts[0])
+ && metadata.name.equals(parts[1]);
+ }
+
+ public DecoratedKey getDecoratedKey(TableMetadata metadata)
+ {
+ checkKind(Kind.AUTO_READ);
+ ByteBuffer data = stringToBytes(parts[2]);
+ return metadata.partitioner.decorateKey(data);
+ }
+
+ public boolean atIndex(int index)
+ {
+ checkKind(Kind.AUTO_READ);
+ return Integer.parseInt(parts[3]) == index;
+ }
+
+ private void checkKind(Kind expected)
+ {
+ if (kind != expected)
+ throw new IllegalStateException("Expected kind " + expected + "
but is " + kind);
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = 0;
+ for (String part : parts)
+ size += part.length();
Review Comment:
should include `String` object overhead, right?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -99,6 +110,39 @@ public static long nowInMicros()
return
TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
}
+ public TxnData coordinate(Txn txn)
+ {
+ try
+ {
+ Future<Result> future = node.coordinate(txn);
+ Result result =
future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.SECONDS),
TimeUnit.SECONDS);
Review Comment:
nit: if someone sets the timeout to `500ms` this will be incorrect; we should
use `TimeUnit.NANOSECONDS` to avoid this issue
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
+ {
+ return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+ }
+
+ private static String bytesToString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ private static ByteBuffer stringToBytes(String string)
+ {
+ return ByteBufferUtil.hexToBytes(string);
+ }
Review Comment:
wondering if it would be best to switch from `String[]` to `ByteBuffer[]`, as
the hex-string conversion adds extra memory cost... this is a hidden internal
detail so we can change whatever we want; just something to think about.
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
+ {
+ return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+ }
+
+ private static String bytesToString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ private static ByteBuffer stringToBytes(String string)
+ {
+ return ByteBufferUtil.hexToBytes(string);
+ }
+
+ public Kind getKind()
+ {
+ return kind;
+ }
+
+ public List<String> getParts()
+ {
+ return Collections.unmodifiableList(Arrays.asList(parts));
+ }
+
+ public boolean isFor(TableMetadata metadata)
+ {
+ if (kind != Kind.AUTO_READ)
+ return false;
+ return metadata.keyspace.equals(parts[0])
+ && metadata.name.equals(parts[1]);
+ }
+
+ public DecoratedKey getDecoratedKey(TableMetadata metadata)
+ {
+ checkKind(Kind.AUTO_READ);
+ ByteBuffer data = stringToBytes(parts[2]);
+ return metadata.partitioner.decorateKey(data);
+ }
+
+ public boolean atIndex(int index)
+ {
+ checkKind(Kind.AUTO_READ);
+ return Integer.parseInt(parts[3]) == index;
+ }
+
+ private void checkKind(Kind expected)
+ {
+ if (kind != expected)
+ throw new IllegalStateException("Expected kind " + expected + "
but is " + kind);
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = 0;
+ for (String part : parts)
+ size += part.length();
+ return size;
+ }
+
+ @Override
+ public int compareTo(TxnDataName o)
+ {
+ int rc = kind.compareTo(o.kind);
+ if (rc != 0)
+ return rc;
+ // same kind has same length
+ int size = parts.length;
+ assert o.parts.length == size : String.format("Expected
other.parts.length == %d but was %d", size, o.parts.length);
+ for (int i = 0; i < size; i++)
+ {
+ rc = parts[i].compareTo(o.parts[i]);
+ if (rc != 0)
+ return rc;
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TxnDataName that = (TxnDataName) o;
+ return kind == that.kind && Arrays.equals(parts, that.parts);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = Objects.hash(kind);
+ result = 31 * result + Arrays.hashCode(parts);
+ return result;
+ }
+
+ public String name()
+ {
+ return String.join(":", parts);
+ }
+
+ @Override
+ public String toString()
+ {
+ return kind.name() + ":" + name();
+ }
+
+ public static final IVersionedSerializer<TxnDataName> serializer = new
IVersionedSerializer<TxnDataName>()
+ {
+ @Override
+ public void serialize(TxnDataName t, DataOutputPlus out, int version)
throws IOException
+ {
+ out.writeByte(t.kind.value);
+ out.writeInt(t.parts.length);
Review Comment:
you have `ArraySerializer` now; should we delegate to that? It also writes a
`vint` rather than an `int`, so it may be smaller as well.
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
Review Comment:
TODO: honestly I don't remember why `int index`...
##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+ private static final TxnDataName RETURNING = new
TxnDataName(Kind.RETURNING);
+
+ public enum Kind
+ {
+ USER((byte) 1),
+ RETURNING((byte) 2),
+ AUTO_READ((byte) 3);
+
+ private final byte value;
+
+ Kind(byte value)
+ {
+ this.value = value;
+ }
+
+ public static Kind from(byte b)
+ {
+ switch (b)
+ {
+ case 1:
+ return USER;
+ case 2:
+ return RETURNING;
+ case 3:
+ return AUTO_READ;
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + b);
+ }
+ }
+ }
+
+ private final Kind kind;
+ private final String[] parts;
+
+ public TxnDataName(Kind kind, String... parts)
+ {
+ this.kind = kind;
+ this.parts = parts;
+ }
+
+ public static TxnDataName user(String name)
+ {
+ return new TxnDataName(Kind.USER, name);
+ }
+
+ public static TxnDataName returning()
+ {
+ return RETURNING;
+ }
+
+ public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
+ {
+ return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+ }
+
+ private static String bytesToString(ByteBuffer bytes)
+ {
+ return ByteBufferUtil.bytesToHex(bytes);
+ }
+
+ private static ByteBuffer stringToBytes(String string)
+ {
+ return ByteBufferUtil.hexToBytes(string);
+ }
+
+ public Kind getKind()
+ {
+ return kind;
+ }
+
+ public List<String> getParts()
+ {
+ return Collections.unmodifiableList(Arrays.asList(parts));
+ }
+
+ public boolean isFor(TableMetadata metadata)
+ {
+ if (kind != Kind.AUTO_READ)
+ return false;
+ return metadata.keyspace.equals(parts[0])
+ && metadata.name.equals(parts[1]);
+ }
+
+ public DecoratedKey getDecoratedKey(TableMetadata metadata)
+ {
+ checkKind(Kind.AUTO_READ);
+ ByteBuffer data = stringToBytes(parts[2]);
+ return metadata.partitioner.decorateKey(data);
+ }
+
+ public boolean atIndex(int index)
+ {
+ checkKind(Kind.AUTO_READ);
+ return Integer.parseInt(parts[3]) == index;
+ }
+
+ private void checkKind(Kind expected)
+ {
+ if (kind != expected)
+ throw new IllegalStateException("Expected kind " + expected + "
but is " + kind);
+ }
+
+ public long estimatedSizeOnHeap()
+ {
+ long size = 0;
Review Comment:
we should include the `EMPTY` size that we measure with JAMM...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]