alex-ninja commented on a change in pull request #1117:
URL: https://github.com/apache/cassandra/pull/1117#discussion_r699186099



##########
File path: src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
##########
@@ -177,6 +175,8 @@ protected ModificationStatement 
prepareInternal(TableMetadata metadata,
 
             if (stmt.hasConditions() && 
!restrictions.hasAllPKColumnsRestrictedByEqualities())
             {
+                checkFalse(stmt.isVirtual(), "DELETE statements must restrict 
all PRIMARY KEY columns with equality relations");

Review comment:
       This check ensures that a proper error message (one that does not mention 
static columns) is shown when not all primary key columns are restricted by 
equality relations.

##########
File path: 
test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
##########
@@ -249,76 +344,188 @@ public void testQueriesOnTableWithMultiplePks() throws 
Throwable
     }
 
     @Test
-    public void testModifications() throws Throwable
+    public void testDMLOperationOnWritableTable() throws Throwable
     {
         // check for clean state
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
 
         // fill the table, test UNLOGGED batch
         execute("BEGIN UNLOGGED BATCH " +
-                "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 1, v2 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 2, v2 = 2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 3, v2 = 3 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 4, v2 = 4 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 5, v2 = 5 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 6, v2 = 6 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 7, v2 = 7 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 8, v2 = 8 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
                 "APPLY BATCH");
         assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
-                   row("pk1", 1),
-                   row("pk2", 2),
-                   row("pk3", 3));
+                   row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                   row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                   row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // update a single column with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 11 WHERE pk1 = 'pk1_1' 
AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_1", 11, 1L));
+
+        // update multiple columns with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 111, v2 = 111 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 111, 111L));
+
+        // update a single columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v2) VALUES 
('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 22)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 22L));
+
+        // update multiple columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) 
VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 222, 222)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 222, 222L));
+
+        // delete a single partition
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 
'pk2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // delete a first-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_2' AND c1 <= 'c1_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // delete a first-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_2' AND c1 > 'c1_1' AND c1 < 'c1_3'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a second-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 >= 'c2_3' AND c2 < 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a second-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 < 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a single row
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a single column
+        execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND 
pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", null, 6L));
 
