iamaleksey commented on code in PR #3550:
URL: https://github.com/apache/cassandra/pull/3550#discussion_r1775155871
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -271,6 +272,11 @@ public CompactionInfo getCompactionInfo()
return new CompactionInfo(new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
}
+ public Params journalConfiguration()
Review Comment:
Nit: `journalConfiguration()` is missing the `@Override` annotation.
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -1073,4 +1079,9 @@ public CompactionInfo getCompactionInfo()
}));
return new CompactionInfo(redundantBefores, ranges,
durableBefore.get());
}
+
+ public Params journalConfiguration()
Review Comment:
Nit: `journalConfiguration()` is missing the `@Override` annotation.
##########
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java:
##########
@@ -703,298 +661,6 @@ protected UnfilteredRowIterator
applyToPartition(UnfilteredRowIterator partition
}
}
- private abstract class AbstractPurger extends
Transformation<UnfilteredRowIterator>
- {
- int compactedUnfiltered;
-
- protected void onEmptyPartitionPostPurge(DecoratedKey key)
- {
- if (type == OperationType.COMPACTION)
- controller.cfs.invalidateCachedPartition(key);
- }
-
- protected void updateProgress()
- {
- if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
- updateBytesRead();
- }
-
- @Override
- protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
- {
- beginPartition(partition);
- UnfilteredRowIterator purged = Transformation.apply(partition,
this);
- if (purged.isEmpty())
- {
- onEmptyPartitionPostPurge(purged.partitionKey());
- purged.close();
- return null;
- }
-
- return purged;
- }
-
- protected abstract void beginPartition(UnfilteredRowIterator
partition);
- }
-
- private class PaxosPurger extends AbstractPurger
- {
- private final long paxosPurgeGraceMicros =
DatabaseDescriptor.getPaxosPurgeGrace(MICROSECONDS);
- private final Map<TableId, PaxosRepairHistory.Searcher>
tableIdToHistory = new HashMap<>();
-
- private Token token;
-
- @Override
- protected void beginPartition(UnfilteredRowIterator partition)
- {
- this.token = partition.partitionKey().getToken();
- }
-
- @Override
- protected Row applyToRow(Row row)
- {
- updateProgress();
-
- TableId tableId = PaxosRows.getTableId(row);
-
- switch (paxosStatePurging())
- {
- default: throw new AssertionError();
- case legacy:
- case gc_grace:
- {
- TableMetadata metadata =
Schema.instance.getTableMetadata(tableId);
- return
row.purgeDataOlderThan(TimeUnit.SECONDS.toMicros(nowInSec - (metadata == null ?
(3 * 3600) : metadata.params.gcGraceSeconds)), false);
- }
- case repaired:
- {
- PaxosRepairHistory.Searcher history =
tableIdToHistory.computeIfAbsent(tableId, find -> {
- TableMetadata metadata =
Schema.instance.getTableMetadata(find);
- if (metadata == null)
- return null;
- return
Keyspace.openAndGetStore(metadata).getPaxosRepairHistory().searcher();
- });
-
- return history == null ? row :
-
row.purgeDataOlderThan(history.ballotForToken(token).unixMicros() -
paxosPurgeGraceMicros, false);
- }
- }
- }
- }
-
- class AccordCommandsPurger extends AbstractPurger
- {
- final Int2ObjectHashMap<RedundantBefore> redundantBefores;
- final Int2ObjectHashMap<RangesForEpoch> ranges;
- final DurableBefore durableBefore;
-
- int storeId;
- TxnId txnId;
-
- AccordCommandsPurger(Supplier<IAccordService> accordService)
- {
- IAccordService.CompactionInfo compactionInfo =
accordService.get().getCompactionInfo();
- this.redundantBefores = compactionInfo.redundantBefores;
- this.ranges = compactionInfo.ranges;
- this.durableBefore = compactionInfo.durableBefore;
- }
-
- protected void beginPartition(UnfilteredRowIterator partition)
- {
- ByteBuffer[] partitionKeyComponents =
CommandRows.splitPartitionKey(partition.partitionKey());
- storeId = CommandRows.getStoreId(partitionKeyComponents);
- txnId = CommandRows.getTxnId(partitionKeyComponents);
- }
-
- @Override
- protected Row applyToRow(Row row)
- {
- updateProgress();
-
- RedundantBefore redundantBefore = redundantBefores.get(storeId);
- // TODO (expected): if the store has been retired, this should
return null
- if (redundantBefore == null)
- return row;
-
- // When commands end up being sliced by compaction we need this to
discard tombstones and slices
- // without enough information to run the rest of the cleanup logic
- if (Cleanup.isSafeToCleanup(durableBefore, txnId,
ranges.get(storeId).allAt(txnId.epoch())))
- return null;
-
- Cell durabilityCell = row.getCell(CommandsColumns.durability);
- Durability durability =
deserializeDurabilityOrNull(durabilityCell);
- Cell executeAtCell = row.getCell(CommandsColumns.execute_at);
- Timestamp executeAt = deserializeTimestampOrNull(executeAtCell);
- Cell routeCell = row.getCell(CommandsColumns.route);
- Route<?> route = deserializeRouteOrNull(routeCell);
- Cell statusCell = row.getCell(CommandsColumns.status);
- SaveStatus saveStatus = deserializeSaveStatusOrNull(statusCell);
-
- // With a sliced row we might not have enough columns to determine
what to do so output the
- // the row unmodified and we will try again later once it merges
with the rest of the command state
- // or is dropped by `durableBefore.min(txnId) == Universal`
- if (executeAt == null || durability == null || saveStatus == null
|| route == null)
- return row;
-
- Cleanup cleanup = shouldCleanup(txnId, saveStatus.status,
- durability, executeAt, route,
- redundantBefore, durableBefore,
- false);
- switch (cleanup)
- {
- default: throw new AssertionError(String.format("Unexpected
cleanup task: %s", cleanup));
- case ERASE:
- // Emit a tombstone so if this is slicing the command and
making it not possible to determine if it
- // can be truncated later it can still be dropped via the
tombstone.
- // Eventually the tombstone can be dropped by
`durableBefore.min(txnId) == Universal`
- // We can still encounter sliced command state just
because compaction inputs are random
- return BTreeRow.emptyDeletedRow(row.clustering(), new
Row.Deletion(DeletionTime.build(row.primaryKeyLivenessInfo().timestamp(),
nowInSec), false));
-
Review Comment:
I think these two lines got lost in transition. Bad merge?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]