dcapwell commented on code in PR #1962:
URL: https://github.com/apache/cassandra/pull/1962#discussion_r1042737283


##########
src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.transactions;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+
+public class ConditionStatement
+{
+    public enum Kind
+    {
+        IS_NOT_NULL(TxnCondition.Kind.IS_NOT_NULL),
+        IS_NULL(TxnCondition.Kind.IS_NULL),
+        EQ(TxnCondition.Kind.EQUAL),
+        NEQ(TxnCondition.Kind.NOT_EQUAL),
+        GT(TxnCondition.Kind.GREATER_THAN),
+        GTE(TxnCondition.Kind.GREATER_THAN_OR_EQUAL),
+        LT(TxnCondition.Kind.LESS_THAN),
+        LTE(TxnCondition.Kind.LESS_THAN_OR_EQUAL);
+        
+        // TODO: Support for IN, CONTAINS, CONTAINS KEY
+
+        private final TxnCondition.Kind kind;
+        
+        Kind(TxnCondition.Kind kind)
+        {
+            this.kind = kind;
+        }
+
+        TxnCondition.Kind toTxnKind()
+        {
+            return kind;
+        }
+    }
+
+    private final RowDataReference reference;
+    private final Kind kind;
+    private final Term value;
+
+    public ConditionStatement(RowDataReference reference, Kind kind, Term 
value)
+    {
+        this.reference = reference;
+        this.kind = kind;
+        this.value = value;
+    }
+
+    public static class Raw
+    {
+        private final RowDataReference.Raw reference;
+        private final Kind kind;
+        private final Term.Raw value;
+
+        public Raw(RowDataReference.Raw reference, Kind kind, Term.Raw value)
+        {
+            Preconditions.checkArgument(reference != null);
+            Preconditions.checkArgument((value == null) == (kind == 
Kind.IS_NOT_NULL || kind == Kind.IS_NULL));
+            this.reference = reference;
+            this.kind = kind;
+            this.value = value;
+        }
+
+        public ConditionStatement prepare(String keyspace, 
VariableSpecifications bindVariables)
+        {
+            RowDataReference preparedReference = reference.prepareAsReceiver();
+            preparedReference.collectMarkerSpecification(bindVariables);
+            Term preparedValue = null;
+
+            if (value != null)
+            {
+                ColumnSpecification receiver = preparedReference.column();
+                
+                if (preparedReference.isElementSelection())
+                {
+                    switch (((CollectionType<?>) receiver.type).kind)
+                    {
+                        case LIST:
+                            receiver = Lists.valueSpecOf(receiver);
+                            break;
+                        case MAP:
+                            receiver = Maps.valueSpecOf(receiver);
+                            break;
+                        case SET:
+                            throw new 
InvalidRequestException(String.format("Invalid operation %s = %s for set column 
%s",

Review Comment:
   Should this `switch` have a `default` case? It is exhaustive today, but things can always change.



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    private final Map<TxnDataName, NamedSelect> autoReads = new HashMap<>();
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()

Review Comment:
   This method (`getReturningReferences()`) is dead code.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnAppliedQuery.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import accord.api.Data;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.Update;
+import accord.primitives.TxnId;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+import static 
org.apache.cassandra.service.accord.AccordSerializers.deserialize;
+import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
+
+// TODO: This is currently unused, but we might want to use it to support 
returning the condition result.
+public class TxnAppliedQuery implements Query

Review Comment:
   Should we remove this class, since Ariel has his own version?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static 
org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
+
+public abstract class TxnCondition
+{
+    private interface ConditionSerializer<T extends TxnCondition>
+    {
+        void serialize(T condition, DataOutputPlus out, int version) throws 
IOException;
+        T deserialize(DataInputPlus in, int version, Kind kind) throws 
IOException;
+        long serializedSize(T condition, int version);
+    }
+
+    public enum Kind
+    {
+        NONE("n/a", null),
+        AND("AND", null),
+        OR("OR", null),
+        IS_NOT_NULL("IS NOT NULL", null),
+        IS_NULL("IS NULL", null),
+        EQUAL("=", Operator.EQ),
+        NOT_EQUAL("!=", Operator.NEQ),
+        GREATER_THAN(">", Operator.GT),
+        GREATER_THAN_OR_EQUAL(">=", Operator.GTE),
+        LESS_THAN("<", Operator.LT),
+        LESS_THAN_OR_EQUAL("<=", Operator.LTE);
+
+        private final String symbol;
+        private final Operator operator;
+
+        Kind(String symbol, Operator operator)
+        {
+            this.symbol = symbol;
+            this.operator = operator;
+        }
+
+        @SuppressWarnings("rawtypes")
+        private ConditionSerializer serializer()
+        {
+            switch (this)
+            {
+                case IS_NOT_NULL:
+                case IS_NULL:
+                    return Exists.serializer;
+                case EQUAL:
+                case NOT_EQUAL:
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                    return Value.serializer;
+                case AND:
+                case OR:
+                    return BooleanGroup.serializer;
+                case NONE:
+                    return None.serializer;
+                default:
+                    throw new IllegalArgumentException();

Review Comment:
   Please add a useful message to this `IllegalArgumentException`.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),

Review Comment:
   nit: rename this to `ListAppender`?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),
+        Putter((byte) 7, (column, keyOrIndex, field, value) -> new 
Maps.Putter(column, value)),
+        ListSetter((byte) 8, (column, keyOrIndex, field, value) -> new 
Lists.Setter(column, value)),
+        SetSetter((byte) 9, (column, keyOrIndex, field, value) -> new 
Sets.Setter(column, value)),
+        MapSetter((byte) 10, (column, keyOrIndex, field, value) -> new 
Maps.Setter(column, value)),
+        UserTypeSetter((byte) 11, (column, keyOrIndex, field, value) -> new 
UserTypes.Setter(column, value)),
+        ConstantSetter((byte) 12, (column, keyOrIndex, field, value) -> new 
Constants.Setter(column, value)),
+        Subtracter((byte) 13, (column, keyOrIndex, field, value) -> new 
Constants.Substracter(column, value)),

Review Comment:
   nit: rename this to `ConstantSubtracter`?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static 
org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static 
org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
+
+public abstract class TxnCondition
+{
+    private interface ConditionSerializer<T extends TxnCondition>
+    {
+        void serialize(T condition, DataOutputPlus out, int version) throws 
IOException;
+        T deserialize(DataInputPlus in, int version, Kind kind) throws 
IOException;
+        long serializedSize(T condition, int version);
+    }
+
+    public enum Kind
+    {
+        NONE("n/a", null),
+        AND("AND", null),
+        OR("OR", null),
+        IS_NOT_NULL("IS NOT NULL", null),
+        IS_NULL("IS NULL", null),
+        EQUAL("=", Operator.EQ),
+        NOT_EQUAL("!=", Operator.NEQ),
+        GREATER_THAN(">", Operator.GT),
+        GREATER_THAN_OR_EQUAL(">=", Operator.GTE),
+        LESS_THAN("<", Operator.LT),
+        LESS_THAN_OR_EQUAL("<=", Operator.LTE);
+
+        private final String symbol;
+        private final Operator operator;
+
+        Kind(String symbol, Operator operator)
+        {
+            this.symbol = symbol;
+            this.operator = operator;
+        }
+
+        @SuppressWarnings("rawtypes")
+        private ConditionSerializer serializer()
+        {
+            switch (this)
+            {
+                case IS_NOT_NULL:
+                case IS_NULL:
+                    return Exists.serializer;
+                case EQUAL:
+                case NOT_EQUAL:
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                    return Value.serializer;
+                case AND:
+                case OR:
+                    return BooleanGroup.serializer;
+                case NONE:
+                    return None.serializer;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+    }
+
+    protected final Kind kind;
+
+    public TxnCondition(Kind kind)
+    {
+        this.kind = kind;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TxnCondition condition = (TxnCondition) o;
+        return kind == condition.kind;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(kind);
+    }
+
+    public Kind kind()
+    {
+        return kind;
+    }
+
+    public abstract boolean applies(TxnData data);
+
+    private static class None extends TxnCondition
+    {
+        private static final None instance = new None();
+
+        private None()
+        {
+            super(Kind.NONE);
+        }
+
+        @Override
+        public String toString()
+        {
+            return kind.toString();
+        }
+
+        @Override
+        public boolean applies(TxnData data)
+        {
+            return true;
+        }
+
+        private static final ConditionSerializer<None> serializer = new 
ConditionSerializer<None>()
+        {
+            @Override
+            public void serialize(None condition, DataOutputPlus out, int 
version) {}
+            @Override
+            public None deserialize(DataInputPlus in, int version, Kind kind) 
{ return instance; }
+            @Override
+            public long serializedSize(None condition, int version) { return 
0; }
+        };
+    }
+
+    public static TxnCondition none()
+    {
+        return None.instance;
+    }
+
+    public static class Exists extends TxnCondition
+    {
+        private static final Set<Kind> KINDS = 
ImmutableSet.of(Kind.IS_NOT_NULL, Kind.IS_NULL);
+
+        public final TxnReference reference;
+
+        public Exists(TxnReference reference, Kind kind)
+        {
+            super(kind);
+            Preconditions.checkArgument(KINDS.contains(kind));

Review Comment:
   Please add a useful error message to this precondition check.



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));
+    }
+
+    private List<TxnNamedRead> createNamedReads(QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
+
+        for (NamedSelect select : assignments)
+        {
+            TxnNamedRead read = createNamedRead(select, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        if (returningSelect != null)
+        {
+            TxnNamedRead read = createNamedRead(returningSelect, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        for (NamedSelect select : autoReads.values())
+            // don't need keyConsumer as the keys are known to exist due to 
Modification
+            reads.add(createNamedRead(select, options));
+
+        return reads;
+    }
+
+    TxnCondition createCondition(QueryOptions options)
+    {
+        if (conditions.isEmpty())
+            return TxnCondition.none();
+        if (conditions.size() == 1)
+            return conditions.get(0).createCondition(options);
+
+        List<TxnCondition> result = new ArrayList<>(conditions.size());
+        for (ConditionStatement condition : conditions)
+            result.add(condition.createCondition(options));
+
+        // TODO: OR support
+        return new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, result);

Review Comment:
   The parser drops the `AND` and stores everything in an array, so this code 
matches the parser.
   
   We discussed this: `OR` and `NOT` are not part of v1, so this is fine for now.



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));

Review Comment:
   Added the following check:
   
   ```
   if (selectQuery.queries.size() != 1)
               throw new IllegalArgumentException("When running within a 
transaction, select statements may only select a single partition; found " + 
selectQuery.queries.size() + " partitions");
   ```
   
   I still call `Iterables.getOnlyElement`, but since we have already validated 
that the size is 1, that should be fine.



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));
+    }
+
+    private List<TxnNamedRead> createNamedReads(QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
+
+        for (NamedSelect select : assignments)
+        {
+            TxnNamedRead read = createNamedRead(select, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        if (returningSelect != null)
+        {
+            TxnNamedRead read = createNamedRead(returningSelect, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        for (NamedSelect select : autoReads.values())
+            // don't need keyConsumer as the keys are known to exist due to 
Modification
+            reads.add(createNamedRead(select, options));
+
+        return reads;
+    }
+
+    TxnCondition createCondition(QueryOptions options)
+    {
+        if (conditions.isEmpty())
+            return TxnCondition.none();
+        if (conditions.size() == 1)
+            return conditions.get(0).createCondition(options);
+
+        List<TxnCondition> result = new ArrayList<>(conditions.size());
+        for (ConditionStatement condition : conditions)
+            result.add(condition.createCondition(options));
+
+        // TODO: OR support
+        return new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, result);
+    }
+
+    private final Map<TxnDataName, NamedSelect> autoReads = new HashMap<>();
+
+    List<TxnWrite.Fragment> createWriteFragments(ClientState state, 
QueryOptions options, Consumer<Key> keyConsumer)
+    {
+        List<TxnWrite.Fragment> fragments = new ArrayList<>(updates.size());
+        int idx = 0;
+        for (ModificationStatement modification : updates)
+        {
+            TxnWrite.Fragment fragment = modification.getTxnWriteFragment(idx, 
state, options);
+            keyConsumer.accept(fragment.key);
+            fragments.add(fragment);
+
+            if 
(modification.allReferenceOperations().stream().anyMatch(ReferenceOperation::requiresRead))
+            {
+                // Reads are not merged by partition here due to potentially 
differing columns retrieved, etc.
+                TxnDataName partitionName = 
TxnDataName.partitionRead(modification.metadata(), fragment.key.partitionKey(), 
idx);
+                if (!autoReads.containsKey(partitionName))
+                    autoReads.put(partitionName, new 
NamedSelect(partitionName, modification.createSelectForTxn()));
+            }
+
+            idx++;
+        }
+        return fragments;
+    }
+
+    TxnUpdate createUpdate(ClientState state, QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        return new TxnUpdate(createWriteFragments(state, options, 
keyConsumer), createCondition(options));
+    }
+
+    Keys toKeys(SortedSet<Key> keySet)
+    {
+        return new Keys(keySet);
+    }
+
+    @VisibleForTesting
+    public Txn createTxn(ClientState state, QueryOptions options)
+    {
+        SortedSet<Key> keySet = new TreeSet<>();
+
+        if (updates.isEmpty())
+        {
+            // TODO: Test case around this...
+            Preconditions.checkState(conditions.isEmpty(), "No condition 
should exist without updates present");
+            List<TxnNamedRead> reads = createNamedReads(options, keySet::add);
+            Keys txnKeys = toKeys(keySet);
+            TxnRead read = new TxnRead(reads, txnKeys);
+            return new Txn.InMemory(txnKeys, read, TxnQuery.ALL);
+        }
+        else
+        {
+            TxnUpdate update = createUpdate(state, options, keySet::add);
+            List<TxnNamedRead> reads = createNamedReads(options, keySet::add);
+            Keys txnKeys = toKeys(keySet);
+            TxnRead read = new TxnRead(reads, txnKeys);
+            return new Txn.InMemory(txnKeys, read, TxnQuery.ALL, update);
+        }
+    }
+
+    @Override
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime)
+    {
+        TxnData data = 
AccordService.instance().coordinate(createTxn(state.getClientState(), options));
+        
+        if (returningSelect != null)
+        {
+            FilteredPartition partition = data.get(TxnDataName.returning());
+            Selection.Selectors selectors = 
returningSelect.select.getSelection().newSelectors(options);
+            ResultSetBuilder result = new 
ResultSetBuilder(returningSelect.select.getResultMetadata(), selectors, null);
+            returningSelect.select.processPartition(partition.rowIterator(), 
options, result, FBUtilities.nowInSeconds());
+            return new ResultMessage.Rows(result.build());
+        }
+        
+        if (returningReferences != null)
+        {
+            List<ColumnSpecification> names = new 
ArrayList<>(returningReferences.size());
+            List<ColumnMetadata> columns = new 
ArrayList<>(returningReferences.size());
+
+            for (RowDataReference reference : returningReferences)
+            {
+                ColumnMetadata forMetadata = reference.toResultMetadata();
+                names.add(forMetadata);
+                columns.add(reference.column());
+            }
+
+            ResultSetBuilder result = new ResultSetBuilder(new 
ResultSet.ResultMetadata(names), Selection.noopSelector(), null);
+            result.newRow(options.getProtocolVersion(), null, null, columns);
+
+            for (int i = 0; i < returningReferences.size(); i++)
+            {
+                RowDataReference reference = returningReferences.get(i);
+                TxnReference txnReference = reference.toTxnReference(options);
+                ByteBuffer buffer = txnReference.toByteBuffer(data, 
names.get(i).type);
+                result.add(buffer);
+            }
+
+            return new ResultMessage.Rows(result.build());
+        }
+
+        // In the case of a write-only transaction, just return and empty 
result.
+        // TODO: This could be modified to return an indication of whether a 
condition (if present) succeeds.
+        return new ResultMessage.Void();
+    }
+
+    @Override
+    public ResultMessage executeLocally(QueryState state, QueryOptions options)
+    {
+        return execute(state, options, nanoTime());
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.TRANSACTION);
+    }
+
+    public static class Parsed extends QualifiedStatement
+    {
+        private final List<SelectStatement.RawStatement> assignments;
+        private final SelectStatement.RawStatement select;
+        private final List<RowDataReference.Raw> returning;
+        private final List<ModificationStatement.Parsed> updates;
+        private final List<ConditionStatement.Raw> conditions;
+        private final List<RowDataReference.Raw> dataReferences;
+
+        public Parsed(List<SelectStatement.RawStatement> assignments,
+                      SelectStatement.RawStatement select,
+                      List<RowDataReference.Raw> returning,
+                      List<ModificationStatement.Parsed> updates,
+                      List<ConditionStatement.Raw> conditions,
+                      List<RowDataReference.Raw> dataReferences)
+        {
+            super(null);
+            this.assignments = assignments;
+            this.select = select;
+            this.returning = returning;
+            this.updates = updates;
+            this.conditions = conditions != null ? conditions : 
Collections.emptyList();
+            this.dataReferences = dataReferences;
+        }
+
+        @Override
+        public void setKeyspace(ClientState state)
+        {
+            assignments.forEach(select -> select.setKeyspace(state));
+            updates.forEach(update -> update.setKeyspace(state));
+        }
+
+        @Override
+        public CQLStatement prepare(ClientState state)
+        {
+            checkFalse(updates.isEmpty() && returning == null && select == 
null, EMPTY_TRANSACTION_MESSAGE);
+
+            if (select != null || returning != null)
+                checkTrue(select != null ^ returning != null, "Cannot specify 
both a full SELECT and a SELECT w/ LET references.");
+
+            List<NamedSelect> preparedAssignments = new 
ArrayList<>(assignments.size());
+            Map<TxnDataName, RowDataReference.ReferenceSource> refSources = 
new HashMap<>();
+            Set<TxnDataName> selectNames = new HashSet<>();
+
+            for (SelectStatement.RawStatement select : assignments)
+            {
+                TxnDataName name = TxnDataName.user(select.parameters.refName);
+                checkNotNull(name, "Assignments must be named");
+                checkTrue(selectNames.add(name), DUPLICATE_TUPLE_NAME_MESSAGE, 
name.name());
+
+                SelectStatement prepared = select.prepare(bindVariables);
+                checkAtMostOneRowSpecified(prepared, 
INCOMPLETE_PRIMARY_KEY_LET_MESSAGE, LazyToString.lazy(() -> 
prepared.asCQL(QueryOptions.DEFAULT, state)));
+
+                NamedSelect namedSelect = new NamedSelect(name, prepared);
+                preparedAssignments.add(namedSelect);
+                refSources.put(name, new SelectReferenceSource(prepared));
+            }
+
+            if (dataReferences != null)
+                for (RowDataReference.Raw reference : dataReferences)
+                    reference.resolveReference(refSources);
+
+            NamedSelect returningSelect = null;
+            if (select != null)
+            {
+                SelectStatement prepared = select.prepare(bindVariables);
+                // TODO: Accord saves the result of this read, so limit to a 
single row until that is no longer true.
+                checkAtMostOneRowSpecified(prepared, 
INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, LazyToString.lazy(() -> 
prepared.asCQL(QueryOptions.DEFAULT, state)));
+                returningSelect = new NamedSelect(TxnDataName.returning(), 
prepared);
+            }
+
+            List<RowDataReference> returningReferences = null;
+
+            if (returning != null)
+            {
+                // TODO: Eliminate/modify this check if we allow full tuple 
selections.
+                returningReferences = returning.stream().peek(raw -> 
checkTrue(raw.column() != null, SELECT_REFS_NEED_COLUMN_MESSAGE))
+                                                        
.map(RowDataReference.Raw::prepareAsReceiver)
+                                                        
.collect(Collectors.toList());
+            }
+
+            List<ModificationStatement> preparedUpdates = new 
ArrayList<>(updates.size());
+            
+            // check for any read-before-write updates
+            for (int i = 0; i < updates.size(); i++)
+            {
+                ModificationStatement.Parsed parsed = updates.get(i);
+
+                ModificationStatement prepared = parsed.prepare(bindVariables);
+                checkFalse(prepared.hasConditions(), 
NO_CONDITIONS_IN_UPDATES_MESSAGE);
+                checkFalse(prepared.isTimestampSet(), 
NO_TIMESTAMPS_IN_UPDATES_MESSAGE);
+
+                preparedUpdates.add(prepared);
+            }
+
+            List<ConditionStatement> preparedConditions = new 
ArrayList<>(conditions.size());
+            for (ConditionStatement.Raw condition : conditions)
+                // TODO: Is this synthetic ks name dangerous?
+                preparedConditions.add(condition.prepare("[txn]", 
bindVariables));

Review Comment:
   For now I'm cool with us ignoring this...



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));
+    }
+
+    private List<TxnNamedRead> createNamedReads(QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
+
+        for (NamedSelect select : assignments)
+        {
+            TxnNamedRead read = createNamedRead(select, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        if (returningSelect != null)
+        {
+            TxnNamedRead read = createNamedRead(returningSelect, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        for (NamedSelect select : autoReads.values())
+            // don't need keyConsumer as the keys are known to exist due to 
Modification
+            reads.add(createNamedRead(select, options));
+
+        return reads;
+    }
+
+    TxnCondition createCondition(QueryOptions options)
+    {
+        if (conditions.isEmpty())
+            return TxnCondition.none();
+        if (conditions.size() == 1)
+            return conditions.get(0).createCondition(options);
+
+        List<TxnCondition> result = new ArrayList<>(conditions.size());
+        for (ConditionStatement condition : conditions)
+            result.add(condition.createCondition(options));
+
+        // TODO: OR support
+        return new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, result);
+    }
+
+    private final Map<TxnDataName, NamedSelect> autoReads = new HashMap<>();
+
+    List<TxnWrite.Fragment> createWriteFragments(ClientState state, 
QueryOptions options, Consumer<Key> keyConsumer)
+    {
+        List<TxnWrite.Fragment> fragments = new ArrayList<>(updates.size());
+        int idx = 0;
+        for (ModificationStatement modification : updates)
+        {
+            TxnWrite.Fragment fragment = modification.getTxnWriteFragment(idx, 
state, options);
+            keyConsumer.accept(fragment.key);
+            fragments.add(fragment);
+
+            if 
(modification.allReferenceOperations().stream().anyMatch(ReferenceOperation::requiresRead))
+            {
+                // Reads are not merged by partition here due to potentially 
differing columns retrieved, etc.
+                TxnDataName partitionName = 
TxnDataName.partitionRead(modification.metadata(), fragment.key.partitionKey(), 
idx);
+                if (!autoReads.containsKey(partitionName))
+                    autoReads.put(partitionName, new 
NamedSelect(partitionName, modification.createSelectForTxn()));
+            }
+
+            idx++;
+        }
+        return fragments;
+    }
+
+    TxnUpdate createUpdate(ClientState state, QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        return new TxnUpdate(createWriteFragments(state, options, 
keyConsumer), createCondition(options));
+    }
+
+    Keys toKeys(SortedSet<Key> keySet)
+    {
+        return new Keys(keySet);
+    }
+
+    @VisibleForTesting
+    public Txn createTxn(ClientState state, QueryOptions options)
+    {
+        SortedSet<Key> keySet = new TreeSet<>();
+
+        if (updates.isEmpty())
+        {
+            // TODO: Test case around this...
+            Preconditions.checkState(conditions.isEmpty(), "No condition 
should exist without updates present");
+            List<TxnNamedRead> reads = createNamedReads(options, keySet::add);
+            Keys txnKeys = toKeys(keySet);
+            TxnRead read = new TxnRead(reads, txnKeys);
+            return new Txn.InMemory(txnKeys, read, TxnQuery.ALL);
+        }
+        else
+        {
+            TxnUpdate update = createUpdate(state, options, keySet::add);
+            List<TxnNamedRead> reads = createNamedReads(options, keySet::add);
+            Keys txnKeys = toKeys(keySet);
+            TxnRead read = new TxnRead(reads, txnKeys);
+            return new Txn.InMemory(txnKeys, read, TxnQuery.ALL, update);
+        }
+    }
+
+    @Override
+    public ResultMessage execute(QueryState state, QueryOptions options, long 
queryStartNanoTime)
+    {
+        TxnData data = 
AccordService.instance().coordinate(createTxn(state.getClientState(), options));
+        
+        if (returningSelect != null)
+        {
+            FilteredPartition partition = data.get(TxnDataName.returning());
+            Selection.Selectors selectors = 
returningSelect.select.getSelection().newSelectors(options);
+            ResultSetBuilder result = new 
ResultSetBuilder(returningSelect.select.getResultMetadata(), selectors, null);
+            returningSelect.select.processPartition(partition.rowIterator(), 
options, result, FBUtilities.nowInSeconds());
+            return new ResultMessage.Rows(result.build());
+        }
+        
+        if (returningReferences != null)
+        {
+            List<ColumnSpecification> names = new 
ArrayList<>(returningReferences.size());
+            List<ColumnMetadata> columns = new 
ArrayList<>(returningReferences.size());
+
+            for (RowDataReference reference : returningReferences)
+            {
+                ColumnMetadata forMetadata = reference.toResultMetadata();
+                names.add(forMetadata);
+                columns.add(reference.column());
+            }
+
+            ResultSetBuilder result = new ResultSetBuilder(new 
ResultSet.ResultMetadata(names), Selection.noopSelector(), null);
+            result.newRow(options.getProtocolVersion(), null, null, columns);
+
+            for (int i = 0; i < returningReferences.size(); i++)
+            {
+                RowDataReference reference = returningReferences.get(i);
+                TxnReference txnReference = reference.toTxnReference(options);
+                ByteBuffer buffer = txnReference.toByteBuffer(data, 
names.get(i).type);
+                result.add(buffer);
+            }
+
+            return new ResultMessage.Rows(result.build());
+        }
+
+        // In the case of a write-only transaction, just return and empty 
result.
+        // TODO: This could be modified to return an indication of whether a 
condition (if present) succeeds.
+        return new ResultMessage.Void();
+    }
+
+    @Override
+    public ResultMessage executeLocally(QueryState state, QueryOptions options)
+    {
+        return execute(state, options, nanoTime());
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.TRANSACTION);
+    }
+
+    public static class Parsed extends QualifiedStatement
+    {
+        private final List<SelectStatement.RawStatement> assignments;
+        private final SelectStatement.RawStatement select;
+        private final List<RowDataReference.Raw> returning;
+        private final List<ModificationStatement.Parsed> updates;
+        private final List<ConditionStatement.Raw> conditions;
+        private final List<RowDataReference.Raw> dataReferences;
+
+        public Parsed(List<SelectStatement.RawStatement> assignments,
+                      SelectStatement.RawStatement select,
+                      List<RowDataReference.Raw> returning,
+                      List<ModificationStatement.Parsed> updates,
+                      List<ConditionStatement.Raw> conditions,
+                      List<RowDataReference.Raw> dataReferences)
+        {
+            super(null);
+            this.assignments = assignments;
+            this.select = select;
+            this.returning = returning;
+            this.updates = updates;
+            this.conditions = conditions != null ? conditions : 
Collections.emptyList();
+            this.dataReferences = dataReferences;
+        }
+
+        @Override
+        public void setKeyspace(ClientState state)
+        {
+            assignments.forEach(select -> select.setKeyspace(state));
+            updates.forEach(update -> update.setKeyspace(state));
+        }
+
+        @Override
+        public CQLStatement prepare(ClientState state)
+        {
+            checkFalse(updates.isEmpty() && returning == null && select == 
null, EMPTY_TRANSACTION_MESSAGE);
+
+            if (select != null || returning != null)
+                checkTrue(select != null ^ returning != null, "Cannot specify 
both a full SELECT and a SELECT w/ LET references.");
+
+            List<NamedSelect> preparedAssignments = new 
ArrayList<>(assignments.size());
+            Map<TxnDataName, RowDataReference.ReferenceSource> refSources = 
new HashMap<>();
+            Set<TxnDataName> selectNames = new HashSet<>();
+
+            for (SelectStatement.RawStatement select : assignments)
+            {
+                TxnDataName name = TxnDataName.user(select.parameters.refName);
+                checkNotNull(name, "Assignments must be named");
+                checkTrue(selectNames.add(name), DUPLICATE_TUPLE_NAME_MESSAGE, 
name.name());
+
+                SelectStatement prepared = select.prepare(bindVariables);
+                checkAtMostOneRowSpecified(prepared, 
INCOMPLETE_PRIMARY_KEY_LET_MESSAGE, LazyToString.lazy(() -> 
prepared.asCQL(QueryOptions.DEFAULT, state)));
+
+                NamedSelect namedSelect = new NamedSelect(name, prepared);
+                preparedAssignments.add(namedSelect);
+                refSources.put(name, new SelectReferenceSource(prepared));
+            }
+
+            if (dataReferences != null)
+                for (RowDataReference.Raw reference : dataReferences)
+                    reference.resolveReference(refSources);
+
+            NamedSelect returningSelect = null;
+            if (select != null)
+            {
+                SelectStatement prepared = select.prepare(bindVariables);
+                // TODO: Accord saves the result of this read, so limit to a 
single row until that is no longer true.
+                checkAtMostOneRowSpecified(prepared, 
INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, LazyToString.lazy(() -> 
prepared.asCQL(QueryOptions.DEFAULT, state)));
+                returningSelect = new NamedSelect(TxnDataName.returning(), 
prepared);
+            }
+
+            List<RowDataReference> returningReferences = null;
+
+            if (returning != null)
+            {
+                // TODO: Eliminate/modify this check if we allow full tuple 
selections.
+                returningReferences = returning.stream().peek(raw -> 
checkTrue(raw.column() != null, SELECT_REFS_NEED_COLUMN_MESSAGE))
+                                                        
.map(RowDataReference.Raw::prepareAsReceiver)
+                                                        
.collect(Collectors.toList());
+            }
+
+            List<ModificationStatement> preparedUpdates = new 
ArrayList<>(updates.size());
+            
+            // check for any read-before-write updates
+            for (int i = 0; i < updates.size(); i++)
+            {
+                ModificationStatement.Parsed parsed = updates.get(i);
+
+                ModificationStatement prepared = parsed.prepare(bindVariables);
+                checkFalse(prepared.hasConditions(), 
NO_CONDITIONS_IN_UPDATES_MESSAGE);
+                checkFalse(prepared.isTimestampSet(), 
NO_TIMESTAMPS_IN_UPDATES_MESSAGE);
+
+                preparedUpdates.add(prepared);
+            }
+
+            List<ConditionStatement> preparedConditions = new 
ArrayList<>(conditions.size());
+            for (ConditionStatement.Raw condition : conditions)
+                // TODO: Is this synthetic ks name dangerous?
+                preparedConditions.add(condition.prepare("[txn]", 
bindVariables));

Review Comment:
   that's where we would actually get the keyspace name from, correct?
   
   it gets tricky... 
   
   ```
   IF fancy(a)
   ```
   
   That should work, IMO, iff `fancy` is in the same keyspace as `a`; the parser 
wouldn't know this, so `prepare` would need to handle/resolve it.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),
+        Putter((byte) 7, (column, keyOrIndex, field, value) -> new 
Maps.Putter(column, value)),
+        ListSetter((byte) 8, (column, keyOrIndex, field, value) -> new 
Lists.Setter(column, value)),
+        SetSetter((byte) 9, (column, keyOrIndex, field, value) -> new 
Sets.Setter(column, value)),
+        MapSetter((byte) 10, (column, keyOrIndex, field, value) -> new 
Maps.Setter(column, value)),
+        UserTypeSetter((byte) 11, (column, keyOrIndex, field, value) -> new 
UserTypes.Setter(column, value)),
+        ConstantSetter((byte) 12, (column, keyOrIndex, field, value) -> new 
Constants.Setter(column, value)),
+        Subtracter((byte) 13, (column, keyOrIndex, field, value) -> new 
Constants.Substracter(column, value)),
+        SetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new 
Maps.SetterByKey(column, keyOrIndex, value)),

Review Comment:
   nit: MapSetterByKey?



##########
src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.transactions;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.types.utils.Bytes;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+
+public class RowDataReference extends Term.NonTerminal
+{
+    public static final String CANNOT_FIND_TUPLE_MESSAGE = "Cannot resolve 
reference to tuple '%s'.";
+    public static final String COLUMN_NOT_IN_TUPLE_MESSAGE = "Column '%s' does 
not exist in tuple '%s'.";
+
+    private final TxnDataName selectName;
+    private final ColumnMetadata column;
+    private final Term elementPath;
+    private final CellPath fieldPath;
+    
+    public RowDataReference(TxnDataName selectName, ColumnMetadata column, 
Term elementPath, CellPath fieldPath)
+    {
+        Preconditions.checkArgument(elementPath == null || fieldPath == null, 
"Cannot specify both element and field paths");
+        
+        this.selectName = selectName;
+        this.column = column;
+        this.elementPath = elementPath;
+        this.fieldPath = fieldPath;
+    }
+
+    @Override
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        if (elementPath != null)
+            elementPath.collectMarkerSpecification(boundNames);
+    }
+
+    @Override
+    public Terminal bind(QueryOptions options) throws InvalidRequestException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsBindMarker()
+    {
+        return elementPath != null && elementPath.containsBindMarker();
+    }
+
+    @Override
+    public void addFunctionsTo(List<Function> functions)
+    {
+        throw new UnsupportedOperationException("Functions are not currently 
supported w/ reference terms.");
+    }
+
+    public ColumnMetadata toResultMetadata()
+    {
+        ColumnIdentifier fullName = getFullyQualifiedName();
+        ColumnMetadata forMetadata = column.withNewName(fullName);
+
+        if (isElementSelection())
+        {
+            if (forMetadata.type instanceof SetType)
+                forMetadata = forMetadata.withNewType(((SetType<?>) 
forMetadata.type).nameComparator());
+            else if (forMetadata.type instanceof MapType)

Review Comment:
   No `ListType` case? Only `SetType` and `MapType` appear to be handled here.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),

Review Comment:
   nit: ListPrepender?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),
+        Putter((byte) 7, (column, keyOrIndex, field, value) -> new 
Maps.Putter(column, value)),

Review Comment:
   nit: MapPutter?



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),
+        Putter((byte) 7, (column, keyOrIndex, field, value) -> new 
Maps.Putter(column, value)),
+        ListSetter((byte) 8, (column, keyOrIndex, field, value) -> new 
Lists.Setter(column, value)),
+        SetSetter((byte) 9, (column, keyOrIndex, field, value) -> new 
Sets.Setter(column, value)),
+        MapSetter((byte) 10, (column, keyOrIndex, field, value) -> new 
Maps.Setter(column, value)),
+        UserTypeSetter((byte) 11, (column, keyOrIndex, field, value) -> new 
UserTypes.Setter(column, value)),
+        ConstantSetter((byte) 12, (column, keyOrIndex, field, value) -> new 
Constants.Setter(column, value)),
+        Subtracter((byte) 13, (column, keyOrIndex, field, value) -> new 
Constants.Substracter(column, value)),
+        SetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new 
Maps.SetterByKey(column, keyOrIndex, value)),
+        SetterByIndex((byte) 15, (column, keyOrIndex, field, value) -> new 
Lists.SetterByIndex(column, keyOrIndex, value)),

Review Comment:
   nit: ListSetterByIndex?



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));

Review Comment:
   We changed 
`org.apache.cassandra.cql3.statements.ModificationStatement#getTxnUpdate` to 
avoid calling `Iterables.getOnlyElement` for this reason... I think it's cleaner 
to just check the size rather than use try/catch.
   
   I'll put this in my feedback branch.



##########
src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operation;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.AccordSerializers;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
+import static 
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+
+public class TxnReferenceOperation
+{
+    private static final Map<Class<? extends Operation>, Kind> 
operationKindMap = initOperationKindMap();
+    
+    private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
+    {
+        Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
+        temp.put(Sets.Adder.class, Kind.SetAdder);
+        temp.put(Constants.Adder.class, Kind.ConstantAdder);
+        temp.put(Lists.Appender.class, Kind.Appender);
+        temp.put(Sets.Discarder.class, Kind.SetDiscarder);
+        temp.put(Lists.Discarder.class, Kind.ListDiscarder);
+        temp.put(Lists.Prepender.class, Kind.Prepender);
+        temp.put(Maps.Putter.class, Kind.Putter);
+        temp.put(Lists.Setter.class, Kind.ListSetter);
+        temp.put(Sets.Setter.class, Kind.SetSetter);
+        temp.put(Maps.Setter.class, Kind.MapSetter);
+        temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
+        temp.put(Constants.Setter.class, Kind.ConstantSetter);
+        temp.put(Constants.Substracter.class, Kind.Subtracter);
+        temp.put(Maps.SetterByKey.class, Kind.SetterByKey);
+        temp.put(Lists.SetterByIndex.class, Kind.SetterByIndex);
+        temp.put(UserTypes.SetterByField.class, Kind.SetterByField);
+        return temp;
+    }
+
+    private interface ToOperation
+    {
+        Operation apply(ColumnMetadata column, Term keyOrIndex, 
FieldIdentifier field, Term value);
+    }
+
+    public enum Kind
+    {
+        SetAdder((byte) 1, (column, keyOrIndex, field, value) -> new 
Sets.Adder(column, value)),
+        ConstantAdder((byte) 2, (column, keyOrIndex, field, value) -> new 
Constants.Adder(column, value)),
+        Appender((byte) 3, (column, keyOrIndex, field, value) -> new 
Lists.Appender(column, value)),
+        SetDiscarder((byte) 4, (column, keyOrIndex, field, value) -> new 
Sets.Discarder(column, value)),
+        ListDiscarder((byte) 5, (column, keyOrIndex, field, value) -> new 
Lists.Discarder(column, value)),
+        Prepender((byte) 6, (column, keyOrIndex, field, value) -> new 
Lists.Prepender(column, value)),
+        Putter((byte) 7, (column, keyOrIndex, field, value) -> new 
Maps.Putter(column, value)),
+        ListSetter((byte) 8, (column, keyOrIndex, field, value) -> new 
Lists.Setter(column, value)),
+        SetSetter((byte) 9, (column, keyOrIndex, field, value) -> new 
Sets.Setter(column, value)),
+        MapSetter((byte) 10, (column, keyOrIndex, field, value) -> new 
Maps.Setter(column, value)),
+        UserTypeSetter((byte) 11, (column, keyOrIndex, field, value) -> new 
UserTypes.Setter(column, value)),
+        ConstantSetter((byte) 12, (column, keyOrIndex, field, value) -> new 
Constants.Setter(column, value)),
+        Subtracter((byte) 13, (column, keyOrIndex, field, value) -> new 
Constants.Substracter(column, value)),
+        SetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new 
Maps.SetterByKey(column, keyOrIndex, value)),
+        SetterByIndex((byte) 15, (column, keyOrIndex, field, value) -> new 
Lists.SetterByIndex(column, keyOrIndex, value)),
+        SetterByField((byte) 16, (column, keyOrIndex, field, value) -> new 
UserTypes.SetterByField(column, field, value));

Review Comment:
   nit: UserTypeSetterByField?



##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Txn;
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.selection.ResultSetBuilder;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.cql3.transactions.RowDataReference;
+import org.apache.cassandra.cql3.transactions.ConditionStatement;
+import org.apache.cassandra.cql3.transactions.ReferenceOperation;
+import org.apache.cassandra.cql3.transactions.SelectReferenceSource;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnNamedRead;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LazyToString;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public class TransactionStatement implements CQLStatement
+{
+    public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PRIMARY_KEY_LET_MESSAGE = "SELECT in 
LET assignment without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "Normal 
SELECT without LIMIT 1 must specify all primary key elements; CQL %s";
+    public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions.";
+    public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps.";
+    public static final String EMPTY_TRANSACTION_MESSAGE = "Transaction 
contains no reads or writes";
+    public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
+
+    static class NamedSelect
+    {
+        final TxnDataName name;
+        final SelectStatement select;
+
+        public NamedSelect(TxnDataName name, SelectStatement select)
+        {
+            this.name = name;
+            this.select = select;
+        }
+    }
+
+    private final List<NamedSelect> assignments;
+    private final NamedSelect returningSelect;
+    private final List<RowDataReference> returningReferences;
+    private final List<ModificationStatement> updates;
+    private final List<ConditionStatement> conditions;
+
+    private final VariableSpecifications bindVariables;
+
+    public TransactionStatement(List<NamedSelect> assignments,
+                                NamedSelect returningSelect,
+                                List<RowDataReference> returningReferences,
+                                List<ModificationStatement> updates,
+                                List<ConditionStatement> conditions,
+                                VariableSpecifications bindVariables)
+    {
+        this.assignments = assignments;
+        this.returningSelect = returningSelect;
+        this.returningReferences = returningReferences;
+        this.updates = updates;
+        this.conditions = conditions;
+        this.bindVariables = bindVariables;
+    }
+
+    @Override
+    public List<ColumnSpecification> getBindVariables()
+    {
+        return bindVariables.getBindVariables();
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        // Assess read permissions for all data from both explicit LET 
statements and generated reads.
+        for (NamedSelect let : assignments)
+            let.select.authorize(state);
+
+        if (returningSelect != null)
+            returningSelect.select.authorize(state);
+
+        for (ModificationStatement update : updates)
+            update.authorize(state);
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        for (ModificationStatement statement : updates)
+            statement.validate(state);
+    }
+
+    @VisibleForTesting
+    public List<RowDataReference> getReturningReferences()
+    {
+        return returningReferences;
+    }
+
+    TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // We reject reads from both LET and SELECT that do not specify a 
single row.
+        @SuppressWarnings("unchecked") 
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));
+    }
+
+    private List<TxnNamedRead> createNamedReads(QueryOptions options, 
Consumer<Key> keyConsumer)
+    {
+        List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
+
+        for (NamedSelect select : assignments)
+        {
+            TxnNamedRead read = createNamedRead(select, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        if (returningSelect != null)
+        {
+            TxnNamedRead read = createNamedRead(returningSelect, options);
+            keyConsumer.accept(read.key());
+            reads.add(read);
+        }
+
+        for (NamedSelect select : autoReads.values())
+            // don't need keyConsumer as the keys are known to exist due to 
Modification
+            reads.add(createNamedRead(select, options));
+
+        return reads;
+    }
+
+    TxnCondition createCondition(QueryOptions options)
+    {
+        if (conditions.isEmpty())
+            return TxnCondition.none();
+        if (conditions.size() == 1)
+            return conditions.get(0).createCondition(options);
+
+        List<TxnCondition> result = new ArrayList<>(conditions.size());
+        for (ConditionStatement condition : conditions)
+            result.add(condition.createCondition(options));
+
+        // TODO: OR support
+        return new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, result);
+    }
+
+    private final Map<TxnDataName, NamedSelect> autoReads = new HashMap<>();
+
+    List<TxnWrite.Fragment> createWriteFragments(ClientState state, 
QueryOptions options, Consumer<Key> keyConsumer)
+    {
+        List<TxnWrite.Fragment> fragments = new ArrayList<>(updates.size());
+        int idx = 0;
+        for (ModificationStatement modification : updates)
+        {
+            TxnWrite.Fragment fragment = modification.getTxnWriteFragment(idx, 
state, options);
+            keyConsumer.accept(fragment.key);
+            fragments.add(fragment);
+
+            if 
(modification.allReferenceOperations().stream().anyMatch(ReferenceOperation::requiresRead))
+            {
+                // Reads are not merged by partition here due to potentially 
differing columns retrieved, etc.
+                TxnDataName partitionName = 
TxnDataName.partitionRead(modification.metadata(), fragment.key.partitionKey(), 
idx);
+                if (!autoReads.containsKey(partitionName))
+                    autoReads.put(partitionName, new 
NamedSelect(partitionName, modification.createSelectForTxn()));

Review Comment:
   Since writes don't share the same auto read, this isn't an issue: 
   `partitionRead` includes the write `idx`. This does cause multiple reads 
   to the same partition, so we could be smarter in some cases, but in cases 
   where the clusterings don't match we still need to isolate the reads.



##########
src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+
+public class AccordUpdateParameters
+{
+    private final TxnData data;
+    private final QueryOptions options;
+
+    public AccordUpdateParameters(TxnData data, QueryOptions options)
+    {
+        this.data = data;
+        this.options = options;
+    }
+
+    public TxnData getData()
+    {
+        return data;
+    }
+
+    public UpdateParameters updateParameters(TableMetadata metadata, int 
rowIndex)
+    {
+        // This is currently only used by Guardrails, but this logically have 
issues with Accord as drifts in config
+        // values could cause unexpected issues in Accord. (ex. some nodes 
reject writes while others accept)
+        // For the time being, guardrails are disabled for Accord queries.
+        ClientState disabledGuardrails = null;
+
+        // What we use here doesn't matter as they get replaced before 
actually performing the write.
+        // see org.apache.cassandra.service.accord.txn.TxnWrite.Update.write
+        int nowInSeconds = 42;
+        long timestamp = nowInSeconds;
+
+        // TODO: How should Accord work with TTL?
+        int ttl = metadata.params.defaultTimeToLive;
+        return new UpdateParameters(metadata,
+                                    disabledGuardrails,
+                                    options,
+                                    timestamp,
+                                    nowInSeconds,
+                                    ttl,
+                                    prefetchRow(metadata, rowIndex));
+    }
+
+    private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, 
int index)

Review Comment:
   Marking resolved. This works now because of `index`, but it makes 
   mutations to the same row do extra queries, which kinda sucks... 
   correctness > performance, so I'm cool with this for now.



##########
src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.transactions;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.types.utils.Bytes;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+
+public class RowDataReference extends Term.NonTerminal
+{
+    public static final String CANNOT_FIND_TUPLE_MESSAGE = "Cannot resolve 
reference to tuple '%s'.";
+    public static final String COLUMN_NOT_IN_TUPLE_MESSAGE = "Column '%s' does 
not exist in tuple '%s'.";
+
+    private final TxnDataName selectName;
+    private final ColumnMetadata column;
+    private final Term elementPath;
+    private final CellPath fieldPath;
+    
+    public RowDataReference(TxnDataName selectName, ColumnMetadata column, 
Term elementPath, CellPath fieldPath)
+    {
+        Preconditions.checkArgument(elementPath == null || fieldPath == null, 
"Cannot specify both element and field paths");
+        
+        this.selectName = selectName;
+        this.column = column;
+        this.elementPath = elementPath;
+        this.fieldPath = fieldPath;
+    }
+
+    @Override
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        if (elementPath != null)
+            elementPath.collectMarkerSpecification(boundNames);
+    }
+
+    @Override
+    public Terminal bind(QueryOptions options) throws InvalidRequestException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsBindMarker()
+    {
+        return elementPath != null && elementPath.containsBindMarker();
+    }
+
+    @Override
+    public void addFunctionsTo(List<Function> functions)
+    {
+        throw new UnsupportedOperationException("Functions are not currently 
supported w/ reference terms.");
+    }
+
+    public ColumnMetadata toResultMetadata()
+    {
+        ColumnIdentifier fullName = getFullyQualifiedName();
+        ColumnMetadata forMetadata = column.withNewName(fullName);
+
+        if (isElementSelection())
+        {
+            if (forMetadata.type instanceof SetType)
+                forMetadata = forMetadata.withNewType(((SetType<?>) 
forMetadata.type).nameComparator());
+            else if (forMetadata.type instanceof MapType)

Review Comment:
   Fixed this in my feedback branch; this breaks the returning `SELECT`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to