netudima commented on code in PR #4402:
URL: https://github.com/apache/cassandra/pull/4402#discussion_r2589135717


##########
src/java/org/apache/cassandra/io/sstable/ReusableLivenessInfo.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.rows.AbstractCell;
+
+// TODO: flatten into headers
+public class ReusableLivenessInfo implements LivenessInfo

Review Comment:
   Is it intentional that this is placed in the sstable package rather than near/within the LivenessInfo interface (like we have for DeletionTime)?
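   For comparison, a rough sketch of the DeletionTime-style nesting (illustrative only; the field set and method names here are placeholders, not the PR's actual members):

```java
// Illustrative only: the reusable variant nested inside the interface it implements,
// mirroring how DeletionTime.ReusableDeletionTime is organised.
public interface LivenessInfo
{
    long timestamp();
    int ttl();

    // Mutable implementation that can be reset and reused across rows,
    // avoiding per-row allocations during deserialization.
    class ReusableLivenessInfo implements LivenessInfo
    {
        private long timestamp;
        private int ttl;

        public ReusableLivenessInfo reset(long timestamp, int ttl)
        {
            this.timestamp = timestamp;
            this.ttl = ttl;
            return this;
        }

        @Override public long timestamp() { return timestamp; }
        @Override public int ttl() { return ttl; }
    }
}
```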



##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -256,27 +260,27 @@ public boolean apply(SSTableReader sstable)
                 long lastBytesScanned = 0;
 
                 activeCompactions.beginCompaction(ci);
-                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
+                try (AutoCloseable resource = getCompactionAwareWriter(actuallyCompact, ci))
                 {
                     // Note that we need to re-check this flag after calling beginCompaction above to avoid a window
                     // where the compaction does not exist in activeCompactions but the CSM gets paused.
                     // We already have the sstables marked compacting here so CompactionManager#waitForCessation will
                     // block until the below exception is thrown and the transaction is cancelled.
                     if (!controller.cfs.getCompactionStrategyManager().isActive())
                         throw new CompactionInterruptedException(ci.getCompactionInfo());
-                    estimatedKeys = writer.estimatedKeys();
-                    while (ci.hasNext())
+                    estimatedKeys = ci.estimatedKeys();
+                    while (ci.processNextPartitionKey())
                     {
-                        if (writer.append(ci.next()))
-                            totalKeysWritten++;
-
-                        ci.setTargetDirectory(writer.getSStableDirectory().path());
-                        long bytesScanned = scanners.getTotalBytesScanned();
+                        long bytesScanned = ci.getTotalBytesScanned();
 
-                        // Rate limit the scanners, and account for compression
-                        CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+                        // If we ingested less than a MB, keep going
+                        if (bytesScanned - lastBytesScanned > MEGABYTE)
+                        {
+                            // Rate limit the scanners, and account for compression
+                            CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);

Review Comment:
   Should we report the remaining portion once the loop completes, so that org.apache.cassandra.metrics.CompactionMetrics#bytesCompactedThroughput accurately reflects all scanned bytes?
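   Something along these lines after the loop, perhaps (a sketch only, reusing the surrounding local variables; it assumes compactionRateLimiterAcquire is also the call that feeds the throughput metric):

```java
// Sketch: after the while loop, account for the tail that never crossed the
// MEGABYTE threshold, so all scanned bytes end up in the throughput metric.
long bytesScanned = ci.getTotalBytesScanned();
if (bytesScanned > lastBytesScanned)
{
    CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
    lastBytesScanned = bytesScanned;
}
```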



##########
test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java:
##########
@@ -403,6 +403,7 @@ private String cut(String s, int n)
         return s.substring(0, s.length() - n);
     }
 
+    // TODO: Remove the @Ignore on this test and fix it

Review Comment:
   Is this comment still accurate?



##########
src/java/org/apache/cassandra/io/sstable/UnfilteredDescriptor.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime.ReusableDeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class UnfilteredDescriptor extends ClusteringDescriptor
+{
+    private final ReusableLivenessInfo rowLivenessInfo = new ReusableLivenessInfo();
+    private final ReusableDeletionTime deletionTime = ReusableDeletionTime.live();
+    private final ReusableDeletionTime deletionTime2 = ReusableDeletionTime.live();
+
+    private long position;
+    private int flags;
+    private int extendedFlags;
+
+    private long unfilteredSize;
+    private long unfilteredDataStart;
+//    private long prevUnfilteredSize;

Review Comment:
   commented-out code



##########
test/unit/org/apache/cassandra/tools/OfflineToolUtils.java:
##########
@@ -79,6 +79,8 @@ public abstract class OfflineToolUtils
     "Attach Listener", // spawned in intellij IDEA
     "JNA Cleaner",     // spawned by JNA
     "ThreadLocalMetrics-Cleaner", // spawned by 
org.apache.cassandra.metrics.ThreadLocalMetrics
+    "Native reference cleanup thread",
+    "^ForkJoinPool\\.commonPool-worker-\\d+$"

Review Comment:
   Do we need this change? (I haven't noticed any fork-join/parallel-streams related logic in the changes.)



##########
src/java/org/apache/cassandra/db/marshal/NativeAccessor.java:
##########
@@ -149,7 +149,7 @@ else if (accessorR == ByteBufferAccessor.instance)
             int leftSize = left.nativeDataSize();
             int rightSize = rightNative.nativeDataSize();
             return FastByteOperations.compareMemoryUnsigned(left.getAddress(), leftSize, rightNative.getAddress(), rightSize);
-        } else // just in case of new implementations of ValueAccessor appear
+        }else // just in case of new implementations of ValueAccessor appear

Review Comment:
   nit: unnecessary whitespace change



##########
src/java/org/apache/cassandra/io/sstable/format/big/BigFormatPartitionWriter.java:
##########
@@ -62,8 +63,10 @@ public class BigFormatPartitionWriter extends SortedTablePartitionWriter
 
     private final ISerializer<IndexInfo> idxSerializer;
 
-    private final int cacheSizeThreshold;
-    private final int indexSize;
+    /** Beyond this limit we switch from storing IndexInfo in the list to directly serializing them into a buffer */
+    private final int switchIndexInfoToBufferThreshold;
+    /** If a pratition grows beyond this size we store inter-partition index data in IndexInfo */

Review Comment:
   nit, typo: "pratition" should be "partition"



##########
src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.TimeUUID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+abstract class AbstractCompactionPipeline extends CompactionInfo.Holder implements AutoCloseable {
+    static AbstractCompactionPipeline create(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)

Review Comment:
   nit: this line is too long; better to split it across several lines.
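   For example (same declaration, just wrapped; purely illustrative formatting):

```java
    static AbstractCompactionPipeline create(CompactionTask task,
                                             OperationType type,
                                             AbstractCompactionStrategy.ScannerList scanners,
                                             AbstractCompactionController controller,
                                             long nowInSec,
                                             TimeUUID compactionId)
```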



##########
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java:
##########
@@ -338,7 +338,8 @@ public void testMutateLevel() throws Exception
         assertEquals(cfs.getLiveSSTables().size(), levels[6]);
     }
 
-    @Test
+    // TODO: Uncomment and fix

Review Comment:
   an existing test is disabled



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

