iamaleksey commented on code in PR #3550:
URL: https://github.com/apache/cassandra/pull/3550#discussion_r1775155871


##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -271,6 +272,11 @@ public CompactionInfo getCompactionInfo()
             return new CompactionInfo(new Int2ObjectHashMap<>(), new 
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
         }
 
+        public Params journalConfiguration()

Review Comment:
   Nit: missing `@Override`



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -1073,4 +1079,9 @@ public CompactionInfo getCompactionInfo()
         }));
         return new CompactionInfo(redundantBefores, ranges, 
durableBefore.get());
     }
+
+    public Params journalConfiguration()

Review Comment:
   Nit: missing `@Override`



##########
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java:
##########
@@ -703,298 +661,6 @@ protected UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator partition
         }
     }
 
-    private abstract class AbstractPurger extends 
Transformation<UnfilteredRowIterator>
-    {
-        int compactedUnfiltered;
-
-        protected void onEmptyPartitionPostPurge(DecoratedKey key)
-        {
-            if (type == OperationType.COMPACTION)
-                controller.cfs.invalidateCachedPartition(key);
-        }
-
-        protected void updateProgress()
-        {
-            if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
-                updateBytesRead();
-        }
-
-        @Override
-        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
-        {
-            beginPartition(partition);
-            UnfilteredRowIterator purged = Transformation.apply(partition, 
this);
-            if (purged.isEmpty())
-            {
-                onEmptyPartitionPostPurge(purged.partitionKey());
-                purged.close();
-                return null;
-            }
-
-            return purged;
-        }
-
-        protected abstract void beginPartition(UnfilteredRowIterator 
partition);
-    }
-
-    private class PaxosPurger extends AbstractPurger
-    {
-        private final long paxosPurgeGraceMicros = 
DatabaseDescriptor.getPaxosPurgeGrace(MICROSECONDS);
-        private final Map<TableId, PaxosRepairHistory.Searcher> 
tableIdToHistory = new HashMap<>();
-
-        private Token token;
-
-        @Override
-        protected void beginPartition(UnfilteredRowIterator partition)
-        {
-            this.token = partition.partitionKey().getToken();
-        }
-
-        @Override
-        protected Row applyToRow(Row row)
-        {
-            updateProgress();
-
-            TableId tableId = PaxosRows.getTableId(row);
-
-            switch (paxosStatePurging())
-            {
-                default: throw new AssertionError();
-                case legacy:
-                case gc_grace:
-                {
-                    TableMetadata metadata = 
Schema.instance.getTableMetadata(tableId);
-                    return 
row.purgeDataOlderThan(TimeUnit.SECONDS.toMicros(nowInSec - (metadata == null ? 
(3 * 3600) : metadata.params.gcGraceSeconds)), false);
-                }
-                case repaired:
-                {
-                    PaxosRepairHistory.Searcher history = 
tableIdToHistory.computeIfAbsent(tableId, find -> {
-                        TableMetadata metadata = 
Schema.instance.getTableMetadata(find);
-                        if (metadata == null)
-                            return null;
-                        return 
Keyspace.openAndGetStore(metadata).getPaxosRepairHistory().searcher();
-                    });
-
-                    return history == null ? row :
-                           
row.purgeDataOlderThan(history.ballotForToken(token).unixMicros() - 
paxosPurgeGraceMicros, false);
-                }
-            }
-        }
-    }
-
-    class AccordCommandsPurger extends AbstractPurger
-    {
-        final Int2ObjectHashMap<RedundantBefore> redundantBefores;
-        final Int2ObjectHashMap<RangesForEpoch> ranges;
-        final DurableBefore durableBefore;
-
-        int storeId;
-        TxnId txnId;
-
-        AccordCommandsPurger(Supplier<IAccordService> accordService)
-        {
-            IAccordService.CompactionInfo compactionInfo = 
accordService.get().getCompactionInfo();
-            this.redundantBefores = compactionInfo.redundantBefores;
-            this.ranges = compactionInfo.ranges;
-            this.durableBefore = compactionInfo.durableBefore;
-        }
-
-        protected void beginPartition(UnfilteredRowIterator partition)
-        {
-            ByteBuffer[] partitionKeyComponents = 
CommandRows.splitPartitionKey(partition.partitionKey());
-            storeId = CommandRows.getStoreId(partitionKeyComponents);
-            txnId = CommandRows.getTxnId(partitionKeyComponents);
-        }
-
-        @Override
-        protected Row applyToRow(Row row)
-        {
-            updateProgress();
-
-            RedundantBefore redundantBefore = redundantBefores.get(storeId);
-            // TODO (expected): if the store has been retired, this should 
return null
-            if (redundantBefore == null)
-                return row;
-
-            // When commands end up being sliced by compaction we need this to 
discard tombstones and slices
-            // without enough information to run the rest of the cleanup logic
-            if (Cleanup.isSafeToCleanup(durableBefore, txnId, 
ranges.get(storeId).allAt(txnId.epoch())))
-                return null;
-
-            Cell durabilityCell = row.getCell(CommandsColumns.durability);
-            Durability durability = 
deserializeDurabilityOrNull(durabilityCell);
-            Cell executeAtCell = row.getCell(CommandsColumns.execute_at);
-            Timestamp executeAt = deserializeTimestampOrNull(executeAtCell);
-            Cell routeCell = row.getCell(CommandsColumns.route);
-            Route<?> route = deserializeRouteOrNull(routeCell);
-            Cell statusCell = row.getCell(CommandsColumns.status);
-            SaveStatus saveStatus = deserializeSaveStatusOrNull(statusCell);
-
-            // With a sliced row we might not have enough columns to determine 
what to do so output the
-            // the row unmodified and we will try again later once it merges 
with the rest of the command state
-            // or is dropped by `durableBefore.min(txnId) == Universal`
-            if (executeAt == null || durability == null || saveStatus == null 
|| route == null)
-                return row;
-
-            Cleanup cleanup = shouldCleanup(txnId, saveStatus.status,
-                                            durability, executeAt, route,
-                                            redundantBefore, durableBefore,
-                                            false);
-            switch (cleanup)
-            {
-                default: throw new AssertionError(String.format("Unexpected 
cleanup task: %s", cleanup));
-                case ERASE:
-                    // Emit a tombstone so if this is slicing the command and 
making it not possible to determine if it
-                    // can be truncated later it can still be dropped via the 
tombstone.
-                    // Eventually the tombstone can be dropped by 
`durableBefore.min(txnId) == Universal`
-                    // We can still encounter sliced command state just 
because compaction inputs are random
-                    return BTreeRow.emptyDeletedRow(row.clustering(), new 
Row.Deletion(DeletionTime.build(row.primaryKeyLivenessInfo().timestamp(), 
nowInSec), false));
-

Review Comment:
   I think these two lines got lost in transition. Bad merge?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to