blambov commented on code in PR #4402:
URL: https://github.com/apache/cassandra/pull/4402#discussion_r2509552334


##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -301,7 +301,7 @@ public boolean apply(SSTableReader sstable)
                     else if (e instanceof CompactionInterruptedException)
                         throw (CompactionInterruptedException) e;
                     else
-                        throw new IllegalStateException(e);
+                        throw new RuntimeException(e);

Review Comment:
   We don't need to catch these exceptions here; `WrappedRunnable.run()` will 
handle them.



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java:
##########
@@ -31,14 +31,12 @@
 import java.util.Set;
 
 abstract class AbstractCompactionPipeline extends CompactionInfo.Holder 
implements AutoCloseable {
-    static AbstractCompactionPipeline create(CompactionTask task, 
OperationType type, AbstractCompactionStrategy.ScannerList scanners, 
AbstractCompactionController controller, long nowInSec, TimeUUID compactionId) {
+    static AbstractCompactionPipeline create(CompactionTask task, 
OperationType type, AbstractCompactionStrategy.ScannerList scanners, 
AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
+    {
         if (DatabaseDescriptor.enableCursorCompaction()) {
-            try {
+            if (CompactionCursor.isSupported(scanners, controller))

Review Comment:
   LGTM



##########
src/java/org/apache/cassandra/db/compaction/CompactionCursor.java:
##########
@@ -0,0 +1,1573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.UnmodifiableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.ElementDescriptor;
+import org.apache.cassandra.io.sstable.PartitionDescriptor;
+import org.apache.cassandra.io.sstable.ReusableLivenessInfo;
+import org.apache.cassandra.io.sstable.SSTableCursorReader;
+import org.apache.cassandra.io.sstable.SSTableCursorWriter;
+import org.apache.cassandra.io.util.ReusableDecoratedKey;
+import org.apache.cassandra.io.util.ReusableLongToken;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static 
org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.COMPARE;
+import static 
org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.LEFT;
+import static 
org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.RIGHT;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_END;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_HEADER_START;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_VALUE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.DONE;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.ELEMENT_END;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_END;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_START;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.ROW_START;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.STATIC_ROW_START;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.TOMBSTONE_START;
+import static 
org.apache.cassandra.io.sstable.SSTableCursorReader.State.isState;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static 
org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static 
org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
+
+/**
+ * Merge multiple iterators over the content of sstable into a "compacted" 
iterator.
+ * <p>
+ * On top of the actual merging the source iterators, this class:
+ * <ul>
+ *   <li>purge gc-able tombstones if possible (see PurgeFunction below).</li>
+ *   <li>invalidate cached partitions that are empty post-compaction. This 
avoids keeping partitions with
+ *       only purgable tombstones in the row cache.</li>
+ *   <li>keep tracks of the compaction progress.</li>
+ *   <li>TODO:update 2ndary indexes if necessary (as we don't 
read-before-write on index updates, index entries are
+ *       not deleted on deletion of the base table data, which is ok because 
we'll fix index inconsistency
+ *       on reads. This however mean that potentially obsolete index entries 
could be kept a long time for
+ *       data that is not read often, so compaction "pro-actively" fix such 
index entries. This is mainly
+ *       an optimization).</li>
+ * </ul>
+ */

Review Comment:
   I would add something along the lines of "unlike the iterator-based compaction 
process, this does not create a view of the compacted sources to be consumed, but 
instead pushes the merged data to a writer immediately", just to highlight the difference.
   
   Both `CursorCompactor` and `CursorCompactorPipeline` sound good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to