http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/tools/JsonTransformer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java index 364070e,0000000..3deed96 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/JsonTransformer.java +++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java @@@ -1,536 -1,0 +1,556 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.tools; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.ColumnData; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.impl.Indenter; +import org.codehaus.jackson.util.DefaultPrettyPrinter; +import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class JsonTransformer +{ + + private static final Logger logger = LoggerFactory.getLogger(JsonTransformer.class); + + private static final JsonFactory jsonFactory = new JsonFactory(); + + private final JsonGenerator json; + + private final CompactIndenter objectIndenter 
= new CompactIndenter(); + + private final CompactIndenter arrayIndenter = new CompactIndenter(); + + private final CFMetaData metadata; + + private final ISSTableScanner currentScanner; + + private boolean rawTime = false; + + private long currentPosition = 0; + + private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata) + { + this.json = json; + this.metadata = metadata; + this.currentScanner = currentScanner; + this.rawTime = rawTime; + + DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); + prettyPrinter.indentObjectsWith(objectIndenter); + prettyPrinter.indentArraysWith(arrayIndenter); + json.setPrettyPrinter(prettyPrinter); + } + + public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out) + throws IOException + { + try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8"))) + { + JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata); + json.writeStartArray(); + partitions.forEach(transformer::serializePartition); + json.writeEndArray(); + } + } + + public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException + { + try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, "UTF-8"))) + { + JsonTransformer transformer = new JsonTransformer(json, currentScanner, rawTime, metadata); + json.writeStartArray(); + keys.forEach(transformer::serializePartitionKey); + json.writeEndArray(); + } + } + + private void updatePosition() + { + this.currentPosition = currentScanner.getCurrentPosition(); + } + + private void serializePartitionKey(DecoratedKey key) + { + AbstractType<?> keyValidator = metadata.getKeyValidator(); + objectIndenter.setCompact(true); + try + { + arrayIndenter.setCompact(true); + 
json.writeStartArray(); + if (keyValidator instanceof CompositeType) + { + // if a composite type, the partition has multiple keys. + CompositeType compositeType = (CompositeType) keyValidator; + ByteBuffer keyBytes = key.getKey().duplicate(); + // Skip static data if it exists. + if (keyBytes.remaining() >= 2) + { + int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position()); + if ((header & 0xFFFF) == 0xFFFF) + { + ByteBufferUtil.readShortLength(keyBytes); + } + } + + int i = 0; + while (keyBytes.remaining() > 0 && i < compositeType.getComponents().size()) + { + AbstractType<?> colType = compositeType.getComponents().get(i); + + ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(keyBytes); + String colValue = colType.getString(value); + + json.writeString(colValue); + + byte b = keyBytes.get(); + if (b != 0) + { + break; + } + ++i; + } + } + else + { + // if not a composite type, assume a single column partition key. + assert metadata.partitionKeyColumns().size() == 1; + json.writeString(keyValidator.getString(key.getKey())); + } + json.writeEndArray(); + objectIndenter.setCompact(false); + arrayIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure serializing partition key.", e); + } + } + + private void serializePartition(UnfilteredRowIterator partition) + { + String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey()); + try + { + json.writeStartObject(); + + json.writeFieldName("partition"); + json.writeStartObject(); + json.writeFieldName("key"); + serializePartitionKey(partition.partitionKey()); + json.writeNumberField("position", this.currentScanner.getCurrentPosition()); + + if (!partition.partitionLevelDeletion().isLive()) + { + serializeDeletion(partition.partitionLevelDeletion()); + } + else + { + json.writeEndObject(); + json.writeFieldName("rows"); + json.writeStartArray(); + updatePosition(); + if (!partition.staticRow().isEmpty()) + { + serializeRow(partition.staticRow()); 
+ } + Unfiltered unfiltered; + updatePosition(); + while (partition.hasNext()) + { + unfiltered = partition.next(); + if (unfiltered instanceof Row) + { + serializeRow((Row) unfiltered); + } + else if (unfiltered instanceof RangeTombstoneMarker) + { + serializeTombstone((RangeTombstoneMarker) unfiltered); + } + updatePosition(); + } + json.writeEndArray(); + } + + json.writeEndObject(); + } + catch (IOException e) + { + logger.error("Fatal error parsing partition: {}", key, e); + } + } + + private void serializeRow(Row row) + { + try + { + json.writeStartObject(); + String rowType = row.isStatic() ? "static_block" : "row"; + json.writeFieldName("type"); + json.writeString(rowType); + json.writeNumberField("position", this.currentPosition); + + // Only print clustering information for non-static rows. + if (!row.isStatic()) + { + serializeClustering(row.clustering()); + } + + LivenessInfo liveInfo = row.primaryKeyLivenessInfo(); + if (!liveInfo.isEmpty()) + { + objectIndenter.setCompact(false); + json.writeFieldName("liveness_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("tstamp"); + json.writeString(dateString(TimeUnit.MICROSECONDS, liveInfo.timestamp())); + if (liveInfo.isExpiring()) + { + json.writeNumberField("ttl", liveInfo.ttl()); + json.writeFieldName("expires_at"); + json.writeString(dateString(TimeUnit.SECONDS, liveInfo.localExpirationTime())); + json.writeFieldName("expired"); + json.writeBoolean(liveInfo.localExpirationTime() < (System.currentTimeMillis() / 1000)); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + + // If this is a deletion, indicate that, otherwise write cells. 
+ if (!row.deletion().isLive()) + { + serializeDeletion(row.deletion().time()); + } + json.writeFieldName("cells"); + json.writeStartArray(); + for (ColumnData cd : row) + { + serializeColumnData(cd, liveInfo); + } + json.writeEndArray(); + json.writeEndObject(); + } + catch (IOException e) + { + logger.error("Fatal error parsing row.", e); + } + } + + private void serializeTombstone(RangeTombstoneMarker tombstone) + { + try + { + json.writeStartObject(); + json.writeFieldName("type"); + + if (tombstone instanceof RangeTombstoneBoundMarker) + { + json.writeString("range_tombstone_bound"); + RangeTombstoneBoundMarker bm = (RangeTombstoneBoundMarker) tombstone; + serializeBound(bm.clustering(), bm.deletionTime()); + } + else + { + assert tombstone instanceof RangeTombstoneBoundaryMarker; + json.writeString("range_tombstone_boundary"); + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) tombstone; + serializeBound(bm.openBound(false), bm.openDeletionTime(false)); + serializeBound(bm.closeBound(false), bm.closeDeletionTime(false)); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure parsing tombstone.", e); + } + } + + private void serializeBound(RangeTombstone.Bound bound, DeletionTime deletionTime) throws IOException + { + json.writeFieldName(bound.isStart() ? "start" : "end"); + json.writeStartObject(); + json.writeFieldName("type"); + json.writeString(bound.isInclusive() ? 
"inclusive" : "exclusive"); + serializeClustering(bound.clustering()); + serializeDeletion(deletionTime); + json.writeEndObject(); + } + + private void serializeClustering(ClusteringPrefix clustering) throws IOException + { + if (clustering.size() > 0) + { + json.writeFieldName("clustering"); + objectIndenter.setCompact(true); + json.writeStartArray(); + arrayIndenter.setCompact(true); + List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns(); + for (int i = 0; i < clusteringColumns.size(); i++) + { + ColumnDefinition column = clusteringColumns.get(i); + if (i >= clustering.size()) + { + json.writeString("*"); + } + else + { + json.writeString(column.cellValueType().getString(clustering.get(i))); + } + } + json.writeEndArray(); + objectIndenter.setCompact(false); + arrayIndenter.setCompact(false); + } + } + + private void serializeDeletion(DeletionTime deletion) throws IOException + { + json.writeFieldName("deletion_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("marked_deleted"); + json.writeString(dateString(TimeUnit.MICROSECONDS, deletion.markedForDeleteAt())); + json.writeFieldName("local_delete_time"); + json.writeString(dateString(TimeUnit.SECONDS, deletion.localDeletionTime())); + json.writeEndObject(); + objectIndenter.setCompact(false); + } + + private void serializeColumnData(ColumnData cd, LivenessInfo liveInfo) + { + if (cd.column().isSimple()) + { + serializeCell((Cell) cd, liveInfo); + } + else + { + ComplexColumnData complexData = (ComplexColumnData) cd; + if (!complexData.complexDeletion().isLive()) + { + try + { + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("name"); + AbstractType<?> type = cd.column().type; + json.writeString(cd.column().name.toCQLString()); + serializeDeletion(complexData.complexDeletion()); + objectIndenter.setCompact(true); + json.writeEndObject(); + objectIndenter.setCompact(false); + } + catch (IOException e) + { + 
logger.error("Failure parsing ColumnData.", e); + } + } + for (Cell cell : complexData){ + serializeCell(cell, liveInfo); + } + } + } + + private void serializeCell(Cell cell, LivenessInfo liveInfo) + { + try + { + json.writeStartObject(); + objectIndenter.setCompact(true); + json.writeFieldName("name"); + AbstractType<?> type = cell.column().type; + json.writeString(cell.column().name.toCQLString()); + + if (cell.path() != null && cell.path().size() > 0) + { + CollectionType ct = (CollectionType) type; + json.writeFieldName("path"); + arrayIndenter.setCompact(true); + json.writeStartArray(); + for (int i = 0; i < cell.path().size(); i++) + { + json.writeString(ct.nameComparator().getString(cell.path().get(i))); + } + json.writeEndArray(); + arrayIndenter.setCompact(false); + } + if (cell.isTombstone()) + { + json.writeFieldName("deletion_info"); + objectIndenter.setCompact(true); + json.writeStartObject(); + json.writeFieldName("local_delete_time"); + json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime())); + json.writeEndObject(); + objectIndenter.setCompact(false); + } + else + { + json.writeFieldName("value"); + json.writeString(cell.column().cellValueType().getString(cell.value())); + } + if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp()) + { + json.writeFieldName("tstamp"); + json.writeString(dateString(TimeUnit.MICROSECONDS, cell.timestamp())); + } + if (cell.isExpiring() && (liveInfo.isEmpty() || cell.ttl() != liveInfo.ttl())) + { + json.writeFieldName("ttl"); + json.writeNumber(cell.ttl()); + json.writeFieldName("expires_at"); + json.writeString(dateString(TimeUnit.SECONDS, cell.localDeletionTime())); + json.writeFieldName("expired"); + json.writeBoolean(!cell.isLive((int) (System.currentTimeMillis() / 1000))); + } + json.writeEndObject(); + objectIndenter.setCompact(false); + } + catch (IOException e) + { + logger.error("Failure parsing cell.", e); + } + } + + private String dateString(TimeUnit from, long time) + { + long 
secs = from.toSeconds(time); + long offset = Math.floorMod(from.toNanos(time), 1000_000_000L); // nanos per sec + return rawTime? Long.toString(time) : Instant.ofEpochSecond(secs, offset).toString(); + } + + /** + * A specialized {@link Indenter} that enables a 'compact' mode which puts all subsequent json values on the same + * line. This is manipulated via {@link CompactIndenter#setCompact(boolean)} + */ + private static final class CompactIndenter extends NopIndenter + { + + private static final int INDENT_LEVELS = 16; + private final char[] indents; + private final int charsPerLevel; + private final String eol; + private static final String space = " "; + + private boolean compact = false; + + CompactIndenter() + { + this(" ", System.lineSeparator()); + } + + CompactIndenter(String indent, String eol) + { + this.eol = eol; + + charsPerLevel = indent.length(); + + indents = new char[indent.length() * INDENT_LEVELS]; + int offset = 0; + for (int i = 0; i < INDENT_LEVELS; i++) + { + indent.getChars(0, indent.length(), indents, offset); + offset += indent.length(); + } + } + + @Override + public boolean isInline() + { + return false; + } + + /** + * Configures whether or not subsequent json values should be on the same line delimited by string or not. + * + * @param compact + * Whether or not to compact. + */ + public void setCompact(boolean compact) + { + this.compact = compact; + } + + @Override + public void writeIndentation(JsonGenerator jg, int level) + { + try + { + if (!compact) + { + jg.writeRaw(eol); + if (level > 0) + { // should we err on negative values (as there's some flaw?) + level *= charsPerLevel; + while (level > indents.length) + { // unlike to happen but just in case + jg.writeRaw(indents, 0, indents.length); + level -= indents.length; + } + jg.writeRaw(indents, 0, level); + } + } + else + { + jg.writeRaw(space); + } + } + catch (IOException e) + { + e.printStackTrace(); + System.exit(1); + } + } + } - } ++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/OverlapIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/OverlapIterator.java index 131a749,7c1544a..b346a62 --- a/src/java/org/apache/cassandra/utils/OverlapIterator.java +++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java @@@ -1,3 -1,23 +1,23 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * - */ ++*/ package org.apache.cassandra.utils; import java.util.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/RMIServerSocketFactoryImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/SyncUtil.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/SyncUtil.java index b217e29,0d293aa..4c0d89d --- a/src/java/org/apache/cassandra/utils/SyncUtil.java +++ b/src/java/org/apache/cassandra/utils/SyncUtil.java @@@ -1,6 -1,30 +1,26 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ package org.apache.cassandra.utils; -import java.io.FileDescriptor; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.SyncFailedException; +import java.io.*; import java.lang.reflect.Field; import java.nio.MappedByteBuffer; import java.nio.channels.ClosedChannelException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/utils/concurrent/Ref.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java index afca512,0000000..238a58d mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java +++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java @@@ -1,85 -1,0 +1,105 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. 
See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.cql3; + +import org.junit.Test; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; + +import static org.junit.Assert.assertEquals; + +public class IndexQueryPagingTest extends CQLTester +{ + /* + * Some simple tests to verify the behaviour of paging during + * 2i queries. We only use a single index type (CompositesIndexOnRegular) + * as the code we want to exercise here is in their abstract + * base class. + */ + + @Test + public void pagingOnRegularColumn() throws Throwable + { + createTable("CREATE TABLE %s (" + + " k1 int," + + " v1 int," + + "PRIMARY KEY (k1))"); + createIndex("CREATE INDEX ON %s(v1)"); + + int rowCount = 3; + for (int i=0; i<rowCount; i++) + execute("INSERT INTO %s (k1, v1) VALUES (?, ?)", i, 0); + + executePagingQuery("SELECT * FROM %s WHERE v1=0", rowCount); + } + + @Test + public void pagingOnRegularColumnWithPartitionRestriction() throws Throwable + { + createTable("CREATE TABLE %s (" + + " k1 int," + + " c1 int," + + " v1 int," + + "PRIMARY KEY (k1, c1))"); + createIndex("CREATE INDEX ON %s(v1)"); + + int partitions = 3; + int rowCount = 3; + for (int i=0; i<partitions; i++) + for (int j=0; j<rowCount; j++) + execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0); + + executePagingQuery("SELECT * FROM %s WHERE k1=0 AND v1=0", rowCount); + } + + @Test + public void pagingOnRegularColumnWithClusteringRestrictions() throws Throwable + { + createTable("CREATE TABLE %s (" + + " k1 int," + + " c1 int," + + " v1 int," + + "PRIMARY KEY (k1, c1))"); + createIndex("CREATE INDEX ON %s(v1)"); + + int partitions = 3; + int rowCount = 3; + for (int i=0; i<partitions; i++) + for (int j=0; j<rowCount; j++) + execute("INSERT INTO %s (k1, c1, v1) VALUES (?, ?, ?)", i, j, 0); + + executePagingQuery("SELECT * FROM 
%s WHERE k1=0 AND c1>=0 AND c1<=3 AND v1=0", rowCount); + } + + private void executePagingQuery(String cql, int rowCount) + { + // Execute an index query which should return all rows, + // setting the fetch size < than the row count. Assert + // that all rows are returned, so we know that paging + // of the results was involved. + Session session = sessionNet(); + Statement stmt = new SimpleStatement(String.format(cql, KEYSPACE + '.' + currentTable())); + stmt.setFetchSize(rowCount - 1); + assertEquals(rowCount, session.execute(stmt).all().size()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java index 9af6028,0000000..b5d8159 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@@ -1,198 -1,0 +1,218 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; 
+import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +public class SinglePartitionSliceCommandTest +{ + private static final Logger logger = LoggerFactory.getLogger(SinglePartitionSliceCommandTest.class); + + private static final String KEYSPACE = "ks"; + private static final String TABLE = "tbl"; + + private static CFMetaData cfm; + private static ColumnDefinition v; + private static ColumnDefinition s; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + cfm = CFMetaData.Builder.create(KEYSPACE, TABLE) + .addPartitionKey("k", UTF8Type.instance) + .addStaticColumn("s", UTF8Type.instance) + .addClusteringColumn("i", IntegerType.instance) + .addRegularColumn("v", UTF8Type.instance) + .build(); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm); + cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + v = cfm.getColumnDefinition(new ColumnIdentifier("v", true)); + s = cfm.getColumnDefinition(new ColumnIdentifier("s", true)); + } + + @Before + public void truncate() + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).truncateBlocking(); + } + + @Test + public void staticColumnsAreFiltered() throws IOException + { + DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k")); + + UntypedResultSet rows; + + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s, i, v) VALUES ('k', 's', 0, 'v')"); + QueryProcessor.executeInternal("DELETE v FROM ks.tbl WHERE k='k' AND i=0"); + QueryProcessor.executeInternal("DELETE FROM ks.tbl WHERE k='k' AND i=0"); + rows = QueryProcessor.executeInternal("SELECT * FROM ks.tbl WHERE k='k' AND i=0"); + + for (UntypedResultSet.Row row: rows) + { + logger.debug("Current: k={}, s={}, v={}", (row.has("k") ? 
row.getString("k") : null), (row.has("s") ? row.getString("s") : null), (row.has("v") ? row.getString("v") : null)); + } + + assert rows.isEmpty(); + + ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v)); + ByteBuffer zero = ByteBufferUtil.bytes(0); + Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero))); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false); + ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); + + DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21)); + ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21); + DataInputPlus in = new DataInputBuffer(out.buffer(), true); + cmd = ReadCommand.legacyReadCommandSerializer.deserialize(in, MessagingService.VERSION_21); + + logger.debug("ReadCommand: {}", cmd); + UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup()); + ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd); + + logger.debug("creating response: {}", response); + partitionIterator = response.makeIterator(cmd); + assert partitionIterator.hasNext(); + UnfilteredRowIterator partition = partitionIterator.next(); + + LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition); + Assert.assertEquals(Collections.emptyList(), rowIter.cells); + } + + private void checkForS(UnfilteredPartitionIterator pi) + { + Assert.assertTrue(pi.toString(), pi.hasNext()); + UnfilteredRowIterator ri = pi.next(); + Assert.assertTrue(ri.columns().contains(s)); + Row staticRow = ri.staticRow(); + Iterator<Cell> cellIterator = staticRow.cells().iterator(); + 
Assert.assertTrue(staticRow.toString(cfm, true), cellIterator.hasNext()); + Cell cell = cellIterator.next(); + Assert.assertEquals(s, cell.column()); + Assert.assertEquals(ByteBufferUtil.bytesToHex(cell.value()), ByteBufferUtil.bytes("s"), cell.value()); + Assert.assertFalse(cellIterator.hasNext()); + } + + @Test + public void staticColumnsAreReturned() throws IOException + { + DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1")); + + QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')"); + Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty()); + + ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false); + ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); + + // check raw iterator for static cell + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) + { + checkForS(pi); + } + + ReadResponse response; + DataOutputBuffer out; + DataInputPlus in; + ReadResponse dst; + + // check (de)serialized iterator for memtable static cell + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) + { + response = ReadResponse.createDataResponse(pi, cmd); + } + + out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); + ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); + in = new DataInputBuffer(out.buffer(), true); + dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); + try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) + { + checkForS(pi); + } + + // check (de)serialized iterator for sstable 
static cell + Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush(); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) + { + response = ReadResponse.createDataResponse(pi, cmd); + } + out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); + ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); + in = new DataInputBuffer(out.buffer(), true); + dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); + try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) + { + checkForS(pi); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java index b5c2f41,0000000..6a4aace mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java @@@ -1,100 -1,0 +1,120 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.cassandra.db.commitlog;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.Semaphore;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import com.google.common.collect.ImmutableMap;

/**
 * Checks that the commit log stops allocating new segments while syncing is blocked.
 *
 * A Byteman rule makes the commit log service thread acquire a permit from {@link #allowSync}
 * right before it invokes {@code CommitLog.sync}; the semaphore starts with zero permits and
 * this class never releases any, so that call site never gets past the rule during the test.
 */
@RunWith(BMUnitRunner.class)
public class CommitLogSegmentManagerTest
{
    //Block commit log service from syncing
    // Referenced by fully-qualified name from the @BMRule action below; do not rename.
    private static final Semaphore allowSync = new Semaphore(0);

    private static final String KEYSPACE1 = "CommitLogTest";
    private static final String STANDARD1 = "Standard1";
    private static final String STANDARD2 = "Standard2";

    // 256 KiB of random bytes used as the mutation payload (filled in defineSchema()).
    private final static byte[] entropy = new byte[1024 * 256];

    /**
     * Configures an LZ4-compressed, periodically synced commit log with segment size 1 and a
     * 10 * 1000 sync period, then creates the keyspace with two standard tables and disables
     * automatic compaction.
     *
     * @throws ConfigurationException declared for the schema setup calls; this is the Cassandra
     *         exception type, matching what the other schema-defining tests declare
     */
    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        new Random().nextBytes(entropy);
        DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
        DatabaseDescriptor.setCommitLogSegmentSize(1);
        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE1,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));

        CompactionManager.instance.disableAutoCompaction();
    }

    /**
     * Writes 20 large mutations from a background thread while syncing is blocked and asserts
     * that exactly 3 segments are active, both before and after the active segments are recycled.
     */
    @Test
    @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
            targetClass = "AbstractCommitLogService$1",
            targetMethod = "run",
            targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
            action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
    public void testCompressedCommitLogBackpressure() throws Throwable
    {
        CommitLog.instance.resetUnsafe(true);
        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);

        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
                           .clustering("bytes")
                           .add("val", ByteBuffer.wrap(entropy))
                           .build();

        Thread dummyThread = new Thread( () ->
        {
            for (int i = 0; i < 20; i++)
                CommitLog.instance.add(m);
        });
        // The writer is never joined and nothing in this class releases allowSync, so it may
        // stay parked inside CommitLog.add; as a daemon it cannot keep the JVM alive afterwards.
        dummyThread.setDaemon(true);
        dummyThread.start();

        CommitLogSegmentManager clsm = CommitLog.instance.allocator;

        //Protect against delay, but still break out as fast as possible
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start < 5000)
        {
            if (clsm.getActiveSegments().size() >= 3)
                break;
            // Back off briefly instead of hot-spinning, so the polling loop does not compete
            // for CPU with the writer thread it is waiting on.
            Thread.sleep(10);
        }
        Thread.sleep(1000);

        //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
        Assert.assertEquals(3, clsm.getActiveSegments().size());

        clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));

        Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java index 98ad2bc,0000000..400d65a mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java @@@ -1,428 -1,0 +1,448 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.db.Slice.Bound; +import org.apache.cassandra.db.ClusteringPrefix; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.*; + +public class RowAndDeletionMergeIteratorTest +{ + private static final String KEYSPACE1 = "RowTest"; + private static final String CF_STANDARD1 = "Standard1"; + + private int nowInSeconds; + private DecoratedKey dk; + private ColumnFamilyStore cfs; + private CFMetaData cfm; + private ColumnDefinition defA; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1) + .addPartitionKey("key", AsciiType.instance) + .addClusteringColumn("col1", Int32Type.instance) + .addRegularColumn("a", Int32Type.instance) + .build(); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + cfMetadata); + + } + + 
