dcapwell commented on code in PR #1962:
URL: https://github.com/apache/cassandra/pull/1962#discussion_r1039935959


##########
src/java/org/apache/cassandra/serializers/ListSerializer.java:
##########
@@ -224,10 +226,29 @@ public Class<List<T>> getType()
     }
 
     @Override
-    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer key, AbstractType<?> comparator)
+    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer index, AbstractType<?> comparator)
     {
-        // We don't allow selecting an element of a list, so we don't need this.
-        throw new UnsupportedOperationException();
+        try
+        {
+            int n = readCollectionSize(collection, ByteBufferAccessor.instance, ProtocolVersion.V3);

Review Comment:
   currently this isn't an issue, but hard-coding `V3` feels buggy... if we get a v6 with format changes, would we then try to read it as v3? Don't we need to know how it was written?



##########
src/java/org/apache/cassandra/serializers/ListSerializer.java:
##########
@@ -224,10 +226,29 @@ public Class<List<T>> getType()
     }
 
     @Override
-    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer key, AbstractType<?> comparator)
+    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer index, AbstractType<?> comparator)
     {
-        // We don't allow selecting an element of a list, so we don't need this.
-        throw new UnsupportedOperationException();
+        try
+        {
+            int n = readCollectionSize(collection, ByteBufferAccessor.instance, ProtocolVersion.V3);
+            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);

Review Comment:
   for readability, a doc comment explaining why would be good, as I think this method causes more confusion than it needs to (at least it did for me):
   
   ```
   // the offset starts after the size, which is after the sizeOf the collection size
   ```



##########
src/java/org/apache/cassandra/service/accord/AccordSerializers.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.SET;
+
+public class AccordSerializers
+{
+    public static <T> ByteBuffer serialize(T item, IVersionedSerializer<T> 
serializer)
+    {
+        int version = MessagingService.current_version;
+        long size = serializer.serializedSize(item, version) + 
sizeofUnsignedVInt(version);
+        try (DataOutputBuffer out = new DataOutputBuffer((int) size))
+        {
+            out.writeUnsignedVInt(version);
+            serializer.serialize(item, out, version);
+            return out.buffer(false);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> 
serializer)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+        {
+            int version = checkedCast(in.readUnsignedVInt());
+            return serializer.deserialize(in, version);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Term.Terminal deserializeCqlCollectionAsTerm(ByteBuffer 
buffer, AbstractType<?> type)
+    {
+        CollectionType<?> collectionType = (CollectionType<?>) type;
+
+        if (collectionType.kind == SET)
+            return Sets.Value.fromSerialized(buffer, (SetType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == LIST)
+            return Lists.Value.fromSerialized(buffer, (ListType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == MAP)
+            return Maps.Value.fromSerialized(buffer, (MapType<?, ?>) type, 
ProtocolVersion.CURRENT);
+
+        throw new UnsupportedOperationException("Unsupported collection type: 
" + type);
+    }
+
+    public static final IVersionedSerializer<PartitionUpdate> 
partitionUpdateSerializer = new IVersionedSerializer<PartitionUpdate>()
+    {
+        @Override
+        public void serialize(PartitionUpdate upd, DataOutputPlus out, int 
version) throws IOException
+        {
+            PartitionUpdate.serializer.serialize(upd, out, version);
+        }
+
+        @Override
+        public PartitionUpdate deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return PartitionUpdate.serializer.deserialize(in, version, 
DeserializationHelper.Flag.FROM_REMOTE);
+        }
+
+        @Override
+        public long serializedSize(PartitionUpdate upd, int version)
+        {
+            return PartitionUpdate.serializer.serializedSize(upd, version);
+        }
+    };
+
+    public static final IVersionedSerializer<SinglePartitionReadCommand> 
singlePartitionReadCommandSerializer = new 
IVersionedSerializer<SinglePartitionReadCommand>()
+    {
+        @Override
+        public void serialize(SinglePartitionReadCommand command, 
DataOutputPlus out, int version) throws IOException
+        {
+            SinglePartitionReadCommand.serializer.serialize(command, out, 
version);
+        }
+
+        @Override
+        public SinglePartitionReadCommand deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            return (SinglePartitionReadCommand) 
SinglePartitionReadCommand.serializer.deserialize(in, version);
+        }
+
+        @Override
+        public long serializedSize(SinglePartitionReadCommand command, int 
version)
+        {
+            return 
SinglePartitionReadCommand.serializer.serializedSize(command, version);
+        }
+    };
+
+    public static final IVersionedSerializer<ColumnMetadata> 
columnMetadataSerializer = new IVersionedSerializer<ColumnMetadata>()

Review Comment:
   should this move to `ColumnMetadata`?



##########
src/java/org/apache/cassandra/service/accord/AccordSerializers.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.SET;
+
+public class AccordSerializers
+{
+    public static <T> ByteBuffer serialize(T item, IVersionedSerializer<T> 
serializer)
+    {
+        int version = MessagingService.current_version;
+        long size = serializer.serializedSize(item, version) + 
sizeofUnsignedVInt(version);
+        try (DataOutputBuffer out = new DataOutputBuffer((int) size))
+        {
+            out.writeUnsignedVInt(version);
+            serializer.serialize(item, out, version);
+            return out.buffer(false);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> 
serializer)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+        {
+            int version = checkedCast(in.readUnsignedVInt());
+            return serializer.deserialize(in, version);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Term.Terminal deserializeCqlCollectionAsTerm(ByteBuffer 
buffer, AbstractType<?> type)
+    {
+        CollectionType<?> collectionType = (CollectionType<?>) type;
+
+        if (collectionType.kind == SET)
+            return Sets.Value.fromSerialized(buffer, (SetType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == LIST)
+            return Lists.Value.fromSerialized(buffer, (ListType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == MAP)
+            return Maps.Value.fromSerialized(buffer, (MapType<?, ?>) type, 
ProtocolVersion.CURRENT);
+
+        throw new UnsupportedOperationException("Unsupported collection type: 
" + type);
+    }
+
+    public static final IVersionedSerializer<PartitionUpdate> 
partitionUpdateSerializer = new IVersionedSerializer<PartitionUpdate>()
+    {
+        @Override
+        public void serialize(PartitionUpdate upd, DataOutputPlus out, int 
version) throws IOException
+        {
+            PartitionUpdate.serializer.serialize(upd, out, version);
+        }
+
+        @Override
+        public PartitionUpdate deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return PartitionUpdate.serializer.deserialize(in, version, 
DeserializationHelper.Flag.FROM_REMOTE);
+        }
+
+        @Override
+        public long serializedSize(PartitionUpdate upd, int version)
+        {
+            return PartitionUpdate.serializer.serializedSize(upd, version);
+        }
+    };
+
+    public static final IVersionedSerializer<SinglePartitionReadCommand> 
singlePartitionReadCommandSerializer = new 
IVersionedSerializer<SinglePartitionReadCommand>()
+    {
+        @Override
+        public void serialize(SinglePartitionReadCommand command, 
DataOutputPlus out, int version) throws IOException
+        {
+            SinglePartitionReadCommand.serializer.serialize(command, out, 
version);
+        }
+
+        @Override
+        public SinglePartitionReadCommand deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            return (SinglePartitionReadCommand) 
SinglePartitionReadCommand.serializer.deserialize(in, version);
+        }
+
+        @Override
+        public long serializedSize(SinglePartitionReadCommand command, int 
version)
+        {
+            return 
SinglePartitionReadCommand.serializer.serializedSize(command, version);
+        }
+    };

Review Comment:
   doesn't look needed, as `org.apache.cassandra.db.ReadCommand#serializer` already implements `IVersionedSerializer`; the only difference is that its deserialize returns `ReadCommand` instead of casting to `SinglePartitionReadCommand`
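   
   A rough sketch of that reuse, assuming the call sites are fine with a cast:
   
   ```
   // Sketch: drop the anonymous serializer and use ReadCommand.serializer directly,
   // casting only where a SinglePartitionReadCommand is required.
   SinglePartitionReadCommand command =
       (SinglePartitionReadCommand) ReadCommand.serializer.deserialize(in, version);
   ```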



##########
src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+public class AccordUpdateParameters
+{
+    private final TxnData data;
+    private final QueryOptions options;
+
+    public AccordUpdateParameters(TxnData data, QueryOptions options)
+    {
+        this.data = data;
+        this.options = options;
+    }
+
+    public TxnData getData()
+    {
+        return data;
+    }
+
+    public UpdateParameters updateParameters(TableMetadata metadata, int 
rowIndex)
+    {
+        // This is currently only used by Guardrails, but this logically have 
issues with Accord as drifts in config
+        // values could cause unexpected issues in Accord. (ex. some nodes 
reject writes while others accept)
+        // For the time being, guardrails are disabled for Accord queries.
+        ClientState disabledGuardrails = null;
+
+        // What we use here doesn't matter as they get replaced before 
actually performing the write.
+        // see org.apache.cassandra.service.accord.txn.TxnWrite.Update.write
+        int nowInSeconds = 42;
+        long timestamp = nowInSeconds;
+
+        // TODO: How should Accord work with TTL?
+        int ttl = metadata.params.defaultTimeToLive;
+        return new UpdateParameters(metadata,
+                                    disabledGuardrails,
+                                    options,
+                                    timestamp,
+                                    nowInSeconds,
+                                    ttl,
+                                    prefetchRow(metadata, rowIndex));
+    }
+
+    private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, 
int index)
+    {
+        for (Map.Entry<TxnDataName, FilteredPartition> e : data.entrySet())

Review Comment:
   Should probably just read the `AUTO_READ` entries directly rather than rely on `isFor` to filter



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Throwables;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import accord.coordinate.Preempted;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.FailingConsumer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertArrayEquals;
+
+public abstract class AccordTestBase extends TestBaseImpl
+{
+    protected static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+    protected static Cluster sharedCluster;

Review Comment:
   in Java, static constants are conventionally all-caps; we don't override this in C* (though we are not consistent about it in C*)



##########
src/java/org/apache/cassandra/service/StorageServiceMBean.java:
##########
@@ -630,6 +632,9 @@ default int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion,
     public void setTruncateRpcTimeout(long value);
     public long getTruncateRpcTimeout();
 
+    public void setTransactionTimeout(long value);
+    public long getTransactionTimeout();

Review Comment:
   should use `String`



##########
src/java/org/apache/cassandra/utils/ByteBufferUtil.java:
##########
@@ -533,6 +551,35 @@ else if (obj instanceof byte[])
             return ByteBuffer.wrap((byte[]) obj);
         else if (obj instanceof ByteBuffer)
             return (ByteBuffer) obj;
+        else if (obj instanceof List)
+        {
+            List<?> list = (List<?>) obj;
+            // convert subtypes to BB
+            List<ByteBuffer> bbs = list.stream().map(ByteBufferUtil::objectToBytes).collect(Collectors.toList());
+            // decompose/serializer doesn't use the isMultiCell, so safe to do this
+            return ListType.getInstance(BytesType.instance, false).decompose(bbs);
+        }
+        else if (obj instanceof Map)
+        {
+            Map<?, ?> map = (Map<?, ?>) obj;
+            // convert subtypes to BB
+            Map<ByteBuffer, ByteBuffer> bbs = new LinkedHashMap<>();
+            for (Map.Entry<?, ?> e : map.entrySet())
+                bbs.put(objectToBytes(e.getKey()), objectToBytes(e.getValue()));

Review Comment:
   yep, that's about it; we know the `key` `.equals`/`.hashCode` don't conflict, but the serialized version "could" invalidate that, so the check you added would detect that case.
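   
   For reference, the check being discussed might look roughly like this (a sketch, not the exact patch):
   
   ```
   // Map.put returns the previous value, so a non-null result means two distinct keys
   // serialized to the same ByteBuffer and the conversion would silently drop an entry.
   ByteBuffer previous = bbs.put(objectToBytes(e.getKey()), objectToBytes(e.getValue()));
   if (previous != null)
       throw new IllegalArgumentException("Duplicate key after serialization: " + e.getKey());
   ```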



##########
src/java/org/apache/cassandra/service/accord/txn/AbstractKeySorted.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+
+import accord.primitives.Keys;
+import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+
+/**
+ * Immutable collection of items, sorted first by their partition key
+ */
+public abstract class AbstractKeySorted<T> implements Iterable<T>
+{
+    public static final String ITEMS_OUT_OF_ORDER_MESSAGE = "Items are out of 
order ([%s] %s >= [%s] %s)";
+
+    protected final Keys itemKeys;
+    protected final T[] items;
+
+    // items are expected to be sorted
+    public AbstractKeySorted(T[] items)
+    {
+        this.items = items;
+        this.itemKeys = extractItemKeys();
+    }
+
+    public AbstractKeySorted(List<T> items)
+    {
+        T[] arr = newArray(items.size());
+        items.toArray(arr);
+        Arrays.sort(arr, this::compare);
+        this.items = arr;
+        this.itemKeys = extractItemKeys();
+    }
+
+    private Keys extractItemKeys()
+    {
+        SortedSet<PartitionKey> keysSet = new 
TreeSet<>(AccordRoutingKey::compareKeys);
+        forEach(i -> keysSet.add(getKey(i)));
+        return new Keys(keysSet);
+    }
+
+    @Override
+    public Iterator<T> iterator()
+    {
+        return Iterators.forArray(items);
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + Arrays.stream(items)
+                                                  .map(Objects::toString)
+                                                  
.collect(Collectors.joining(", ", "{", "}"));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AbstractKeySorted<?> that = (AbstractKeySorted<?>) o;
+        return Arrays.equals(items, that.items);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(items);
+    }
+
+    @VisibleForTesting
+    public Keys keys()
+    {
+        return itemKeys;
+    }
+
+    /**
+     * Compare the non-key component of items (since this class handles 
sorting by key)
+     */
+    abstract int compareNonKeyFields(T left, T right);
+
+    abstract PartitionKey getKey(T item);
+    abstract T[] newArray(int size);
+
+    private int compare(T left, T right)
+    {
+        int cmp = getKey(left).compareTo(getKey(right));
+        return cmp != 0 ? cmp : compareNonKeyFields(left, right);
+    }
+
+    @VisibleForTesting
+    void validateOrder()
+    {
+        for (int i=1; i< items.length; i++)
+        {
+            T prev = items[i-1];
+            T next = items[i];
+            if (getKey(prev).compareTo(getKey(next)) > 0)
+                throw new 
IllegalStateException(String.format(ITEMS_OUT_OF_ORDER_MESSAGE, i-1, prev, i, 
next));
+            if (compare(prev, next) >= 0)
+                throw new IllegalStateException(String.format("Items are out 
of order ([%s] %s >= [%s] %s)", i-1, prev, i, next));
+        }
+    }
+
+    public int size()
+    {
+        return items.length;
+    }
+
+    public void forEachWithKey(PartitionKey key, Consumer<T> consumer)
+    {
+        for (int i = firstPossibleKeyIdx(key); i < items.length && 
getKey(items[i]).equals(key); i++)
+            consumer.accept(items[i]);
+    }
+
+    private int firstPossibleKeyIdx(PartitionKey key)
+    {
+        int idx = Arrays.binarySearch(items, key, (l, r) -> {
+            PartitionKey lk = getKey((T) l);
+            PartitionKey rk = (PartitionKey) r;

Review Comment:
   I had to go read the code because this felt wrong at first... `r` here is actually `key`...
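   
   For anyone else tripping over it: `Arrays.binarySearch(a, key, cmp)` always invokes `cmp.compare(arrayElement, key)`, so the left argument comes from `items` and the right argument is the `PartitionKey` passed in. A tiny standalone demo of the argument order:
   
   ```
   import java.util.Arrays;
   
   public class BinarySearchArgOrder
   {
       public static void main(String[] args)
       {
           Integer[] items = { 1, 3, 5, 7 };
           int idx = Arrays.binarySearch(items, 5, (element, searchKey) -> {
               // element comes from the array; searchKey is the 5 we passed in
               return Integer.compare(element, searchKey);
           });
           System.out.println(idx); // prints 2
       }
   }
   ```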



##########
src/java/org/apache/cassandra/serializers/ListSerializer.java:
##########
@@ -224,10 +226,29 @@ public Class<List<T>> getType()
     }
 
     @Override
-    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer key, AbstractType<?> comparator)
+    public ByteBuffer getSerializedValue(ByteBuffer collection, ByteBuffer index, AbstractType<?> comparator)
     {
-        // We don't allow selecting an element of a list, so we don't need this.
-        throw new UnsupportedOperationException();
+        try
+        {
+            int n = readCollectionSize(collection, ByteBufferAccessor.instance, ProtocolVersion.V3);
+            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            int idx = ByteBufferUtil.toInt(index);
+
+            Preconditions.checkElementIndex(idx, n);
+
+            for (int i = 0; i <= idx; i++)
+            {
+                ByteBuffer value = readValue(collection, ByteBufferAccessor.instance, offset, ProtocolVersion.V3);

Review Comment:
   pushed a change to avoid the allocation and skip over values instead; see ef3e2f0ea9166011c6e3bba0aa41cf599bbc0842
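   
   For context, the skip-based loop presumably looks something like the sketch below (reconstructed here, not copied from that commit; it assumes the standard V3+ element encoding of an int length prefix followed by the value bytes, with a negative length meaning null):
   
   ```
   for (int i = 0; i < idx; i++)
   {
       int len = ByteBufferAccessor.instance.getInt(collection, offset);
       offset += TypeSizes.INT_SIZE + Math.max(len, 0); // skip the length prefix plus the value bytes
   }
   return readValue(collection, ByteBufferAccessor.instance, offset, ProtocolVersion.V3);
   ```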



##########
src/java/org/apache/cassandra/service/accord/txn/AbstractKeySorted.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+
+import accord.primitives.Keys;
+import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+
+/**
+ * Immutable collection of items, sorted first by their partition key
+ */
+public abstract class AbstractKeySorted<T> implements Iterable<T>
+{
+    public static final String ITEMS_OUT_OF_ORDER_MESSAGE = "Items are out of 
order ([%s] %s >= [%s] %s)";
+
+    protected final Keys itemKeys;
+    protected final T[] items;
+
+    // items are expected to be sorted
+    public AbstractKeySorted(T[] items)

Review Comment:
   unsafe, since we don't know the input is sorted; we can call `validateOrder` or just sort, like we do for `List`
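   
   i.e. something along these lines for the array constructor (a sketch mirroring the `List` constructor above: defensively copy and sort, or at least call `validateOrder`):
   
   ```
   public AbstractKeySorted(T[] items)
   {
       T[] copy = Arrays.copyOf(items, items.length);
       Arrays.sort(copy, this::compare); // same treatment the List constructor already gets
       this.items = copy;
       this.itemKeys = extractItemKeys();
   }
   ```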



##########
src/java/org/apache/cassandra/service/accord/AccordSerializers.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.SET;
+
+public class AccordSerializers
+{
+    public static <T> ByteBuffer serialize(T item, IVersionedSerializer<T> 
serializer)
+    {
+        int version = MessagingService.current_version;
+        long size = serializer.serializedSize(item, version) + 
sizeofUnsignedVInt(version);
+        try (DataOutputBuffer out = new DataOutputBuffer((int) size))
+        {
+            out.writeUnsignedVInt(version);
+            serializer.serialize(item, out, version);
+            return out.buffer(false);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> 
serializer)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+        {
+            int version = checkedCast(in.readUnsignedVInt());
+            return serializer.deserialize(in, version);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Term.Terminal deserializeCqlCollectionAsTerm(ByteBuffer 
buffer, AbstractType<?> type)
+    {
+        CollectionType<?> collectionType = (CollectionType<?>) type;
+
+        if (collectionType.kind == SET)
+            return Sets.Value.fromSerialized(buffer, (SetType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == LIST)
+            return Lists.Value.fromSerialized(buffer, (ListType<?>) type, 
ProtocolVersion.CURRENT);
+        else if (collectionType.kind == MAP)
+            return Maps.Value.fromSerialized(buffer, (MapType<?, ?>) type, 
ProtocolVersion.CURRENT);
+

Review Comment:
   shouldn't hard-code the version; this could bite us in the future.
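   
   i.e. thread the version through rather than pinning `ProtocolVersion.CURRENT`; a sketch of the signature change (callers would then have to supply the version the value was written with):
   
   ```
   public static Term.Terminal deserializeCqlCollectionAsTerm(ByteBuffer buffer, AbstractType<?> type, ProtocolVersion version)
   {
       CollectionType<?> collectionType = (CollectionType<?>) type;
   
       if (collectionType.kind == SET)
           return Sets.Value.fromSerialized(buffer, (SetType<?>) type, version);
       else if (collectionType.kind == LIST)
           return Lists.Value.fromSerialized(buffer, (ListType<?>) type, version);
       else if (collectionType.kind == MAP)
           return Maps.Value.fromSerialized(buffer, (MapType<?, ?>) type, version);
   
       throw new UnsupportedOperationException("Unsupported collection type: " + type);
   }
   ```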



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1192,46 +1192,52 @@ public static void applySeedProvider()
     @VisibleForTesting
     static void checkForLowestAcceptedTimeouts(Config conf)
     {
-        if(conf.read_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.read_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("read_request_timeout", conf.read_request_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.read_request_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.range_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.range_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("range_request_timeout", conf.range_request_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.range_request_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("request_timeout", conf.request_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.request_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.write_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.write_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("write_request_timeout", conf.write_request_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.write_request_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.cas_contention_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.cas_contention_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("cas_contention_timeout", conf.cas_contention_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.cas_contention_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.counter_write_request_timeout.toMilliseconds()< 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.counter_write_request_timeout.toMilliseconds()< 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("counter_write_request_timeout", 
conf.counter_write_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.counter_write_request_timeout = new 
DurationSpec.LongMillisecondsBound("10ms");
         }
-        if(conf.truncate_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.truncate_request_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("truncate_request_timeout", conf.truncate_request_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
             conf.truncate_request_timeout = LOWEST_ACCEPTED_TIMEOUT;
         }
+
+        if (conf.transaction_timeout.toMilliseconds() < 
LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        {
+            logInfo("transaction_timeout", conf.transaction_timeout, 
LOWEST_ACCEPTED_TIMEOUT);
+            conf.transaction_timeout = LOWEST_ACCEPTED_TIMEOUT;
+        }

Review Comment:
   never saw this before... :sigh: this is hard to deal with in DD, as we need to duplicate the effort for the non-yaml paths! =(
   
   Your patch is fine; not asking you to do anything... just sad



##########
src/java/org/apache/cassandra/service/StorageService.java:
##########
@@ -1522,6 +1587,17 @@ public long getTruncateRpcTimeout()
         return DatabaseDescriptor.getTruncateRpcTimeout(MILLISECONDS);
     }
 
+    public void setTransactionTimeout(long value)
+    {
+        DatabaseDescriptor.setTransactionTimeout(value);
+        logger.info("set transaction timeout to {} ms", value);
+    }
+
+    public long getTransactionTimeout()
+    {
+        return DatabaseDescriptor.getTransactionTimeout(MILLISECONDS);
+    }

Review Comment:
   we should use `String` when talking to users, as that's the API they know; we did this for most of the other configs as well.
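   
   A sketch of the `String`-based pair (exact names and formatting assumed; it just reuses the `DurationSpec` parsing already shown elsewhere in this patch):
   
   ```
   public void setTransactionTimeout(String value)
   {
       // accept the user-facing form ("2s", "500ms", ...) and store milliseconds internally
       DatabaseDescriptor.setTransactionTimeout(new DurationSpec.LongMillisecondsBound(value).toMilliseconds());
       logger.info("set transaction timeout to {}", value);
   }
   
   public String getTransactionTimeout()
   {
       return DatabaseDescriptor.getTransactionTimeout(MILLISECONDS) + "ms";
   }
   ```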



##########
src/java/org/apache/cassandra/service/accord/txn/TxnAppliedQuery.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import accord.api.Data;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.Update;
+import accord.primitives.TxnId;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+import static 
org.apache.cassandra.service.accord.AccordSerializers.deserialize;
+import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
+
+// TODO: This is currently unused, but we might want to use it to support 
returning the condition result.
+public class TxnAppliedQuery implements Query
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
TxnAppliedQuery(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+    public static class Applied implements Result
+    {
+        public static final Applied TRUE = new Applied();
+        public static final Applied FALSE = new Applied();
+        private static final long SIZE = ObjectSizes.measure(TRUE);
+
+        private Applied() {}
+
+        public boolean wasApplied()
+        {
+            return this == TRUE;
+        }
+
+        public static Applied valueOf(boolean b)
+        {
+            return b ? TRUE : FALSE;
+        }
+
+        public long estimatedSizeOnHeap()
+        {
+            return SIZE;
+        }
+
+        public static final IVersionedSerializer<Applied> serializer = new 
IVersionedSerializer<Applied>()
+        {
+            @Override
+            public void serialize(Applied applied, DataOutputPlus out, int 
version) throws IOException
+            {
+                out.writeBoolean(applied.wasApplied());
+            }
+
+            @Override
+            public Applied deserialize(DataInputPlus in, int version) throws 
IOException
+            {
+                return Applied.valueOf(in.readBoolean());
+            }
+
+            @Override
+            public long serializedSize(Applied applied, int version)
+            {
+                return TypeSizes.BOOL_SIZE;
+            }
+        };
+    }
+    private final ByteBuffer serializedCondition;
+
+    public TxnAppliedQuery(TxnCondition condition)
+    {
+        this.serializedCondition = serialize(condition, 
TxnCondition.serializer);
+    }
+
+    public TxnAppliedQuery(ByteBuffer serializedCondition)
+    {
+        this.serializedCondition = serializedCondition;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TxnAppliedQuery query = (TxnAppliedQuery) o;
+        return Objects.equals(serializedCondition, query.serializedCondition);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(serializedCondition);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TxnAppliedQuery{serializedCondition=" + 
deserialize(serializedCondition, TxnCondition.serializer) + '}';
+    }
+
+    public long estimatedSizeOnHeap()
+    {
+        return EMPTY_SIZE + 
ByteBufferUtil.estimatedSizeOnHeap(serializedCondition);
+    }
+
+    @Override
+    public Result compute(TxnId txnId, Data data, @Nullable Read read, 
@Nullable Update update)
+    {
+        TxnCondition condition = deserialize(serializedCondition, 
TxnCondition.serializer);
+        return condition.applies((TxnData) data) ? Applied.TRUE : 
Applied.FALSE;

Review Comment:
   can the `Update` add the result?  If we include it in the returned `TxnData`, then we could add a new `TxnDataName.CONDITION_MET` or something like that... my only concern with that is what happens when we add multi-condition support
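   
   Purely as a sketch of that idea (mirroring the `TxnDataName.Kind` enum further down in this PR; `CONDITION_MET` is hypothetical, and the multi-condition question would still need an answer):
   
   ```
   public enum Kind
   {
       USER((byte) 1),
       RETURNING((byte) 2),
       AUTO_READ((byte) 3),
       CONDITION_MET((byte) 4); // hypothetical marker for the entry carrying the condition outcome
   
       private final byte value;
   
       Kind(byte value)
       {
           this.value = value;
       }
   }
   ```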



##########
src/java/org/apache/cassandra/utils/ArraySerializers.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.function.IntFunction;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
+public class ArraySerializers
+{
+    public static <T> void serializeArray(T[] items, DataOutputPlus out, int 
version, IVersionedSerializer<T> serializer) throws IOException
+    {
+        out.writeUnsignedVInt(items.length);
+        for (T item : items)
+            serializer.serialize(item, out, version);
+    }
+
+    public static <T> T[] deserializeArray(DataInputPlus in, int version, 
IVersionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws 
IOException
+    {
+        int size = checkedCast(in.readUnsignedVInt());

Review Comment:
   we use `int32` in other contexts, maybe `readUnsignedVInt32`?
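   
   i.e. something like the sketch below, assuming `DataInputPlus` exposes `readUnsignedVInt32()` as suggested (the tail of the method isn't quoted above, so the loop is the obvious reconstruction):
   
   ```
   public static <T> T[] deserializeArray(DataInputPlus in, int version, IVersionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws IOException
   {
       int size = in.readUnsignedVInt32(); // avoids checkedCast on the long-returning variant
       T[] items = arrayFactory.apply(size);
       for (int i = 0; i < size; i++)
           items[i] = serializer.deserialize(in, version);
       return items;
   }
   ```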



##########
src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+public class AccordUpdateParameters
+{
+    private final TxnData data;
+    private final QueryOptions options;
+
+    public AccordUpdateParameters(TxnData data, QueryOptions options)
+    {
+        this.data = data;
+        this.options = options;
+    }
+
+    public TxnData getData()
+    {
+        return data;
+    }
+
+    public UpdateParameters updateParameters(TableMetadata metadata, int 
rowIndex)
+    {
+        // This is currently only used by Guardrails, but this logically have 
issues with Accord as drifts in config
+        // values could cause unexpected issues in Accord. (ex. some nodes 
reject writes while others accept)
+        // For the time being, guardrails are disabled for Accord queries.
+        ClientState disabledGuardrails = null;
+
+        // What we use here doesn't matter as they get replaced before 
actually performing the write.
+        // see org.apache.cassandra.service.accord.txn.TxnWrite.Update.write
+        int nowInSeconds = 42;
+        long timestamp = nowInSeconds;
+
+        // TODO: How should Accord work with TTL?
+        int ttl = metadata.params.defaultTimeToLive;
+        return new UpdateParameters(metadata,
+                                    disabledGuardrails,
+                                    options,
+                                    timestamp,
+                                    nowInSeconds,
+                                    ttl,
+                                    prefetchRow(metadata, rowIndex));
+    }
+
+    private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, 
int index)

Review Comment:
   `index` is the `write` index, so this implies that our `AUTO_READ` names are `table + write index`?  This can be a problem if you update the same partition multiple times; we could read the partition once but end up reading it multiple times:
   
   ```
   -- does this really read `pk=0` twice?
   UPDATE table SET a += 1 WHERE pk=0;
   UPDATE table SET b -= 1 WHERE pk=0;
   ```



##########
src/java/org/apache/cassandra/utils/ArraySerializers.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.function.IntFunction;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
+public class ArraySerializers
+{
+    public static <T> void serializeArray(T[] items, DataOutputPlus out, int 
version, IVersionedSerializer<T> serializer) throws IOException
+    {
+        out.writeUnsignedVInt(items.length);
+        for (T item : items)
+            serializer.serialize(item, out, version);
+    }
+
+    public static <T> T[] deserializeArray(DataInputPlus in, int version, 
IVersionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws 
IOException
+    {
+        int size = checkedCast(in.readUnsignedVInt());

Review Comment:
   > Do we want to audit/change all of those in this PR
   
   To avoid conflicts when rebasing on trunk, I'm in favor of only updating what Accord touches



##########
src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class TxnDataName implements Comparable<TxnDataName>
+{
+    private static final TxnDataName RETURNING = new 
TxnDataName(Kind.RETURNING);
+
+    public enum Kind
+    {
+        USER((byte) 1),
+        RETURNING((byte) 2),
+        AUTO_READ((byte) 3);
+
+        private final byte value;
+
+        Kind(byte value)
+        {
+            this.value = value;
+        }
+
+        public static Kind from(byte b)
+        {
+            switch (b)
+            {
+                case 1:
+                    return USER;
+                case 2:
+                    return RETURNING;
+                case 3:
+                    return AUTO_READ;
+                default:
+                    throw new IllegalArgumentException("Unknown kind: " + b);
+            }
+        }
+    }
+
+    private final Kind kind;
+    private final String[] parts;
+
+    public TxnDataName(Kind kind, String... parts)
+    {
+        this.kind = kind;
+        this.parts = parts;
+    }
+
+    public static TxnDataName user(String name)
+    {
+        return new TxnDataName(Kind.USER, name);
+    }
+
+    public static TxnDataName returning()
+    {
+        return RETURNING;
+    }
+
+    public static TxnDataName partitionRead(TableMetadata metadata, 
DecoratedKey key, int index)
+    {
+        return new TxnDataName(Kind.AUTO_READ, metadata.keyspace, 
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
+    }
+
+    private static String bytesToString(ByteBuffer bytes)
+    {
+        return ByteBufferUtil.bytesToHex(bytes);
+    }
+
+    private static ByteBuffer stringToBytes(String string)
+    {
+        return ByteBufferUtil.hexToBytes(string);
+    }

Review Comment:
   we need to serialize this, so we need to make sure we can send it across nodes; `Object[]` could work, but the serializer would be more annoying
   
   cool with that if you think it's best



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