-        // test that LOGGED batches don't allow virtual table updates
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
+
+    @Test
+    public void testInvalidDMLOperationsOnWritableTable() throws Throwable
+    {
+        // test that LOGGED batch doesn't allow virtual table updates
         assertInvalidMessage("Cannot include a virtual table statement in a 
logged batch",
-                             "BEGIN BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE 
key ='pk1';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE 
key ='pk2';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3';" +
-                             "APPLY BATCH");
+                "BEGIN BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 2 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 
'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
 
         // test that UNLOGGED batch doesn't allow mixing updates for regular 
and virtual tables
-        createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
+        createTable("CREATE TABLE %s (pk1 text, pk2 text, c1 text, c2 text, v1 
int, v2 bigint, PRIMARY KEY ((pk1, pk2), c1, c2))");
         assertInvalidMessage("Mutations for virtual and regular tables cannot 
exist in the same batch",
-                             "BEGIN UNLOGGED BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE 
key ='pk1'" +
-                             "UPDATE %s                  SET value = 2 WHERE 
key ='pk2'" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3'" +
-                             "APPLY BATCH");
-
-        // update a single value with UPDATE
-        execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 
'pk1'"),
-                   row("pk1", 11));
-
-        // update a single value with INSERT
-        executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES 
('pk2', 22)");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 
'pk2'"),
-                   row("pk2", 22));
-
-        // test that deletions are (currently) rejected
-        assertInvalidMessage("Virtual tables don't support DELETE statements",
-                             "DELETE FROM test_virtual_ks.vt2 WHERE key 
='pk1'");
+                "BEGIN UNLOGGED BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE %s                  SET v1 = 2 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 
'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
+
+        // test that TIMESTAMP is (currently) rejected with INSERT and UPDATE
+        assertInvalidMessage("Custom timestamp is not supported by virtual 
tables",
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) 
VALUES ('pk1', 'pk2', 'c1', 'c2', 1, 11) USING TIMESTAMP 123456789");
+        assertInvalidMessage("Custom timestamp is not supported by virtual 
tables",
+                "UPDATE test_virtual_ks.vt2 USING TIMESTAMP 123456789 SET v1 = 
1, v2 = 11 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2'");
 
         // test that TTL is (currently) rejected with INSERT and UPDATE
         assertInvalidMessage("Expiring columns are not supported by virtual 
tables",
-                             "INSERT INTO test_virtual_ks.vt2 (key, value) 
VALUES ('pk1', 11) USING TTL 86400");
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) 
VALUES ('pk1', 'pk2', 'c1', 'c2', 1, 11) USING TTL 86400");
         assertInvalidMessage("Expiring columns are not supported by virtual 
tables",
-                             "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET 
value = 11 WHERE key ='pk1'");
+                "UPDATE test_virtual_ks.vt2 USING TTL 86400 SET v1 = 1, v2 = 
11 WHERE pk1 = 'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2'");
 
-        // test that LWT is (currently) rejected with virtual tables in batches
+        // test that LWT is (currently) rejected with BATCH
         assertInvalidMessage("Conditional BATCH statements cannot include 
mutations for virtual tables",
-                             "BEGIN UNLOGGED BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3' IF value = 2;" +
-                             "APPLY BATCH");
+                "BEGIN UNLOGGED BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 
'pk1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2' IF v1 = 2;" +
+                        "APPLY BATCH");
 
-        // test that LWT is (currently) rejected with virtual tables in UPDATEs
+        // test that LWT is (currently) rejected with INSERT and UPDATE
         assertInvalidMessage("Conditional updates are not supported by virtual 
tables",
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3' IF value = 2");
-
-        // test that LWT is (currently) rejected with virtual tables in INSERTs
+                "INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1) VALUES 
('pk1', 'pk2', 'c1', 'c2', 2) IF NOT EXISTS");
         assertInvalidMessage("Conditional updates are not supported by virtual 
tables",
-                             "INSERT INTO test_virtual_ks.vt2 (key, value) 
VALUES ('pk2', 22) IF NOT EXISTS");
+                "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 'pk1' AND 
pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2' IF v1 = 2");
+
+        // test that row DELETE without full primary key with equality 
relation is (currently) rejected
+        assertInvalidMessage("Some partition key parts are missing: pk2",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND c1 = 
'c1' AND c2 > 'c2'");
+        assertInvalidMessage("Only EQ and IN relation are supported on the 
partition key (unless you use the token() function) for DELETE statements",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 > 
'pk2' AND c1 = 'c1' AND c2 > 'c2'");
+        assertInvalidMessage("KEY column \"c2\" cannot be restricted as 
preceding column \"c1\" is not restricted",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 
'pk2' AND c2 > 'c2'");
+        assertInvalidMessage("Clustering column \"c2\" cannot be restricted 
(preceding column \"c1\" is restricted by a non-EQ relation)",
+                "DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1' AND pk2 = 
'pk2' AND c1 > 'c1' AND c2 > 'c2'");
+        assertInvalidMessage("DELETE statements must restrict all PRIMARY KEY 
columns with equality relations",

Review comment:
       This corresponds to the new restriction in `DeleteStatement`.

##########
File path: 
test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
##########
@@ -53,52 +57,143 @@
     private static final String VT2_NAME = "vt2";
     private static final String VT3_NAME = "vt3";
 
-    private static class WritableVirtualTable extends AbstractVirtualTable
+    private static class WritableVirtualTable extends 
AbstractWritableVirtualTable.SimpleWritableVirtualTable
     {
-        private final ColumnMetadata valueColumn;
-        private final Map<String, Integer> backingMap = new HashMap<>();
+        // <pk1, pk2> -> c1 -> c2 -> <v1, v2>
+        private final Map<Pair<String, String>, SortedMap<String, 
SortedMap<String, Pair<Integer, Long>>>> backingMap = new ConcurrentHashMap<>();
 
         WritableVirtualTable(String keyspaceName, String tableName)
         {
             super(TableMetadata.builder(keyspaceName, tableName)
                                .kind(TableMetadata.Kind.VIRTUAL)
-                               .addPartitionKeyColumn("key", UTF8Type.instance)
-                               .addRegularColumn("value", Int32Type.instance)
+                               .addPartitionKeyColumn("pk1", UTF8Type.instance)
+                               .addPartitionKeyColumn("pk2", UTF8Type.instance)
+                               .addClusteringColumn("c1", UTF8Type.instance)
+                               .addClusteringColumn("c2", UTF8Type.instance)
+                               .addRegularColumn("v1", Int32Type.instance)
+                               .addRegularColumn("v2", LongType.instance)
                                .build());
-            valueColumn = metadata().regularColumns().getSimple(0);
         }
 
         @Override
         public DataSet data()
         {
             SimpleDataSet data = new SimpleDataSet(metadata());
-            backingMap.forEach((key, value) -> data.row(key).column("value", 
value));
+            backingMap.forEach((pkPair, c1Map) ->
+                    c1Map.forEach((c1, c2Map) ->
+                    c2Map.forEach((c2, valuePair) -> data.row(pkPair.left, 
pkPair.right, c1, c2)
+                            .column("v1", valuePair.left)
+                            .column("v2", valuePair.right))));
             return data;
         }
 
         @Override
-        public void apply(PartitionUpdate update)
+        protected void applyPartitionDelete(Object[] partitionKeyColumnValues)
+        {
+            String pk1 = (String) partitionKeyColumnValues[0];
+            String pk2 = (String) partitionKeyColumnValues[1];
+            backingMap.remove(Pair.create(pk1, pk2));
+        }
+
+        @Override
+        protected void applyRangeTombstone(Object[] partitionKeyColumnValues,

Review comment:
       I believe most writable VTs won't need to support range deletes. 
However, it is possible, as demonstrated in this method.

##########
File path: 
src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.SortedMap;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on 
demand and allows source modification.
+ */
+public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable
+{
+
+    protected AbstractWritableVirtualTable(TableMetadata metadata)
+    {
+        super(metadata);
+    }
+
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        DecoratedKey partitionKey = update.partitionKey();
+
+        if (update.deletionInfo().isLive())
+            update.forEach(row ->
+            {
+                Clustering<?> clusteringColumns = row.clustering();
+
+                if (row.deletion().isLive())
+                    row.forEach(columnMetadata ->
+                    {
+                        if (columnMetadata.column().isComplex())

Review comment:
       Currently, deletes for complex cell types are not supported. I'd like to 
address that in a separate PR when I work on VTs for auth caches.

##########
File path: 
test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java
##########
@@ -249,76 +344,188 @@ public void testQueriesOnTableWithMultiplePks() throws 
Throwable
     }
 
     @Test
-    public void testModifications() throws Throwable
+    public void testDMLOperationOnWritableTable() throws Throwable
     {
         // check for clean state
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"));
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
 
         // fill the table, test UNLOGGED batch
         execute("BEGIN UNLOGGED BATCH " +
-                "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" +
-                "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 1, v2 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 2, v2 = 2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 3, v2 = 3 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 4, v2 = 4 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 5, v2 = 5 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 6, v2 = 6 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 7, v2 = 7 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" +
+                "UPDATE test_virtual_ks.vt2 SET v1 = 8, v2 = 8 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" +
                 "APPLY BATCH");
         assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
-                   row("pk1", 1),
-                   row("pk2", 2),
-                   row("pk3", 3));
+                   row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                   row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                   row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                   row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // update a single column with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 11 WHERE pk1 = 'pk1_1' 
AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_1", 11, 1L));
+
+        // update multiple columns with UPDATE
+        execute("UPDATE test_virtual_ks.vt2 SET v1 = 111, v2 = 111 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_1", 111, 111L));
+
+        // update a single columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v2) VALUES 
('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 22)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                   row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 22L));
+
+        // update multiple columns with INSERT
+        execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) 
VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 222, 222)");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"),
+                row("pk1_1", "pk2_1", "c1_1", "c2_2", 222, 222L));
+
+        // delete a single partition
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 
'pk2_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // delete a first-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_2' AND c1 <= 'c1_1'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L),
+                row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L));
+
+        // delete a first-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_2' AND c1 > 'c1_1' AND c1 < 'c1_3'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a second-level range (two-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 >= 'c2_3' AND c2 < 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a second-level range (one-sided limit)
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 < 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a single row
+        execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 
'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L));
+
+        // delete a single column
+        execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND 
pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6'");
+        assertRows(execute("SELECT * FROM test_virtual_ks.vt2"),
+                row("pk1_2", "pk2_1", "c1_1", "c2_6", null, 6L));
 
-        // test that LOGGED batches don't allow virtual table updates
+        // truncate
+        execute("TRUNCATE test_virtual_ks.vt2");
+        assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2"));
+    }
+
+    @Test
+    public void testInvalidDMLOperationsOnWritableTable() throws Throwable
+    {
+        // test that LOGGED batch doesn't allow virtual table updates
         assertInvalidMessage("Cannot include a virtual table statement in a 
logged batch",
-                             "BEGIN BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE 
key ='pk1';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE 
key ='pk2';" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3';" +
-                             "APPLY BATCH");
+                "BEGIN BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 2 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 
'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
 
         // test that UNLOGGED batch doesn't allow mixing updates for regular 
and virtual tables
-        createTable("CREATE TABLE %s (key text PRIMARY KEY, value int)");
+        createTable("CREATE TABLE %s (pk1 text, pk2 text, c1 text, c2 text, v1 
int, v2 bigint, PRIMARY KEY ((pk1, pk2), c1, c2))");
         assertInvalidMessage("Mutations for virtual and regular tables cannot 
exist in the same batch",
-                             "BEGIN UNLOGGED BATCH " +
-                             "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE 
key ='pk1'" +
-                             "UPDATE %s                  SET value = 2 WHERE 
key ='pk2'" +
-                             "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE 
key ='pk3'" +
-                             "APPLY BATCH");
-
-        // update a single value with UPDATE
-        execute("UPDATE test_virtual_ks.vt2 SET value = 11 WHERE key ='pk1'");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 
'pk1'"),
-                   row("pk1", 11));
-
-        // update a single value with INSERT
-        executeNet("INSERT INTO test_virtual_ks.vt2 (key, value) VALUES 
('pk2', 22)");
-        assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE key = 
'pk2'"),
-                   row("pk2", 22));
-
-        // test that deletions are (currently) rejected
-        assertInvalidMessage("Virtual tables don't support DELETE statements",
-                             "DELETE FROM test_virtual_ks.vt2 WHERE key 
='pk1'");
+                "BEGIN UNLOGGED BATCH " +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 1 WHERE pk1 = 
'pk1_1' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE %s                  SET v1 = 2 WHERE pk1 = 
'pk1_2' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "UPDATE test_virtual_ks.vt2 SET v1 = 3 WHERE pk1 = 
'pk1_3' AND pk2 = 'pk2' AND c1 = 'c1' AND c2 = 'c2';" +
+                        "APPLY BATCH");
+
+        // test that TIMESTAMP is (currently) rejected with INSERT and UPDATE

Review comment:
       This corresponds to the custom timestamp restriction.

##########
File path: 
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
##########
@@ -266,6 +266,7 @@ public void validate(ClientState state) throws 
InvalidRequestException
         checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide 
custom timestamp for counter updates");
         checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide 
custom TTL for counter updates");
         checkFalse(isView(), "Cannot directly modify a materialized view");
+        checkFalse(isVirtual() && attrs.isTimestampSet(), "Custom timestamp is 
not supported by virtual tables");

Review comment:
       This check ensures that custom timestamps are rejected for virtual tables.

##########
File path: doc/source/new/virtualtables.rst
##########
@@ -66,21 +66,21 @@ Virtual Table Limitations
 
 Virtual tables and virtual keyspaces have some limitations initially though 
some of these could change such as:
 
-- Cannot alter or drop virtual keyspaces or tables
-- Cannot truncate virtual tables
 - Expiring columns are not supported by virtual tables
+- Custom timestamps are not supported by virtual tables
 - Conditional updates are not supported by virtual tables
-- Cannot create tables in virtual keyspaces
-- Cannot perform any operations against virtual keyspace
 - Secondary indexes are not supported on virtual tables
-- Cannot create functions in virtual keyspaces
-- Cannot create types in virtual keyspaces
 - Materialized views are not supported on virtual tables
-- Virtual tables don't support ``DELETE`` statements
+- Virtual tables support modifications only if the underlying implementation 
allows it
 - Cannot ``CREATE TRIGGER`` against a virtual table
 - Conditional ``BATCH`` statements cannot include mutations for virtual tables
 - Cannot include a virtual table statement in a logged batch
 - Mutations for virtual and regular tables cannot exist in the same batch
+- Cannot alter or drop virtual keyspaces or tables

Review comment:
       I moved all keyspace-related limitations to the end of the list, so it 
is easier to read.

##########
File path: 
src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.SortedMap;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on 
demand and allows source modification.
+ */
+public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable
+{
+
+    protected AbstractWritableVirtualTable(TableMetadata metadata)
+    {
+        super(metadata);
+    }
+
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        DecoratedKey partitionKey = update.partitionKey();
+
+        if (update.deletionInfo().isLive())
+            update.forEach(row ->
+            {
+                Clustering<?> clusteringColumns = row.clustering();
+
+                if (row.deletion().isLive())
+                    row.forEach(columnMetadata ->
+                    {
+                        if (columnMetadata.column().isComplex())
+                            throw new InvalidRequestException("Complex type 
column deletes are not supported by table " + metadata);
+
+                        Cell<?> cell = (Cell<?>) columnMetadata;
+
+                        if (cell.isTombstone())
+                            applyColumnDelete(partitionKey, clusteringColumns, 
cell);
+                        else
+                            applyColumnUpdate(partitionKey, clusteringColumns, 
cell);
+                    });
+                else
+                    applyRowDelete(partitionKey, clusteringColumns);
+            });
+        else
+        {
+            // MutableDeletionInfo may have partition delete or range 
tombstone list or both
+            if (update.deletionInfo().hasRanges())
+                update.deletionInfo()
+                        .rangeIterator(false)
+                        .forEachRemaining(rt -> 
applyRangeTombstone(partitionKey, rt.deletedSlice()));
+
+            if (!update.deletionInfo().getPartitionDeletion().isLive())
+                applyPartitionDelete(partitionKey);
+        }
+    }
+
+    protected void applyPartitionDelete(DecoratedKey partitionKey)
+    {
+        throw new InvalidRequestException("Partition deletion is not supported 
by table " + metadata);
+    }
+
+    protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice)
+    {
+        throw new InvalidRequestException("Range deletion is not supported by 
table " + metadata);
+    }
+
+    protected void applyRowDelete(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns)
+    {
+        throw new InvalidRequestException("Row deletion is not supported by 
table " + metadata);
+    }
+
+    protected void applyColumnDelete(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns, Cell<?> cell)
+    {
+        throw new InvalidRequestException("Column deletion is not supported by 
table " + metadata);
+    }
+
+    protected abstract void applyColumnUpdate(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns, Cell<?> cell);
+
+    public static abstract class SimpleWritableVirtualTable extends 
AbstractWritableVirtualTable {
+
+        protected SimpleWritableVirtualTable(TableMetadata metadata)
+        {
+            super(metadata);
+        }
+
+        @Override
+        protected void applyPartitionDelete(DecoratedKey partitionKey)
+        {
+            
applyPartitionDelete(extractPartitionKeyColumnValues(partitionKey));
+        }
+
+        protected void applyPartitionDelete(Object[] partitionKeyColumnValues)
+        {
+            throw new InvalidRequestException("Partition deletion is not 
supported by table " + metadata);
+
+        }
+
+        @Override
+        protected void applyRangeTombstone(DecoratedKey partitionKey, Slice 
slice)
+        {
+            ClusteringBound<?> startClusteringColumns = slice.start();
+            Object[] startClusteringColumnValues = 
extractClusteringColumnValues(startClusteringColumns);
+
+            ClusteringBound<?> endClusteringColumns = slice.end();
+            Object[] endClusteringColumnValues = 
extractClusteringColumnValues(endClusteringColumns);
+
+            // It is a prefix of clustering columns that have equal condition. 
For example, if there are two clustering
+            // columns c1 and c2, then it will have c1. In case of a single 
clustering column the prefix is empty.
+            int clusteringColumnsPrefixLength = 
Math.max(startClusteringColumnValues.length, endClusteringColumnValues.length) 
- 1;
+            Object[] clusteringColumnValuesPrefix = new 
Object[clusteringColumnsPrefixLength];
+            System.arraycopy(startClusteringColumnValues, 0, 
clusteringColumnValuesPrefix, 0, clusteringColumnsPrefixLength);
+
+            Object startClusteringColumnValue = 
startClusteringColumns.isBottom()
+                    ? null : 
startClusteringColumnValues[startClusteringColumnValues.length - 1];
+            boolean isStartClusteringColumnInclusive = 
startClusteringColumns.isInclusive();
+
+            Object endClusteringColumnValue = endClusteringColumns.isBottom()
+                    ? null : 
endClusteringColumnValues[endClusteringColumnValues.length - 1];
+            boolean isEndClusteringColumnInclusive = 
endClusteringColumns.isInclusive();
+
+            applyRangeTombstone(extractPartitionKeyColumnValues(partitionKey),
+                    clusteringColumnValuesPrefix,
+                    startClusteringColumnValue,
+                    isStartClusteringColumnInclusive,
+                    endClusteringColumnValue,
+                    isEndClusteringColumnInclusive);
+        }
+
+        /**
+         * This method is called for every range tombstone.
+         *
+         * @param partitionKeyColumnValues is non-empty
+         * @param clusteringColumnValuesPrefix is empty if there is a single 
clustering column
+         * @param startClusteringColumnValue is null if there is no "gt" or 
"gte" condition on the clustering column
+         * @param isStartClusteringColumnInclusive distinguishes "gt" and 
"gte" conditions
+         * @param endClusteringColumnValue is null if there is no "lt" or 
"lte" condition on the clustering column
+         * @param isEndClusteringColumnInclusive distinguishes "lt" and "lte" 
conditions
+         */
+        protected void applyRangeTombstone(Object[] partitionKeyColumnValues,
+                                           Object[] 
clusteringColumnValuesPrefix,
+                                           @Nullable Object 
startClusteringColumnValue,
+                                           boolean 
isStartClusteringColumnInclusive,
+                                           @Nullable Object 
endClusteringColumnValue,
+                                           boolean 
isEndClusteringColumnInclusive)
+        {
+            throw new InvalidRequestException("Range deletion is not supported 
by table " + metadata);
+        }
+
+
+        @Override
+        protected void applyRowDelete(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns)
+        {
+            applyRowDelete(extractPartitionKeyColumnValues(partitionKey), 
extractClusteringColumnValues(clusteringColumns));
+        }
+
+        protected void applyRowDelete(Object[] partitionKeyColumnValues, 
Object[] clusteringColumnValues)
+        {
+            throw new InvalidRequestException("Row deletion is not supported 
by table " + metadata);
+        }
+
+        @Override
+        protected void applyColumnDelete(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns, Cell<?> cell)
+        {
+            applyColumnDelete(extractPartitionKeyColumnValues(partitionKey),
+                    extractClusteringColumnValues(clusteringColumns),
+                    extractColumnName(cell));
+        }
+
+        protected void applyColumnDelete(Object[] partitionKeyColumnValues, 
Object[] clusteringColumnValues, String columnName)
+        {
+            throw new InvalidRequestException("Column deletion is not 
supported by table " + metadata);
+        }
+
+        @Override
+        protected void applyColumnUpdate(DecoratedKey partitionKey, 
ClusteringPrefix<?> clusteringColumns, Cell<?> cell)
+        {
+            applyColumnUpdate(extractPartitionKeyColumnValues(partitionKey),
+                    extractClusteringColumnValues(clusteringColumns),
+                    extractColumnName(cell),
+                    extractColumnValue(cell));
+        }
+
+        protected abstract void applyColumnUpdate(Object[] 
partitionKeyColumnValues, Object[] clusteringColumnValues,
+                                                  String columnName, Object 
columnValue);
+
+        protected <V> void filterStringKeySortedMap(SortedMap<String, V> 
clusteringColumnsMap,

Review comment:
       It is just a utility method that can be re-used in the child classes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to