@Before + public void setup() + { + nowInSeconds = FBUtilities.nowInSeconds(); + dk = Util.dk("key0"); + cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + cfm = cfs.metadata; + defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true)); + } + + @Test + public void testWithNoRangeTombstones() + { + Iterator<Row> rowIterator = createRowIterator(); + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, Collections.emptyIterator(), false); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 0); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 1); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 2); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 3); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 4); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithOnlyRangeTombstones() + { + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(rt(1, false, 3, false, timestamp, delTime), + atLeast(4, timestamp, delTime)); + UnfilteredRowIterator iterator = createMergeIterator(Collections.emptyIterator(), rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 1); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_BOUND, 3); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_START_BOUND, 4); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.TOP); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithAtMostRangeTombstone() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = 
createRangeTombstoneIterator(atMost(0, timestamp, delTime)); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.BOTTOM); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 1); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 2); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 3); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 4); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithGreaterThanRangeTombstone() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(greaterThan(2, timestamp, delTime)); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 0); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 1); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.TOP); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithAtMostAndGreaterThanRangeTombstone() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime), + greaterThan(2, timestamp, delTime)); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + 
assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.BOTTOM); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 1); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.TOP); + + assertFalse(iterator.hasNext()); + } + + private void assertRtMarker(Unfiltered unfiltered, ClusteringPrefix.Kind kind, int col1) + { + assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind()); + assertEquals(kind, unfiltered.clustering().kind()); + assertEquals(bb(col1), unfiltered.clustering().get(0)); + } + + @Test + public void testWithIncludingEndExcludingStartMarker() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(2, timestamp, delTime), + greaterThan(2, timestamp, delTime)); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.BOTTOM); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.TOP); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithExcludingEndIncludingStartMarker() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds + 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(lessThan(2, timestamp, delTime), + atLeast(2, timestamp, delTime)); + + 
UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.BOTTOM); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 2); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.TOP); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testNonShadowingTombstone() + { + Iterator<Row> rowIterator = createRowIterator(); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, -1L, 0)); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), Bound.BOTTOM); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 0); + + assertTrue(iterator.hasNext()); + assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 1); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 2); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 3); + + assertTrue(iterator.hasNext()); + assertRow(iterator.next(), 4); + + assertFalse(iterator.hasNext()); + } + + @Test + public void testWithPartitionLevelTombstone() + { + Iterator<Row> rowIterator = createRowIterator(); + + int delTime = nowInSeconds - 1; + long timestamp = toMillis(delTime); + + Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime), + greaterThan(2, timestamp, delTime)); + + int partitionDelTime = nowInSeconds + 1; + long partitionTimestamp = toMillis(partitionDelTime); + + UnfilteredRowIterator iterator = createMergeIterator(rowIterator, + rangeTombstoneIterator, + new DeletionTime(partitionTimestamp, partitionDelTime), + false); + + assertFalse(iterator.hasNext()); + } + + + private void 
assertRtMarker(Unfiltered unfiltered, Bound bound) + { + assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind()); + assertEquals(bound, unfiltered.clustering()); + } + + private void assertRow(Unfiltered unfiltered, int col1) + { + assertEquals(Unfiltered.Kind.ROW, unfiltered.kind()); + assertEquals(cfm.comparator.make(col1), unfiltered.clustering()); + } + + private Iterator<RangeTombstone> createRangeTombstoneIterator(RangeTombstone... tombstones) + { + RangeTombstoneList list = new RangeTombstoneList(cfm.comparator, 10); + + for (RangeTombstone tombstone : tombstones) + list.add(tombstone); + + return list.iterator(Slice.ALL, false); + } + + private Iterator<Row> createRowIterator() + { + PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1); + for (int i = 0; i < 5; i++) + addRow(update, i, i); + + return update.iterator(); + } + + private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed) + { + return createMergeIterator(rows, tombstones, DeletionTime.LIVE, reversed); + } + + private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, + Iterator<RangeTombstone> tombstones, + DeletionTime deletionTime, + boolean reversed) + { + return new RowAndDeletionMergeIterator(cfm, + Util.dk("k"), + deletionTime, + ColumnFilter.all(cfm), + Rows.EMPTY_STATIC_ROW, + reversed, + EncodingStats.NO_STATS, + rows, + tombstones, + true); + } + + private void addRow(PartitionUpdate update, int col1, int a) + { + update.add(BTreeRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0))); + } + + private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp) + { + return BufferCell.live(cfm, columnDefinition, timestamp, ((AbstractType)columnDefinition.cellValueType()).decompose(value)); + } + + private static RangeTombstone atLeast(int start, long tstamp, int delTime) + { + return new 
RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime)); + } + + private static RangeTombstone atMost(int end, long tstamp, int delTime) + { + return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime)); + } + + private static RangeTombstone lessThan(int end, long tstamp, int delTime) + { + return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.exclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime)); + } + + private static RangeTombstone greaterThan(int start, long tstamp, int delTime) + { + return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime)); + } + + private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp, int delTime) + { + Slice.Bound startBound = startInclusive ? Slice.Bound.inclusiveStartOf(bb(start)) : Slice.Bound.exclusiveStartOf(bb(start)); + Slice.Bound endBound = endInclusive ? Slice.Bound.inclusiveEndOf(bb(end)) : Slice.Bound.exclusiveEndOf(bb(end)); + + return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(tstamp, delTime)); + } + + private static ByteBuffer bb(int i) + { + return ByteBufferUtil.bytes(i); + } + + private long toMillis(int timeInSeconds) + { + return timeInSeconds * 1000L; + } +}