blambov commented on code in PR #4402:
URL: https://github.com/apache/cassandra/pull/4402#discussion_r2487015327


##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -1085,7 +1085,20 @@ private void maybeSwitchWriter(CompactionAwareWriter writer)
     }
 
     // SORT AND COMPARE
-    private boolean sortForPartitionMerge() throws IOException
+
+    /**
+     * Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
+     * <ul>
+     *     <li>PARTITION_START - Partition header is loaded in preparation for merge</li>
+     *     <li>beginning of unfiltered/end of partition - header is loaded, list is sorted after this point</li>

Review Comment:
   Could you explain why? Something along the lines of "If the cursor was used in the processing of the previous partition, its state would have advanced to PARTITION_START or DONE. Otherwise, it would remain positioned after the partition header, in one of these states."
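
   For instance, as a sketch only:

   ```java
   // Hypothetical expanded javadoc wording, following the suggestion above:
   /**
    * Sorts the cursors array in preparation for partition merge. If a cursor was used
    * in the processing of the previous partition, its state will have advanced to
    * PARTITION_START or DONE. Otherwise, it remains positioned after the partition
    * header, in one of the states listed below.
    */
   ```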



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -644,6 +645,8 @@ public static class SSTableConfig
    @Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
     public volatile boolean drop_compact_storage_enabled = false;
 
+    public boolean enable_cursor_compaction = ENABLE_CURSOR_COMPACTION.getBoolean();

Review Comment:
   It looks like the `enable_X` style of naming is deprecated. Could you rename this (as well as the getter and system property) to the preferred `cursor_compaction_enabled`?
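
   For reference, a sketch of how the rename could look, mirroring the `@Replaces` pattern used for `drop_compact_storage_enabled` above (`CURSOR_COMPACTION_ENABLED` here stands for the hypothetical renamed system property):

   ```java
   // Sketch only -- exact converter/deprecation handling is up to you:
   @Replaces(oldName = "enable_cursor_compaction", converter = Converters.IDENTITY, deprecated = true)
   public boolean cursor_compaction_enabled = CURSOR_COMPACTION_ENABLED.getBoolean();
   ```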



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -112,6 +112,50 @@
  */
 public class CompactionCursor extends CompactionInfo.Holder
 {
+    public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller)
+    {
+        TableMetadata metadata = controller.cfs.metadata();
+        if (metadata.getTableDirectoryName().contains("system") ||

Review Comment:
   Use `SchemaConstants.isSystemKeyspace(table.keyspace)` or one of the other variants in `SchemaConstants` for this check.
   
   What makes the check necessary? Why isn't the partitioner class check sufficient?
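
   Something like this, as a sketch (assuming the partitioner and index checks stay as they are):

   ```java
   if (SchemaConstants.isSystemKeyspace(metadata.keyspace) ||
       !(metadata.partitioner instanceof Murmur3Partitioner) ||
       metadata.indexes.size() != 0)
   {
       return false;
   }
   ```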



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -0,0 +1,1573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.UnmodifiableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.ElementDescriptor;
+import org.apache.cassandra.io.sstable.PartitionDescriptor;
+import org.apache.cassandra.io.sstable.ReusableLivenessInfo;
+import org.apache.cassandra.io.sstable.SSTableCursorReader;
+import org.apache.cassandra.io.sstable.SSTableCursorWriter;
+import org.apache.cassandra.io.util.ReusableDecoratedKey;
+import org.apache.cassandra.io.util.ReusableLongToken;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.COMPARE;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.LEFT;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.RIGHT;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_HEADER_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_VALUE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.DONE;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ELEMENT_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.STATIC_ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.TOMBSTONE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.isState;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
+
+/**
+ * Merges multiple iterators over the content of sstables into a "compacted" iterator.
+ * <p>
+ * On top of actually merging the source iterators, this class:
+ * <ul>
+ *   <li>purges gc-able tombstones if possible (see PurgeFunction below).</li>
+ *   <li>invalidates cached partitions that are empty post-compaction. This avoids keeping partitions with
+ *       only purgeable tombstones in the row cache.</li>
+ *   <li>keeps track of the compaction progress.</li>
+ *   <li>TODO: update secondary indexes if necessary (as we don't read-before-write on index updates, index entries are
+ *       not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
+ *       on reads. This however means that potentially obsolete index entries could be kept a long time for
+ *       data that is not read often, so compaction "pro-actively" fixes such index entries. This is mainly
+ *       an optimization).</li>
+ * </ul>
+ */

Review Comment:
   Well, the comment isn't quite right. This is not creating a compacted iterator, an iterator of any kind, or even a cursor of any kind.
   
   The class pulls data from multiple cursors and pushes it into a given writer. This is fundamentally different from the way compaction works in other code. Please explain the difference, because it is critical to understanding the class and being able to work on it.
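
   As a starting point, something like the following (wording is only a suggestion, based on my reading):

   ```java
   /**
    * Merges the content of multiple sstable cursors and writes the compacted result
    * directly to a given writer. Unlike CompactionIterator, which exposes a pull-based
    * iterator that its caller advances and drains into a writer, this class drives the
    * merge itself: it pulls raw data from the source cursors and pushes merged
    * partitions/rows/cells straight into the writer.
    */
   ```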



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -112,6 +112,50 @@
  */
 public class CompactionCursor extends CompactionInfo.Holder
 {
+    public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller)
+    {
+        TableMetadata metadata = controller.cfs.metadata();
+        if (metadata.getTableDirectoryName().contains("system") ||
+            !(metadata.partitioner instanceof Murmur3Partitioner) ||
+            metadata.indexes.size() != 0)
+        {
+            return false;
+        }
+
+        for (ColumnMetadata column : metadata.columns())
+        {
+            if (column.isComplex())
+            {
+                return false;
+            }
+            else if (column.isCounterColumn())
+            {
+                return false;
+            }
+        }
+
+        for (ISSTableScanner scanner : scanners.scanners)
+        {
+            // TODO: implement partial range reader
+            if (!scanner.isFullRange())
+                return false;
+
+            for (SSTableReader reader : scanner.getBackingSSTables()) {
+                Version version = reader.descriptor.version;
+                if (!(version.format instanceof BigFormat))

Review Comment:
   The input sstable format and the output format are not the same thing (we can be in the middle of an upgrade). This cursor's restriction is that it can't write the bti format, which we can't check by going through the source sstables -- we need to use `DatabaseDescriptor.getSelectedSSTableFormat()` instead.
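
   I.e., something like this sketch (assuming the restriction is "can only write the BIG format"):

   ```java
   // Check the configured output format rather than the inputs' format:
   if (!(DatabaseDescriptor.getSelectedSSTableFormat() instanceof BigFormat))
       return false;
   ```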



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java:
##########
@@ -31,14 +31,12 @@
 import java.util.Set;
 
 abstract class AbstractCompactionPipeline extends CompactionInfo.Holder implements AutoCloseable {
-    static AbstractCompactionPipeline create(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId) {
+    static AbstractCompactionPipeline create(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
+    {
         if (DatabaseDescriptor.enableCursorCompaction()) {
-            try {
+            if (CompactionCursor.isSupported(scanners, controller))

Review Comment:
   We should still log whether we are doing cursor compaction.
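
   For example (sketch only, assuming a `logger` field is available in this class):

   ```java
   boolean useCursor = CompactionCursor.isSupported(scanners, controller);
   logger.debug("Cursor compaction {} for {}",
                useCursor ? "enabled" : "unsupported, falling back to iterator-based compaction",
                compactionId);
   ```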



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -1085,7 +1085,20 @@ private void maybeSwitchWriter(CompactionAwareWriter writer)
     }
 
     // SORT AND COMPARE
-    private boolean sortForPartitionMerge() throws IOException
+
+    /**
+     * Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
+     * <ul>
+     *     <li>PARTITION_START - Partition header is loaded in preparation for merge</li>

Review Comment:
   Shouldn't this be "needs to be loaded"?



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -1085,7 +1085,20 @@ private void maybeSwitchWriter(CompactionAwareWriter writer)
     }
 
     // SORT AND COMPARE
-    private boolean sortForPartitionMerge() throws IOException
+
+    /**
+     * Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
+     * <ul>
+     *     <li>PARTITION_START - Partition header is loaded in preparation for merge</li>
+     *     <li>beginning of unfiltered/end of partition - header is loaded, list is sorted after this point</li>
+     *     <li>DONE - need to be reset</li>

Review Comment:
   It is not clear at all why the cursor "need to be reset". Also, we will keep resetting finished cursors again and again if e.g. only one scanner remains.
   
   Could you explain both points?
   
   Or maybe add an intermediate state that this method advances to DONE? Alternatively, only recognize the end of the file in the PARTITION_START processing, which we may need to do anyway to handle the state where a file is immediately exhausted (which is more likely to happen for partial scanners)?
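
   To illustrate the intermediate-state idea (all names here are invented, not existing `SSTableCursorReader.State` values):

   ```java
   // EXHAUSTED: input fully consumed while processing the previous partition,
   //            but not yet retired from the merge. sortForPartitionMerge()
   //            would advance EXHAUSTED cursors to DONE exactly once, instead
   //            of resetting finished cursors on every call.
   enum MergePhase { PARTITION_START, AFTER_HEADER, EXHAUSTED, DONE }
   ```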



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -287,16 +294,28 @@ public boolean apply(SSTableReader sstable)
                     timeSpentWritingKeys = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);
 
                     // point of no return
-                    newSStables = writer.finish();
+                    newSStables = finish(ci);
+                }
+                catch (Exception e)
+                {
+                    if (e instanceof IOException)
+                        throw (IOException) e;
+                    else if (e instanceof CompactionInterruptedException)
+                        throw (CompactionInterruptedException) e;
+                    else
+                        throw new IllegalStateException(e);

Review Comment:
   I'd prefer not to change the existing behaviour (which wraps these into `RuntimeException` instead of `IllegalStateException`), as I don't know what may be relying on this wrapping (including customer tools).
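
   I.e., keep the current wrapping (sketch):

   ```java
   catch (Exception e)
   {
       if (e instanceof IOException)
           throw (IOException) e;
       else if (e instanceof CompactionInterruptedException)
           throw (CompactionInterruptedException) e;
       else
           throw new RuntimeException(e); // preserve existing RuntimeException wrapping
   }
   ```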



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -0,0 +1,1573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.UnmodifiableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.ElementDescriptor;
+import org.apache.cassandra.io.sstable.PartitionDescriptor;
+import org.apache.cassandra.io.sstable.ReusableLivenessInfo;
+import org.apache.cassandra.io.sstable.SSTableCursorReader;
+import org.apache.cassandra.io.sstable.SSTableCursorWriter;
+import org.apache.cassandra.io.util.ReusableDecoratedKey;
+import org.apache.cassandra.io.util.ReusableLongToken;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.COMPARE;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.LEFT;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.RIGHT;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_HEADER_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_VALUE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.DONE;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ELEMENT_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.STATIC_ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.TOMBSTONE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.isState;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
+
+/**
+ * Merges multiple iterators over the content of sstables into a "compacted" iterator.
+ * <p>
+ * On top of actually merging the source iterators, this class:
+ * <ul>
+ *   <li>purges gc-able tombstones if possible (see PurgeFunction below).</li>
+ *   <li>invalidates cached partitions that are empty post-compaction. This avoids keeping partitions with
+ *       only purgeable tombstones in the row cache.</li>
+ *   <li>keeps track of the compaction progress.</li>
+ *   <li>TODO: update secondary indexes if necessary (as we don't read-before-write on index updates, index entries are
+ *       not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
+ *       on reads. This however means that potentially obsolete index entries could be kept a long time for
+ *       data that is not read often, so compaction "pro-actively" fixes such index entries. This is mainly
+ *       an optimization).</li>
+ * </ul>
+ */
+public class CompactionCursor extends CompactionInfo.Holder
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CompactionCursor.class.getName());
+
+    private final OperationType type;
+    private final AbstractCompactionController controller;
+    private final ActiveCompactionsTracker activeCompactions;
+    private final ImmutableSet<SSTableReader> sstables;
+    private final long nowInSec;
+    private final TimeUUID compactionId;
+    private final long totalInputBytes;
+    private final StatefulCursor[] sstableCursors;
+    private final boolean[] sstableCursorsEqualsNext;
+    private final boolean hasStaticColumns;
+    private final boolean enforceStrictLiveness;
+
+    // Keep targetDirectory for compactions, needed for `nodetool compactionstats`
+    private volatile String targetDirectory;
+
+    private SSTableCursorWriter ssTableCursorWriter;
+    private boolean finished = false;
+
+    /*
+     * counters for merged partitions/rows/cells.
+     * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
+     * index 1 is counter for 2 rows merged, and so on.
+     */
+    private final long[] partitionMergeCounters;
+    private final long[] staticRowMergeCounters;
+    private final long[] rowMergeCounters;
+    private final long[] rangeTombstonesMergeCounters;
+    private final long[] cellMergeCounters;
+
+    // Progress accounting
+    private long totalBytesRead = 0;
+    private long totalSourceCQLRows;
+    private long totalDataBytesWritten;
+
+    // state
+    final Purger purger;
+
+    private ReusableDecoratedKey prevKey = null;
+    // Partition state. Writes can be delayed if the deletion is purged, or live and partition is empty -> LIVE deletion.
+    ReusableDecoratedKey partitionKey;
+    PartitionDescriptor partitionDescriptor;
+    DeletionTime partitionDeletion;
+    // This will be 0 if we haven't written partition header.
+    int partitionHeaderLength = 0;
+    private CompactionAwareWriter compactionAwareWriter;
+
+    public CompactionCursor(OperationType type, List<ISSTableScanner> scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
+    {
+        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, null);
+    }
+
+    public CompactionCursor(OperationType type,
+                            List<ISSTableScanner> scanners,
+                            AbstractCompactionController controller,
+                            long nowInSec,
+                            TimeUUID compactionId,
+                            ActiveCompactionsTracker activeCompactions,
+                            TopPartitionTracker.Collector topPartitionCollector)
+    {
+        this.controller = controller;
+        this.type = type;
+        this.nowInSec = nowInSec;
+        this.compactionId = compactionId;
+
+        long inputBytes = 0;
+        for (ISSTableScanner scanner : scanners)
+            inputBytes += scanner.getLengthInBytes();
+        this.totalInputBytes = inputBytes;
+        this.partitionMergeCounters = new long[scanners.size()];
+        this.staticRowMergeCounters = new long[partitionMergeCounters.length];
+        this.rowMergeCounters = new long[partitionMergeCounters.length];
+        this.rangeTombstonesMergeCounters = new long[partitionMergeCounters.length];
+        this.cellMergeCounters = new long[partitionMergeCounters.length];
+        // note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before
+        // calling that to avoid an NPE.
+        this.sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
+        this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
+        this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions
+
+        TableMetadata metadata = metadata();
+        if (!(metadata.partitioner instanceof Murmur3Partitioner))
+            throw new IllegalArgumentException("SSTableReader is not a murmur3 partitioner:" + metadata.partitioner.getClass().getCanonicalName() + " cursor compactions are only supported for Murmur3Partitioner.");
+
+        if (metadata.indexes.size() != 0)
+            throw new IllegalArgumentException("SAI is not supported for cursor compactions: " + metadata.indexes + ".");
+
+        for (ColumnMetadata column : metadata.columns()) {
+            if (column.isComplex()) {
+                throw new IllegalArgumentException("Complex column: " + column + " cursor compactions are not supported for complex types.");
+            }
+            else if (column.isCounterColumn()) {
+                throw new IllegalArgumentException("Counter column: " + column + " cursor compactions are not supported for counter types.");
+            }
+        }
+
+        this.hasStaticColumns = metadata.hasStaticColumns();
+        /**
+         * Pipeline should end up being:
+         * [MERGED -> ?TopPartitionTracker -> GarbageSkipper -> Purger -> DuplicateRowChecker -> Abortable] -> next()
+         * V - Merge - This is drawing on code all over the place to iterate through the data and merge partitions/rows/cells
+         * * Transactions, applied to above iterator:
+         *   X - TODO: We can leave for now? - {@link TopPartitionTracker.TombstoneCounter} - Hooked into CFS metadata, tracks tombstone counts per pk.
+         *   X - TODO: We can leave for now? - {@link CompactionIterator.GarbageSkipper} - filters out, or "skips" data shadowed by the provided "tombstone source".
+         * V  * {@link CompactionIterator.Purger} - filters out, or "purges" gc-able tombstones. Also updates bytes read on every row % 100.
+         *   X - TODO: We can leave for now? - {@link DuplicateRowChecker} - reports duplicate rows across replicas.
+         *   X - TODO: We can leave for now? - Abortable - aborts the compaction if the user has requested it (at a certain granularity).
+         * {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker, TopPartitionTracker.Collector)}
+         */
+
+        // validation
+        // TODO: these can be views on range limited readers, not necessarily full readers.
+        for (ISSTableScanner scanner : scanners)
+        {
+            if (!scanner.isFullRange()) throw new IllegalArgumentException("Scanner is not full range:" + scanner + " cursor compactions are not supported");
+        }
+
+        // TODO: Implement CompactionIterator.GarbageSkipper like functionality
+        if (controller.tombstoneOption != CompactionParams.TombstoneOption.NONE)
+            throw new IllegalArgumentException("tombstoneOption: " + controller.tombstoneOption + " cursor compactions are not supported");
+
+        // Convert Readers to Cursors
+        this.sstableCursors = new StatefulCursor[sstables.size()];
+        this.sstableCursorsEqualsNext = new boolean[sstables.size()];
+        UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
+        for (int i = 0; i < this.sstableCursors.length; i++)
+        {
+            SSTableReader ssTableReader = iterator.next();
+            if (ssTableReader.getFilename().contains("system"))
+                throw new IllegalArgumentException("Cursor compactions on system tables are not supported.");
+            Version version = ssTableReader.descriptor.version;
+            if (!(version.format instanceof BigFormat))
+                throw new IllegalArgumentException("Cursor compactions only supported on BIG format: " + version);
+            if (!version.isLatestVersion())
+                throw new IllegalArgumentException("Cursor compactions only supported on latest version: " + version);
+
+            this.sstableCursors[i] = new StatefulCursor(ssTableReader);
+        }
+        this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();
+
+        purger = new Purger(type, controller, nowInSec);
+    }
+
+    /**
+     * @return false if finished, true if partition is written (which might require multiple partition reads)
+     */
+    public boolean writeNextPartition(CompactionAwareWriter compactionAwareWriter) throws IOException {
+        while (!finished) {
+            if (tryWriteNextPartition(compactionAwareWriter)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return true if a partition was written
+     */
+    private boolean tryWriteNextPartition(CompactionAwareWriter compactionAwareWriter) throws IOException
+    {
+        if (sortForPartitionMerge())
+        {
+            finish();
+            return false;
+        }
+
+        // Top reader is on the current key/header
+        partitionDescriptor = sstableCursors[0].pHeader;
+        partitionKey = sstableCursors[0].currentDecoratedKey;
+
+        // possibly reached boundary of the current writer
+        try
+        {
+            // TODO: Potentially redundant validation... Can be done on the writer level?
+            if (prevKey != null && prevKey.compareTo(partitionKey) >= 0)
+                throw new RuntimeException(String.format("Last written key %s >= current key %s", prevKey, partitionKey));
+            // NOTE: We now have prevKey == partitionKey, and sstableCursors[0].currentDecoratedKey == prevKey. Which is confusing in a debugger.
+            retainPrevKeyForValidation();
+
+            int partitionMergeLimit = findPartitionMergeLimit();
+            // needed if we actually write a partition, not used otherwise
+            this.compactionAwareWriter = compactionAwareWriter;
+
+            purger.resetOnNewPartition(partitionKey);
+            boolean written = mergePartitions(partitionMergeLimit);
+            if (!written)
+                purger.onEmptyPartitionPostPurge();
+            return written;
+        }
+        finally
+        {
+            partitionKey = null;
+            partitionDescriptor = null;
+            partitionHeaderLength = 0;
+        }
+    }
+
+
+    /**
+     * See {@link UnfilteredPartitionIterators#merge(List, UnfilteredPartitionIterators.MergeListener)}
+     */
+    private boolean mergePartitions(int partitionMergeLimit) throws IOException
+    {
+        partitionMergeCounters[partitionMergeLimit - 1]++;
+        // p-key is the same for all the merged
+        DeletionTime effectivePartitionDeletion;
+
+        // Pick "max" pDeletion
+        if (partitionMergeLimit > 1)
+        {
+            /** {@link UnfilteredRowIterators.UnfilteredRowMergeIterator#collectPartitionLevelDeletion(List, UnfilteredRowIterators.MergeListener)}*/
+            effectivePartitionDeletion = sstableCursors[0].pHeader.deletionTime();
+            for (int i = 1; i < partitionMergeLimit; i++)
+            {
+                DeletionTime otherDeletionTime = sstableCursors[i].pHeader.deletionTime();
+                if (!effectivePartitionDeletion.supersedes(otherDeletionTime))
+                    effectivePartitionDeletion = otherDeletionTime;
+            }
+        }
+        else
+        {
+            effectivePartitionDeletion = partitionDescriptor.deletionTime();
+        }
+        partitionDeletion = effectivePartitionDeletion;
+        // maybe purge?
+        if (!effectivePartitionDeletion.isLive())
+        {
+            boolean shouldPurge = purger.shouldPurge(effectivePartitionDeletion);
+            if (!shouldPurge)
+            {
+                maybeSwitchWriter(compactionAwareWriter);
+                partitionHeaderLength = ssTableCursorWriter.writePartitionStart(partitionDescriptor.keyBytes(), partitionDescriptor.keyLength(), effectivePartitionDeletion);
+            }
+            else {
+                effectivePartitionDeletion = DeletionTime.LIVE;
+            }
+        }
+
+        // Merge any common static rows
+        DeletionTime partitionDeletion = this.partitionDeletion;
+        if (hasStaticColumns)
+        {
+            sortForStaticRow(partitionMergeLimit);
+            // move cursors that need to move past the row header
+            int staticRowMergeLimit = findStaticRowMergeLimit(partitionMergeLimit);
+
+            mergeRows(staticRowMergeLimit, partitionDeletion, true, false);
+            if (isPartitionStarted())
+                partitionHeaderLength = (int) (ssTableCursorWriter.getPosition() - ssTableCursorWriter.getPartitionStart());
+        }
+
+        // Merge any common normal rows
+        int elementMergeLimit = partitionMergeLimit;
+        DeletionTime activeDeletion = partitionDeletion;
+        boolean isFirstElement = true;
+        int elementCount = 0;
+        ElementDescriptor lastClustering = null;
+        while (true)
+        {
+            // move cursors that need to move past the row header
+            prepareCursorsForNextElement(elementMergeLimit);
+
+            // Sort rows by their clustering
+            sortForElementMerge(elementMergeLimit, partitionMergeLimit);
+            int readerState = sstableCursors[0].state();
+            if (readerState == PARTITION_END)
+                break;
+
+            // At least one partition has not ended
+            // TODO: implement sort to keep this info around and avoid re-comparisons
+            elementMergeLimit = findElementMergeLimit(partitionMergeLimit);
+            int flags = sstableCursors[0].rHeader.flags();
+            if (UnfilteredSerializer.isRow(flags))
+            {
+                if (mergeRows(elementMergeLimit, activeDeletion, false, isFirstElement))
+                {
+                    isFirstElement = false;
+                    elementCount++;
+                    lastClustering = sstableCursors[0].rHeader;
+                }
+            }
+            else if (UnfilteredSerializer.isTombstoneMarker(flags)) {
+                // the tombstone processing *maybe* writes a marker, and *maybe* changes the `activeOpenRangeDeletion`
+                if (mergeRangeTombstones(elementMergeLimit, partitionDeletion, isFirstElement))
+                {
+                    isFirstElement = false;
+                    elementCount++;
+                    lastClustering = sstableCursors[0].rHeader;
+                }
+                if (activeOpenRangeDeletion == DeletionTime.LIVE) {
+                    activeDeletion = partitionDeletion;
+                }
+                else {
+                    activeDeletion = activeOpenRangeDeletion;
+                }
+            }
+            else {
+                throw new IllegalStateException("Unexpected element type (not row or tombstone):" + flags);
+            }
+            // move along
+            continueReadingAfterMerge(elementMergeLimit, ELEMENT_END);
+        }
+
+        boolean partitionWritten = isPartitionStarted();

Review Comment:
   After reading more of the code I realize this cannot really be done. Please ignore the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

