smiklosovic commented on code in PR #3412:
URL: https://github.com/apache/cassandra/pull/3412#discussion_r1765781163


##########
src/java/org/apache/cassandra/db/virtual/model/StorageOperationsRow.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual.model;
+
+import java.util.Map;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static java.lang.String.format;
+
+/**
+ * Representation of operation for a {@link org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter}.
+ */
+public class StorageOperationsRow
+{
+    private static final String UNDEFINED = "undefined";
+    private final Map.Entry<TimeUUID, CompactionManager.OperationInfo> entry;
+
+    public StorageOperationsRow(Map.Entry<TimeUUID, CompactionManager.OperationInfo> entry)
+    {
+        this.entry = entry;
+    }
+
+    @Column(type = Column.Type.PARTITION_KEY)
+    public String operationId()
+    {
+        return entry.getKey().toString();
+    }
+
+    @Column
+    public String operationType()
+    {
+        return entry.getValue().operationType.toString();
+    }
+
+    @Column
+    public String keyspaces()
+    {
+        return String.join(", ", entry.getValue().keyspaceNames);
+    }
+
+    @Column
+    public String tables()
+    {
+        return entry.getValue().processedCfs().keySet().toString();
+    }
+
+    @Column
+    public String operationResult()
+    {
+        CompactionManager.AllSSTableOpStatus status = entry.getValue().result();
+        return status == null ? UNDEFINED : status.toString();
+    }
+
+    @Column
+    public String operationResultByTable()
+    {
+        return formatMap(entry.getValue().processedCfs());
+    }
+
+    @Column
+    public String processedByKeyspace()
+    {
+        return formatMap(entry.getValue().processedSStables());
+    }
+
+    private static String formatMap(Map<?, ?> map)
+    {
+        return format("[%s]", map.entrySet().stream()

Review Comment:
   would you mind rewriting this using for loops?
   
   https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36
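A minimal sketch of what a loop-based `formatMap` might look like. The quoted stream pipeline is cut off, so the `key=value` rendering and the `", "` separator are assumptions; `FormatMapSketch` is a hypothetical holder class.

```java
import java.util.Map;
import java.util.StringJoiner;

final class FormatMapSketch
{
    private FormatMapSketch()
    {
    }

    // Assumed rendering: "[k1=v1, k2=v2]"; adjust to whatever the stream version produced.
    static String formatMap(Map<?, ?> map)
    {
        StringJoiner joiner = new StringJoiner(", ", "[", "]");
        for (Map.Entry<?, ?> entry : map.entrySet())
            joiner.add(entry.getKey() + "=" + entry.getValue());
        // An empty map yields "[]" because the joiner still emits its prefix and suffix.
        return joiner.toString();
    }
}
```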



##########
src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java:
##########
@@ -45,6 +50,12 @@ private SystemViewsKeyspace()
                                                               new ThreadPoolRowWalker(),
                                                               Metrics.allThreadPoolMetrics(),
                                                               ThreadPoolRow::new))
+                    .add(CollectionVirtualTableAdapter.create(VIRTUAL_VIEWS,
+                                                              STORAGE_OPERATIONS,
+                                                              "Storage operations currently running or completed for the last 7 days",

Review Comment:
   you hardcoded 7 days here, which does not reflect whatever retention a user might configure
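One way to keep the description in sync, sketched under the assumption that the retention is available as a `java.time.Duration` (for instance, the same value the history cache expires entries with). `StorageOperationsDescription`, `describe` and `humanize` are hypothetical names.

```java
import java.time.Duration;

final class StorageOperationsDescription
{
    private StorageOperationsDescription()
    {
    }

    // Hypothetical: "retention" stands for whatever setting bounds the history,
    // so the table comment cannot drift from the configured value.
    static String describe(Duration retention)
    {
        return String.format("Storage operations currently running or completed for the last %s",
                             humanize(retention));
    }

    // Whole days read naturally; anything else falls back to the ISO-8601 form, e.g. "PT36H".
    static String humanize(Duration duration)
    {
        long days = duration.toDays();
        if (days > 0 && duration.equals(Duration.ofDays(days)))
            return days == 1 ? "1 day" : days + " days";
        return duration.toString();
    }
}
```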



##########
src/java/org/apache/cassandra/service/StorageService.java:
##########
@@ -2578,16 +2578,14 @@ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tableNa
         if (isLocalSystemKeyspace(keyspaceName))
             throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");
 
-        CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         logger.info("Starting {} on {}.{}", OperationType.CLEANUP, keyspaceName, Arrays.toString(tableNames));
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, tableNames))
-        {
-            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(jobs);
-            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
-                status = oneStatus;
-        }
-        logger.info("Completed {} with status {}", OperationType.CLEANUP, status);
-        return status.statusCode;
+        CompactionManager.AllSSTableOpStatus result = CompactionManager.instance

Review Comment:
   would you mind fixing the alignment?



##########
test/unit/org/apache/cassandra/db/virtual/StorageOperationsTableCleanupTest.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.junit.After;
+
+import org.apache.cassandra.db.CleanupTest;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.tools.ToolRunner;
+
+import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
+
+public class StorageOperationsTableCleanupTest extends CleanupTest

Review Comment:
   This is an interesting way of doing it; honestly, I have never encountered anything like it. I understand what you are trying to do here, but I do not think this is good practice. Each test should either be standalone or extend some hierarchy of abstract class(es). Overriding test methods is really strange to my taste. I do not know yet how to approach this differently. Looking into `CleanupTest`, there are a lot of private methods which could be `protected`; each test method would then be repeated here, and after calling those helpers from `CleanupTest` you would add the bits you just added.
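A rough sketch of the shape suggested above, with hypothetical class and method names and plain methods standing in for JUnit `@Test` methods so it runs without a test framework: the shared steps live in an abstract base as `protected` helpers, and each concrete test class calls them explicitly before adding its own checks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; plain methods stand in for JUnit @Test methods.
abstract class AbstractCleanupScenarios
{
    protected final List<String> events = new ArrayList<>();

    // Stand-in for a helper that is private in CleanupTest today and would become protected.
    protected boolean cleanupKeyspace(String keyspace)
    {
        events.add("cleanup:" + keyspace);
        return true; // pretend the cleanup succeeded
    }
}

// The existing cleanup tests keep only their own assertions on top of the shared helper.
class CleanupScenarios extends AbstractCleanupScenarios
{
    boolean testCleanup()
    {
        return cleanupKeyspace("ks");
    }
}

// The virtual-table tests repeat the scenario explicitly and add their extra checks
// afterwards, instead of overriding a test method inherited from another test class.
class StorageOperationsCleanupScenarios extends AbstractCleanupScenarios
{
    boolean testCleanupIsRecorded()
    {
        boolean succeeded = cleanupKeyspace("ks");
        return succeeded && events.contains("cleanup:ks");
    }
}
```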



##########
src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java:
##########
@@ -28,6 +31,8 @@
 
 public final class SystemViewsKeyspace extends VirtualKeyspace
 {
+    public static final String STORAGE_OPERATIONS = "storage_operations_history";

Review Comment:
   What other operations do you plan to add? Right now it seems to be just about "compaction"; shouldn't the name reflect that? Also, if we play the constants game here, "thread_pools" is not a constant either.



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -1219,45 +1267,57 @@ public void forceUserDefinedCleanup(String dataFiles)
             ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
             desc = cfs.getDirectories().find(new File(filename.trim()).name());
             if (desc != null)
-                descriptors.put(cfs, desc);
+                descriptors.computeIfAbsent(cfs, c -> new ArrayList<>()).add(desc);
         }
 
+        descriptors.keySet().forEach(cfs -> info.markKeyspace(cfs.getKeyspaceName()));
         if (!StorageService.instance.isJoined())
         {
             logger.error("Cleanup cannot run before a node has joined the ring");
+            info.result = AllSSTableOpStatus.ABORTED;
             return;
         }
 
-        for (Map.Entry<ColumnFamilyStore,Descriptor> entry : descriptors.entrySet())
+        for (Map.Entry<ColumnFamilyStore, List<Descriptor>> entry : descriptors.entrySet())
         {
             ColumnFamilyStore cfs = entry.getKey();
             Keyspace keyspace = cfs.keyspace;
             final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
             final Set<Range<Token>> allRanges = replicas.ranges();
             final Set<Range<Token>> transientRanges = replicas.onlyTransient().ranges();
             boolean hasIndexes = cfs.indexManager.hasIndexes();
-            SSTableReader sstable = lookupSSTable(cfs, entry.getValue());
+            List<SSTableReader> sstables = entry.getValue().stream().

Review Comment:
   would you mind rewriting this using for loops?
   
   https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36
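For reference, a loop-based version of the descriptor-to-sstable lookup might look like the sketch below. The quoted diff is cut off after `.stream().`, so skipping descriptors that do not resolve (a `null` lookup result) is an assumption about the intended behaviour, and `LookupSketch`/`lookupAll` are hypothetical names; in the PR the lookup function would presumably wrap `lookupSSTable(cfs, descriptor)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LookupSketch
{
    private LookupSketch()
    {
    }

    // Resolves each descriptor with the supplied lookup; as an assumption,
    // descriptors that do not resolve (null) are skipped rather than failing.
    static <D, S> List<S> lookupAll(List<D> descriptors, Function<? super D, ? extends S> lookup)
    {
        List<S> resolved = new ArrayList<>(descriptors.size());
        for (D descriptor : descriptors)
        {
            S sstable = lookup.apply(descriptor);
            if (sstable != null)
                resolved.add(sstable);
        }
        return resolved;
    }
}
```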



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -2537,4 +2608,104 @@ public interface CompactionPauser extends AutoCloseable
     {
         public void close();
     }
+
+    public static class OperationInfo
+    {
+        /** The unique id for this operation */
+        public final TimeUUID operationId = TimeUUID.Generator.nextTimeUUID();
+        /** The type of operation being performed */
+        public final OperationType operationType;
+        /** The keyspace names for which this operation is being performed */
+        public final Set<String> keyspaceNames = new ConcurrentSkipListSet<>();
+        /** The keyspace and table names for which this operation is being performed */
+        public final Map<CFInfo, AllSSTableOpStatus> processedCfs = new ConcurrentHashMap<>();
+        /** The number of sstables that have been completed for each keyspace */
+        public final Map<String, AtomicInteger> processedSStables = new ConcurrentHashMap<>();
+        /** The final result of the operation */
+        private volatile AllSSTableOpStatus result;
+
+        private OperationInfo(OperationType operationType)
+        {
+            this.operationType = operationType;
+        }
+
+        private void markKeyspace(String keyspace)
+        {
+            keyspaceNames.add(keyspace);
+            processedSStables.computeIfAbsent(keyspace, k -> new AtomicInteger());
+        }
+
+        private void tableProcessedStatus(String keyspace, String table, AllSSTableOpStatus status)
+        {
+            processedCfs.putIfAbsent(new CFInfo(keyspace, table), status);
+        }
+
+        private void markTableProcessed(String keyspace)
+        {
+            processedSStables.computeIfAbsent(keyspace, k -> new AtomicInteger()).incrementAndGet();
+        }
+
+        public AllSSTableOpStatus result()
+        {
+            return result;
+        }
+
+        public Map<CFInfo, AllSSTableOpStatus> processedCfs()
+        {
+            return processedCfs;
+        }
+
+        public Map<String, AtomicInteger> processedSStables()

Review Comment:
   `processedSSTablesPerKeyspace()`?



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -2537,4 +2608,104 @@ public interface CompactionPauser extends AutoCloseable
     {
         public void close();
     }
+
+    public static class OperationInfo
+    {
+        /** The unique id for this operation */
+        public final TimeUUID operationId = TimeUUID.Generator.nextTimeUUID();
+        /** The type of operation being performed */
+        public final OperationType operationType;
+        /** The keyspace names for which this operation is being performed */
+        public final Set<String> keyspaceNames = new ConcurrentSkipListSet<>();
+        /** The keyspace and table names for which this operation is being performed */
+        public final Map<CFInfo, AllSSTableOpStatus> processedCfs = new ConcurrentHashMap<>();
+        /** The number of sstables that have been completed for each keyspace */
+        public final Map<String, AtomicInteger> processedSStables = new ConcurrentHashMap<>();

Review Comment:
   `numberOfSSTablesPerKeyspace`, `sstableCountPerKeyspace`, `processedSSTablesPerKeyspace` ... you get the idea
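To make the suggestion concrete, a hypothetical excerpt of `OperationInfo` with only the field and accessor renamed to `processedSSTablesPerKeyspace`; the concurrent per-keyspace counters work as in the PR, and the rest of the class (keyspace names, per-table status, result) is omitted. `OperationInfoNamingSketch` is a placeholder name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical excerpt: same behaviour as the PR, only the names change.
final class OperationInfoNamingSketch
{
    /** The number of sstables processed so far, per keyspace */
    private final Map<String, AtomicInteger> processedSSTablesPerKeyspace = new ConcurrentHashMap<>();

    // Registers the keyspace with a zero count so it shows up even before any progress.
    void markKeyspace(String keyspace)
    {
        processedSSTablesPerKeyspace.computeIfAbsent(keyspace, k -> new AtomicInteger());
    }

    // Atomically bumps the counter for the keyspace, creating it on first use.
    void markTableProcessed(String keyspace)
    {
        processedSSTablesPerKeyspace.computeIfAbsent(keyspace, k -> new AtomicInteger()).incrementAndGet();
    }

    Map<String, AtomicInteger> processedSSTablesPerKeyspace()
    {
        return processedSSTablesPerKeyspace;
    }
}
```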



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -174,6 +183,12 @@ public class CompactionManager implements CompactionManagerMBean, ICompactionMan
     final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
     public final ActiveCompactions active = new ActiveCompactions();
+    private final Cache<TimeUUID, OperationInfo> operations = CacheBuilder.newBuilder()

Review Comment:
   I wanted to say that this should be "space-bound" instead of "time-bound", but I noticed that you actually did both, which is great. If somebody goes crazy and runs cleanup like mad, then without a limit on size this would grow and grow without any control.
   
   I would just like to know on what basis you chose 7 days and 300 entries. Is this empirical, based on your workload? I think revisiting the figures here would be nice.
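For comparison, a minimal stdlib sketch (not Guava, and not the PR's code) of a history bounded by both entry count and age. The limits are constructor parameters so the 300-entry and 7-day figures can be tuned; `BoundedHistory` is a hypothetical name, and the caller-supplied clock is assumed to be non-decreasing. Because it is backed by a `LinkedHashMap`, it also iterates oldest-first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

// Stdlib stand-in for a history bounded by both size and age; synchronized for simplicity.
final class BoundedHistory<K, V>
{
    private static final class Timestamped<T>
    {
        final T value;
        final long insertedAtMillis;

        Timestamped(T value, long insertedAtMillis)
        {
            this.value = value;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final int maxEntries;
    private final long maxAgeMillis;
    // Insertion-ordered, so the oldest entry always comes first.
    private final LinkedHashMap<K, Timestamped<V>> entries = new LinkedHashMap<>();

    BoundedHistory(int maxEntries, long maxAgeMillis)
    {
        this.maxEntries = maxEntries;
        this.maxAgeMillis = maxAgeMillis;
    }

    synchronized void put(K key, V value, long nowMillis)
    {
        entries.remove(key); // removing first so re-inserting moves an existing key to the end
        entries.put(key, new Timestamped<>(value, nowMillis));
        evict(nowMillis);
    }

    // Values from oldest to newest, after dropping anything over either bound.
    synchronized List<V> snapshot(long nowMillis)
    {
        evict(nowMillis);
        List<V> values = new ArrayList<>(entries.size());
        for (Timestamped<V> entry : entries.values())
            values.add(entry.value);
        return values;
    }

    private void evict(long nowMillis)
    {
        Iterator<Timestamped<V>> it = entries.values().iterator();
        while (it.hasNext())
        {
            Timestamped<V> oldest = it.next();
            boolean overSize = entries.size() > maxEntries;
            boolean overAge = nowMillis - oldest.insertedAtMillis > maxAgeMillis;
            if (!overSize && !overAge)
                break; // every later entry is newer, so nothing else can be expired
            it.remove();
        }
    }
}
```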
   
   Also, does the cache preserve insertion order? If you do operations.toMap(), do I get the entries in no particular order in the virtual table? I would like them to be ordered the same way every single time.
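If the Guava cache stays, one option is to sort a copy of its entries each time the table is read. The sketch assumes a hypothetical `Row` carrying a start timestamp (the PR keys operations by a time-based `TimeUUID`, which could presumably supply it), and it assumes the virtual table adapter emits rows in the order it is given them, which is not verified here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class OrderedRows
{
    // Hypothetical row shape used only for this sketch.
    static final class Row
    {
        final String operationId;
        final long startedAtMillis;

        Row(String operationId, long startedAtMillis)
        {
            this.operationId = operationId;
            this.startedAtMillis = startedAtMillis;
        }
    }

    private OrderedRows()
    {
    }

    // Copies an unordered snapshot (such as a cache's map view) and sorts it,
    // oldest first with ties broken by id, so every read yields the same order.
    static List<Row> ordered(Map<String, Row> snapshot)
    {
        List<Row> rows = new ArrayList<>(snapshot.values());
        rows.sort(Comparator.comparingLong((Row r) -> r.startedAtMillis)
                            .thenComparing(r -> r.operationId));
        return rows;
    }
}
```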



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

