blerer commented on a change in pull request #1117: URL: https://github.com/apache/cassandra/pull/1117#discussion_r701206395
########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. 
+ */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> Review comment: The variable name `columnMetadata` is confusing as it is not a `ColumnMetadata` object but a `ColumnData` one. ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable Review comment: I wonder if we should not collapse the hierarchy and merge `AbstractWritableVirtualTable` and `SimpleWritableVirtualTable` together. It would be easy to separate them later on if needed. Having multiple sub-classes that are not used is often a bit confusing and makes things harder to understand for no real reason. The `DataSet` hierarchy suffers from the same problem; in my opinion, there is an `AbstractDataSet` but it has only a single sub-class. What is your opinion? ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) Review comment: For large blocks like that, it is probably less confusing and less error prone to use parentheses. 
########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. 
+ */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) + throw new InvalidRequestException("Complex type column deletes are not supported by table " + metadata); + + Cell<?> cell = (Cell<?>) columnMetadata; + + if (cell.isTombstone()) + applyColumnDelete(partitionKey, clusteringColumns, cell); + else + applyColumnUpdate(partitionKey, clusteringColumns, cell); + }); + else + applyRowDelete(partitionKey, clusteringColumns); + }); + else + { + // MutableDeletionInfo may have partition delete or range tombstone list or both + if (update.deletionInfo().hasRanges()) + update.deletionInfo() + .rangeIterator(false) + .forEachRemaining(rt -> applyRangeTombstone(partitionKey, rt.deletedSlice())); + + if (!update.deletionInfo().getPartitionDeletion().isLive()) + applyPartitionDelete(partitionKey); + } + } + + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + } + + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + throw new InvalidRequestException("Range deletion is not supported by table " + metadata); + } + + protected void applyRowDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns) + { + throw new InvalidRequestException("Row deletion is not supported by table " + metadata); + } + + protected void applyColumnDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + throw new InvalidRequestException("Column 
deletion is not supported by table " + metadata); + } + + protected abstract void applyColumnUpdate(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell); + + public static abstract class SimpleWritableVirtualTable extends AbstractWritableVirtualTable { + + protected SimpleWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + applyPartitionDelete(extractPartitionKeyColumnValues(partitionKey)); + } + + protected void applyPartitionDelete(Object[] partitionKeyColumnValues) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + + } + + @Override + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + ClusteringBound<?> startClusteringColumns = slice.start(); + Object[] startClusteringColumnValues = extractClusteringColumnValues(startClusteringColumns); + + ClusteringBound<?> endClusteringColumns = slice.end(); + Object[] endClusteringColumnValues = extractClusteringColumnValues(endClusteringColumns); + + // It is a prefix of clustering columns that have equal condition. For example, if there are two clustering + // columns c1 and c2, then it will have c1. In case of a single clustering column the prefix is empty. + int clusteringColumnsPrefixLength = Math.max(startClusteringColumnValues.length, endClusteringColumnValues.length) - 1; + Object[] clusteringColumnValuesPrefix = new Object[clusteringColumnsPrefixLength]; + System.arraycopy(startClusteringColumnValues, 0, clusteringColumnValuesPrefix, 0, clusteringColumnsPrefixLength); + + Object startClusteringColumnValue = startClusteringColumns.isBottom() + ? null : startClusteringColumnValues[startClusteringColumnValues.length - 1]; + boolean isStartClusteringColumnInclusive = startClusteringColumns.isInclusive(); + + Object endClusteringColumnValue = endClusteringColumns.isBottom() + ? 
null : endClusteringColumnValues[endClusteringColumnValues.length - 1]; + boolean isEndClusteringColumnInclusive = endClusteringColumns.isInclusive(); + + applyRangeTombstone(extractPartitionKeyColumnValues(partitionKey), + clusteringColumnValuesPrefix, + startClusteringColumnValue, + isStartClusteringColumnInclusive, + endClusteringColumnValue, + isEndClusteringColumnInclusive); Review comment: Rather than having `startClusteringColumnValue`, `isStartClusteringColumnInclusive`, `endClusteringColumnValue` and `isEndClusteringColumnInclusive` would it not be possible to use a `com.google.common.collect.Range` object? I think that it could help to simplify the code. ########## File path: test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java ########## @@ -249,76 +344,188 @@ public void testQueriesOnTableWithMultiplePks() throws Throwable } @Test - public void testModifications() throws Throwable + public void testDMLOperationOnWritableTable() throws Throwable { // check for clean state - assertRows(execute("SELECT * FROM test_virtual_ks.vt2")); + assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2")); // fill the table, test UNLOGGED batch execute("BEGIN UNLOGGED BATCH " + - "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" + - "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" + - "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 1, v2 = 1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 2, v2 = 2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 3, v2 = 3 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 4, v2 = 4 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 5, v2 = 5 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 
'c1_1' AND c2 = 'c2_5';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 6, v2 = 6 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 7, v2 = 7 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 8, v2 = 8 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" + "APPLY BATCH"); assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), - row("pk1", 1), - row("pk2", 2), - row("pk3", 3)); + row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L), + row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L), + row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L)); + + // update a single column with UPDATE + execute("UPDATE test_virtual_ks.vt2 SET v1 = 11 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"), + row("pk1_1", "pk2_1", "c1_1", "c2_1", 11, 1L)); + + // update multiple columns with UPDATE + execute("UPDATE test_virtual_ks.vt2 SET v1 = 111, v2 = 111 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"), + row("pk1_1", "pk2_1", "c1_1", "c2_1", 111, 111L)); + + // update a single columns with INSERT + execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 22)"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 22L)); + + // update multiple columns with INSERT + 
execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 222, 222)"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 222, 222L)); + + // delete a single partition + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L), + row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L), + row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L)); + + // delete a first-level range (one-sided limit) + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 <= 'c1_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L), + row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L)); + + // delete a first-level range (two-sided limit) + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 > 'c1_1' AND c1 < 'c1_3'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L)); + + // delete a second-level range (two-sided limit) + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 >= 'c2_3' AND c2 < 'c2_5'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", 
"pk2_1", "c1_1", "c2_6", 6, 6L)); + + // delete a second-level range (one-sided limit) + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 < 'c2_5'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L)); + + // delete a single row + execute("DELETE FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L)); + + // delete a single column + execute("DELETE v1 FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), + row("pk1_2", "pk2_1", "c1_1", "c2_6", null, 6L)); Review comment: We should also check for deletion using `IN` operators ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable Review comment: I would rename `AbstractWritableVirtualTable` to `AbstractMutableVirtualTable`, I imagine that we might have some virtual tables that only support deletion but not update (it might be the case for the caches virtual tables). ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) + throw new InvalidRequestException("Complex type column deletes are not supported by table " + metadata); + + Cell<?> cell = (Cell<?>) columnMetadata; + + if (cell.isTombstone()) + applyColumnDelete(partitionKey, clusteringColumns, cell); + else + applyColumnUpdate(partitionKey, clusteringColumns, cell); + }); + else + applyRowDelete(partitionKey, clusteringColumns); + }); + else + { + // MutableDeletionInfo may have partition delete or range tombstone list or both + if (update.deletionInfo().hasRanges()) + update.deletionInfo() + .rangeIterator(false) + 
.forEachRemaining(rt -> applyRangeTombstone(partitionKey, rt.deletedSlice())); + + if (!update.deletionInfo().getPartitionDeletion().isLive()) + applyPartitionDelete(partitionKey); + } + } + + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + } + + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + throw new InvalidRequestException("Range deletion is not supported by table " + metadata); + } + + protected void applyRowDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns) + { + throw new InvalidRequestException("Row deletion is not supported by table " + metadata); + } + + protected void applyColumnDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + throw new InvalidRequestException("Column deletion is not supported by table " + metadata); + } + + protected abstract void applyColumnUpdate(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell); + + public static abstract class SimpleWritableVirtualTable extends AbstractWritableVirtualTable { + + protected SimpleWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + applyPartitionDelete(extractPartitionKeyColumnValues(partitionKey)); + } + + protected void applyPartitionDelete(Object[] partitionKeyColumnValues) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + + } + + @Override + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + ClusteringBound<?> startClusteringColumns = slice.start(); + Object[] startClusteringColumnValues = extractClusteringColumnValues(startClusteringColumns); + + ClusteringBound<?> endClusteringColumns = slice.end(); + Object[] endClusteringColumnValues = 
extractClusteringColumnValues(endClusteringColumns); + + // It is a prefix of clustering columns that have equal condition. For example, if there are two clustering + // columns c1 and c2, then it will have c1. In case of a single clustering column the prefix is empty. + int clusteringColumnsPrefixLength = Math.max(startClusteringColumnValues.length, endClusteringColumnValues.length) - 1; + Object[] clusteringColumnValuesPrefix = new Object[clusteringColumnsPrefixLength]; + System.arraycopy(startClusteringColumnValues, 0, clusteringColumnValuesPrefix, 0, clusteringColumnsPrefixLength); + + Object startClusteringColumnValue = startClusteringColumns.isBottom() + ? null : startClusteringColumnValues[startClusteringColumnValues.length - 1]; + boolean isStartClusteringColumnInclusive = startClusteringColumns.isInclusive(); + + Object endClusteringColumnValue = endClusteringColumns.isBottom() + ? null : endClusteringColumnValues[endClusteringColumnValues.length - 1]; + boolean isEndClusteringColumnInclusive = endClusteringColumns.isInclusive(); + + applyRangeTombstone(extractPartitionKeyColumnValues(partitionKey), + clusteringColumnValuesPrefix, + startClusteringColumnValue, + isStartClusteringColumnInclusive, + endClusteringColumnValue, + isEndClusteringColumnInclusive); Review comment: The reasons for extracting the prefix are not fully clear to me. Could you explain your reasoning? ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. 
+ */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) + throw new InvalidRequestException("Complex type column deletes are not supported by table " + metadata); + + Cell<?> cell = (Cell<?>) columnMetadata; + + if (cell.isTombstone()) + applyColumnDelete(partitionKey, clusteringColumns, cell); + else + applyColumnUpdate(partitionKey, clusteringColumns, cell); + }); + else + applyRowDelete(partitionKey, clusteringColumns); + }); + else + { + // MutableDeletionInfo may have partition delete or range tombstone list or both + if (update.deletionInfo().hasRanges()) + update.deletionInfo() + .rangeIterator(false) + .forEachRemaining(rt -> applyRangeTombstone(partitionKey, rt.deletedSlice())); + + if (!update.deletionInfo().getPartitionDeletion().isLive()) + applyPartitionDelete(partitionKey); + } + } + + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + } + + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + throw new InvalidRequestException("Range deletion is not supported by table " + metadata); + } + + protected void applyRowDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns) + { + throw new InvalidRequestException("Row deletion is not supported by table " + metadata); + } + + protected void applyColumnDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + throw new InvalidRequestException("Column 
deletion is not supported by table " + metadata); + } + + protected abstract void applyColumnUpdate(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell); + + public static abstract class SimpleWritableVirtualTable extends AbstractWritableVirtualTable { + + protected SimpleWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + applyPartitionDelete(extractPartitionKeyColumnValues(partitionKey)); + } + + protected void applyPartitionDelete(Object[] partitionKeyColumnValues) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + + } + + @Override + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + ClusteringBound<?> startClusteringColumns = slice.start(); + Object[] startClusteringColumnValues = extractClusteringColumnValues(startClusteringColumns); + + ClusteringBound<?> endClusteringColumns = slice.end(); + Object[] endClusteringColumnValues = extractClusteringColumnValues(endClusteringColumns); + + // It is a prefix of clustering columns that have equal condition. For example, if there are two clustering + // columns c1 and c2, then it will have c1. In case of a single clustering column the prefix is empty. + int clusteringColumnsPrefixLength = Math.max(startClusteringColumnValues.length, endClusteringColumnValues.length) - 1; + Object[] clusteringColumnValuesPrefix = new Object[clusteringColumnsPrefixLength]; + System.arraycopy(startClusteringColumnValues, 0, clusteringColumnValuesPrefix, 0, clusteringColumnsPrefixLength); + + Object startClusteringColumnValue = startClusteringColumns.isBottom() + ? null : startClusteringColumnValues[startClusteringColumnValues.length - 1]; + boolean isStartClusteringColumnInclusive = startClusteringColumns.isInclusive(); + + Object endClusteringColumnValue = endClusteringColumns.isBottom() + ? 
null : endClusteringColumnValues[endClusteringColumnValues.length - 1]; + boolean isEndClusteringColumnInclusive = endClusteringColumns.isInclusive(); + + applyRangeTombstone(extractPartitionKeyColumnValues(partitionKey), + clusteringColumnValuesPrefix, + startClusteringColumnValue, + isStartClusteringColumnInclusive, + endClusteringColumnValue, + isEndClusteringColumnInclusive); Review comment: The reasons for extracting the prefix are not fully clear to me. Could you explain your reasoning? ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) + throw new InvalidRequestException("Complex type column deletes are not supported by table " + metadata); + + Cell<?> cell = (Cell<?>) columnMetadata; + + if (cell.isTombstone()) + applyColumnDelete(partitionKey, clusteringColumns, cell); + else + applyColumnUpdate(partitionKey, clusteringColumns, cell); + }); + else + applyRowDelete(partitionKey, clusteringColumns); + }); + else + { + // MutableDeletionInfo may have partition delete or range tombstone list or both + if (update.deletionInfo().hasRanges()) + update.deletionInfo() + .rangeIterator(false) + .forEachRemaining(rt -> applyRangeTombstone(partitionKey, rt.deletedSlice())); + + if 
(!update.deletionInfo().getPartitionDeletion().isLive()) + applyPartitionDelete(partitionKey); + } + } + + protected void applyPartitionDelete(DecoratedKey partitionKey) Review comment: Rather than the verb `Delete` I would use the noun `Deletion` in the apply method names. ########## File path: test/unit/org/apache/cassandra/cql3/validation/entities/VirtualTableTest.java ########## @@ -249,76 +344,188 @@ public void testQueriesOnTableWithMultiplePks() throws Throwable } @Test - public void testModifications() throws Throwable + public void testDMLOperationOnWritableTable() throws Throwable { // check for clean state - assertRows(execute("SELECT * FROM test_virtual_ks.vt2")); + assertEmpty(execute("SELECT * FROM test_virtual_ks.vt2")); // fill the table, test UNLOGGED batch execute("BEGIN UNLOGGED BATCH " + - "UPDATE test_virtual_ks.vt2 SET value = 1 WHERE key ='pk1';" + - "UPDATE test_virtual_ks.vt2 SET value = 2 WHERE key ='pk2';" + - "UPDATE test_virtual_ks.vt2 SET value = 3 WHERE key ='pk3';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 1, v2 = 1 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 2, v2 = 2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 3, v2 = 3 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 4, v2 = 4 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_3';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 5, v2 = 5 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_5';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 6, v2 = 6 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_6';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 7, v2 = 7 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_1' AND c2 = 'c2_1';" + + "UPDATE test_virtual_ks.vt2 SET v1 = 8, v2 = 8 WHERE pk1 = 'pk1_2' AND pk2 = 'pk2_2' AND c1 = 'c1_2' AND c2 = 'c2_1';" + 
"APPLY BATCH"); assertRows(execute("SELECT * FROM test_virtual_ks.vt2"), - row("pk1", 1), - row("pk2", 2), - row("pk3", 3)); + row("pk1_1", "pk2_1", "c1_1", "c2_1", 1, 1L), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 2L), + row("pk1_2", "pk2_1", "c1_1", "c2_1", 3, 3L), + row("pk1_2", "pk2_1", "c1_1", "c2_3", 4, 4L), + row("pk1_2", "pk2_1", "c1_1", "c2_5", 5, 5L), + row("pk1_2", "pk2_1", "c1_1", "c2_6", 6, 6L), + row("pk1_2", "pk2_2", "c1_1", "c2_1", 7, 7L), + row("pk1_2", "pk2_2", "c1_2", "c2_1", 8, 8L)); + + // update a single column with UPDATE + execute("UPDATE test_virtual_ks.vt2 SET v1 = 11 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"), + row("pk1_1", "pk2_1", "c1_1", "c2_1", 11, 1L)); + + // update multiple columns with UPDATE + execute("UPDATE test_virtual_ks.vt2 SET v1 = 111, v2 = 111 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_1'"), + row("pk1_1", "pk2_1", "c1_1", "c2_1", 111, 111L)); + + // update a single columns with INSERT + execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 22)"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 2, 22L)); + + // update multiple columns with INSERT + execute("INSERT INTO test_virtual_ks.vt2 (pk1, pk2, c1, c2, v1, v2) VALUES ('pk1_1', 'pk2_1', 'c1_1', 'c2_2', 222, 222)"); + assertRows(execute("SELECT * FROM test_virtual_ks.vt2 WHERE pk1 = 'pk1_1' AND pk2 = 'pk2_1' AND c1 = 'c1_1' AND c2 = 'c2_2'"), + row("pk1_1", "pk2_1", "c1_1", "c2_2", 222, 222L)); + Review comment: We should also check for updates using `IN` operators ########## File path: 
src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows fine-grained source modification. 
+ */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) + throw new InvalidRequestException("Complex type column deletes are not supported by table " + metadata); + + Cell<?> cell = (Cell<?>) columnMetadata; + + if (cell.isTombstone()) + applyColumnDelete(partitionKey, clusteringColumns, cell); + else + applyColumnUpdate(partitionKey, clusteringColumns, cell); + }); + else + applyRowDelete(partitionKey, clusteringColumns); + }); + else + { + // MutableDeletionInfo may have partition delete or range tombstone list or both + if (update.deletionInfo().hasRanges()) + update.deletionInfo() + .rangeIterator(false) + .forEachRemaining(rt -> applyRangeTombstone(partitionKey, rt.deletedSlice())); + + if (!update.deletionInfo().getPartitionDeletion().isLive()) + applyPartitionDelete(partitionKey); + } + } + + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + } + + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + throw new InvalidRequestException("Range deletion is not supported by table " + metadata); + } + + protected void applyRowDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns) + { + throw new InvalidRequestException("Row deletion is not supported by table " + metadata); + } + + protected void applyColumnDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + throw new InvalidRequestException("Column 
deletion is not supported by table " + metadata); + } + + protected abstract void applyColumnUpdate(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell); + + public static abstract class SimpleWritableVirtualTable extends AbstractWritableVirtualTable { + + protected SimpleWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + protected void applyPartitionDelete(DecoratedKey partitionKey) + { + applyPartitionDelete(extractPartitionKeyColumnValues(partitionKey)); + } + + protected void applyPartitionDelete(Object[] partitionKeyColumnValues) + { + throw new InvalidRequestException("Partition deletion is not supported by table " + metadata); + + } + + @Override + protected void applyRangeTombstone(DecoratedKey partitionKey, Slice slice) + { + ClusteringBound<?> startClusteringColumns = slice.start(); + Object[] startClusteringColumnValues = extractClusteringColumnValues(startClusteringColumns); + + ClusteringBound<?> endClusteringColumns = slice.end(); + Object[] endClusteringColumnValues = extractClusteringColumnValues(endClusteringColumns); + + // It is a prefix of clustering columns that have equal condition. For example, if there are two clustering + // columns c1 and c2, then it will have c1. In case of a single clustering column the prefix is empty. + int clusteringColumnsPrefixLength = Math.max(startClusteringColumnValues.length, endClusteringColumnValues.length) - 1; + Object[] clusteringColumnValuesPrefix = new Object[clusteringColumnsPrefixLength]; + System.arraycopy(startClusteringColumnValues, 0, clusteringColumnValuesPrefix, 0, clusteringColumnsPrefixLength); + + Object startClusteringColumnValue = startClusteringColumns.isBottom() + ? null : startClusteringColumnValues[startClusteringColumnValues.length - 1]; + boolean isStartClusteringColumnInclusive = startClusteringColumns.isInclusive(); + + Object endClusteringColumnValue = endClusteringColumns.isBottom() + ? 
null : endClusteringColumnValues[endClusteringColumnValues.length - 1]; + boolean isEndClusteringColumnInclusive = endClusteringColumns.isInclusive(); + + applyRangeTombstone(extractPartitionKeyColumnValues(partitionKey), + clusteringColumnValuesPrefix, + startClusteringColumnValue, + isStartClusteringColumnInclusive, + endClusteringColumnValue, + isEndClusteringColumnInclusive); + } + + /** + * This method is called for every range tombstone. + * + * @param partitionKeyColumnValues is non-empty + * @param clusteringColumnValuesPrefix is empty if there is a single clustering column + * @param startClusteringColumnValue is null if there is no "gt" or "gte" condition on the clustering column + * @param isStartClusteringColumnInclusive distinguishes "gt" and "gte" conditions + * @param endClusteringColumnValue is null if there is no "lt" or "lte" condition on the clustering column + * @param isEndClusteringColumnInclusive distinguishes "lt" and "lte" conditions + */ + protected void applyRangeTombstone(Object[] partitionKeyColumnValues, + Object[] clusteringColumnValuesPrefix, + @Nullable Object startClusteringColumnValue, + boolean isStartClusteringColumnInclusive, + @Nullable Object endClusteringColumnValue, + boolean isEndClusteringColumnInclusive) + { + throw new InvalidRequestException("Range deletion is not supported by table " + metadata); + } + + + @Override + protected void applyRowDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns) + { + applyRowDelete(extractPartitionKeyColumnValues(partitionKey), extractClusteringColumnValues(clusteringColumns)); + } + + protected void applyRowDelete(Object[] partitionKeyColumnValues, Object[] clusteringColumnValues) + { + throw new InvalidRequestException("Row deletion is not supported by table " + metadata); + } + + @Override + protected void applyColumnDelete(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + 
applyColumnDelete(extractPartitionKeyColumnValues(partitionKey), + extractClusteringColumnValues(clusteringColumns), + extractColumnName(cell)); + } + + protected void applyColumnDelete(Object[] partitionKeyColumnValues, Object[] clusteringColumnValues, String columnName) + { + throw new InvalidRequestException("Column deletion is not supported by table " + metadata); + } + + @Override + protected void applyColumnUpdate(DecoratedKey partitionKey, ClusteringPrefix<?> clusteringColumns, Cell<?> cell) + { + applyColumnUpdate(extractPartitionKeyColumnValues(partitionKey), + extractClusteringColumnValues(clusteringColumns), + extractColumnName(cell), + extractColumnValue(cell)); + } + + protected abstract void applyColumnUpdate(Object[] partitionKeyColumnValues, Object[] clusteringColumnValues, + String columnName, Object columnValue); + + protected <V> void filterStringKeySortedMap(SortedMap<String, V> clusteringColumnsMap, + @Nullable String startClusteringColumnValue, + boolean isStartClusteringColumnInclusive, + @Nullable String endClusteringColumnValue, + boolean isEndClusteringColumnInclusive) + { + V firstValuePair = startClusteringColumnValue != null + ? 
clusteringColumnsMap.get(startClusteringColumnValue) : null; + + if (startClusteringColumnValue != null && endClusteringColumnValue != null) + { + // remove values for startClusteringColumnValue <= c < endClusteringColumnValue range + clusteringColumnsMap.subMap(startClusteringColumnValue, endClusteringColumnValue).clear(); + } + else if (endClusteringColumnValue == null) + { + // remove values for c <= startClusteringColumnValue range + clusteringColumnsMap.tailMap(startClusteringColumnValue).clear(); + } + else if (startClusteringColumnValue == null) + { + // remove values for endClusteringColumnValue < c range + clusteringColumnsMap.headMap(endClusteringColumnValue).clear(); + } + + // tailMap and subMap are inclusive for start key and we explicitly put start value back if needed + if (firstValuePair != null && !isStartClusteringColumnInclusive) + clusteringColumnsMap.put(startClusteringColumnValue, firstValuePair); + + // headMap and subMap are exclusive for end key and we explicitly remove start value if needed + if (isEndClusteringColumnInclusive) + clusteringColumnsMap.remove(endClusteringColumnValue); + } + + private Object[] extractPartitionKeyColumnValues(DecoratedKey partitionKey) + { + if (metadata.partitionKeyType instanceof CompositeType) + { + ByteBuffer[] partitionKeyColumnBytes = ((CompositeType) metadata.partitionKeyType).split(partitionKey.getKey()); + Object[] partitionKeyColumnValues = new Object[partitionKeyColumnBytes.length]; + for (int i = 0; i < partitionKeyColumnValues.length; i++) + { + partitionKeyColumnValues[i] = metadata.partitionKeyColumns().get(i).type.compose(partitionKeyColumnBytes[i]); + } + return partitionKeyColumnValues; + + } + else + return new Object[]{metadata.partitionKeyType.compose(partitionKey.getKey())}; + } + + private Object[] extractClusteringColumnValues(ClusteringPrefix<?> clusteringColumns) + { + // clusteringColumns.size() may be less than metadata.clusteringColumns().size() since not all clustering + // 
columns have to be always specified + Object[] clusteringColumnValues = new Object[clusteringColumns.size()]; + for (int i = 0; i < clusteringColumnValues.length; i++) + { + clusteringColumnValues[i] = metadata.clusteringColumns().get(i).type.compose(clusteringColumns.bufferAt(i)); + } + return clusteringColumnValues; + } + + private String extractColumnName(Cell<?> cell) + { + return cell.column().name.toCQLString(); + } + + private Object extractColumnValue(Cell<?> cell) + { + return metadata.getColumn(cell.column().name).cellValueType().compose(cell.buffer()); Review comment: It can be replaced by `cell.column().cellValueType().compose(cell.buffer())` if I am not mistaken. ########## File path: src/java/org/apache/cassandra/db/virtual/AbstractWritableVirtualTable.java ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.SortedMap; +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableMetadata; + +/** + * An abstract virtual table implementation that builds the resultset on demand and allows source modification. + */ +public abstract class AbstractWritableVirtualTable extends AbstractVirtualTable +{ + + protected AbstractWritableVirtualTable(TableMetadata metadata) + { + super(metadata); + } + + @Override + public void apply(PartitionUpdate update) + { + DecoratedKey partitionKey = update.partitionKey(); + + if (update.deletionInfo().isLive()) + update.forEach(row -> + { + Clustering<?> clusteringColumns = row.clustering(); + + if (row.deletion().isLive()) + row.forEach(columnMetadata -> + { + if (columnMetadata.column().isComplex()) Review comment: It is not necessarily a delete. It might also be an update. For now I do not think that we need to support it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

