belliottsmith commented on code in PR #3981: URL: https://github.com/apache/cassandra/pull/3981#discussion_r1995607121
########## src/java/org/apache/cassandra/db/compaction/CompactionIterator.java: ########## @@ -962,47 +913,223 @@ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition } } + protected void collect(Row row) throws IOException + { + updateProgress(); + ByteBuffer bytes = row.getCell(recordColumn).buffer(); + Version userVersion = Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer())); + compactor.collect(key, row, bytes, userVersion); + } + } + + static abstract class AccordRowCompactor<T extends FlyweightImage> + { + final FlyweightSerializer<Object, T> serializer; + + AccordRowCompactor(FlyweightSerializer<Object, T> serializer) + { + this.serializer = serializer; + } + + abstract void reset(JournalKey key); + abstract void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException; + abstract UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException; + } + + static class AccordMergingCompactor<T extends FlyweightImage> extends AccordRowCompactor<T> + { + final T builder; + final Version userVersion; + Object[] highestClustering; + long lastDescriptor = -1; + int lastOffset = -1; + + AccordMergingCompactor(FlyweightSerializer<Object, T> serializer, Version userVersion) + { + super(serializer); + this.builder = serializer.mergerFor(); + this.userVersion = userVersion; + } + @Override - protected Row applyToRow(Row row) + void reset(JournalKey key) { - return row; + builder.reset(key); } - protected void collect(Row row) throws IOException + @Override + protected void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException { - updateProgress(); - ByteBuffer record = row.getCell(recordColumn).buffer(); + if (highestClustering == null) + highestClustering = row.clustering().getBufferArray(); + long descriptor = LongType.instance.compose(row.clustering().bufferAt(0)); int offset = 
Int32Type.instance.compose(row.clustering().bufferAt(1)); if (lastOffset != -1) { Invariants.require(descriptor <= lastDescriptor, - "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor); + "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor); Invariants.require(descriptor != lastDescriptor || - offset < lastOffset, - "Offsets within %s were accessed out of order: %d was accessed after %s", offset, lastOffset); + offset < lastOffset, + "Offsets within %s were accessed out of order: %d was accessed after %s", offset, lastOffset); } lastDescriptor = descriptor; lastOffset = offset; - try (DataInputBuffer in = new DataInputBuffer(record, false)) + try (DataInputBuffer in = new DataInputBuffer(bytes, false)) { - Version userVersion = Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer())); - if (key.type == JournalKey.Type.TOPOLOGY_UPDATE) - topologySerializer.deserialize(key, builder, in, userVersion); - else - serializer.deserialize(key, builder, in, userVersion); - if (highestClustering == null) // we iterate highest to lowest - highestClustering = row.clustering().getBufferArray(); + serializer.deserialize(key, builder, in, userVersion); } } @Override - protected Row applyToStatic(Row row) + UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException { - checkState(row.isStatic() && row.isEmpty()); - return row; + PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partitionKey); + try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get()) + { + serializer.reserialize(journalKey, builder, out, userVersion); + newVersion.row(highestClustering) + .add("record", out.asNewBuffer()) + .add("user_version", userVersion.version); + } + + return newVersion.build().unfilteredIterator(); + } + } + + static class AccordCommandRowEntry + { + final AccordJournal.Builder builder 
= new AccordJournal.Builder(); + Row row; + boolean modified; + + void init(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException + { + this.row = row; + this.builder.reset(key); + try (DataInputBuffer in = new DataInputBuffer(bytes, false)) + { + builder.deserializeNext(in, userVersion); + } + } + + void clear() + { + row = null; + modified = false; + builder.clear(); + } + } + + static class AccordCommandRowCompactor extends AccordRowCompactor<AccordJournal.Builder> + { + static final Object[] rowTemplate = BTree.build(BulkIterator.of(new Object[2]), 2, UpdateFunction.noOp); + final long timestamp = ClientState.getTimestamp(); + final AccordCompactionInfos infos; + final AccordAgent agent; + final Version userVersion; + final ColumnData userVersionCell; + final long nowInSec; + + final AccordJournal.Builder mainBuilder = new AccordJournal.Builder(); + final List<AccordCommandRowEntry> entries = new ArrayList<>(); + final ArrayDeque<AccordCommandRowEntry> reuseEntries = new ArrayDeque<>(); + AccordCompactionInfo info; + + AccordCommandRowCompactor(AccordCompactionInfos infos, AccordAgent agent, Version userVersion, long nowInSec) + { + super((FlyweightSerializer<Object, AccordJournal.Builder>) JournalKey.Type.COMMAND_DIFF.serializer); + this.infos = infos; + this.agent = agent; + this.userVersion = userVersion; + this.userVersionCell = BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp, Int32Type.instance.decompose(userVersion.version)); + this.nowInSec = nowInSec; + } + + @Override + void reset(JournalKey key) + { + mainBuilder.reset(key); + reuseEntries.addAll(entries); + for (int i = 0; i < entries.size() ; ++i) + entries.get(i).clear(); + entries.clear(); + } + + @Override + void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException + { + AccordCommandRowEntry e = reuseEntries.pollLast(); + if (e == null) + e = new AccordCommandRowEntry(); + entries.add(e); + e.init(key, row, 
bytes, userVersion); + e.modified |= e.builder.clearSuperseded(false, mainBuilder); + mainBuilder.fillInMissingOrCleanup(false, e.builder); + } + + @Override + UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException + { + if (mainBuilder.isEmpty()) + return null; + + if (info != null && info.commandStoreId != journalKey.commandStoreId) info = null; + if (info == null) info = infos.get(journalKey.commandStoreId); + // TODO (required): should return null only if commandStore has been removed + if (info == null) + return null; + + DurableBefore durableBefore = infos.durableBefore; + Cleanup cleanup = mainBuilder.maybeCleanup(false, PARTIAL, info.redundantBefore, durableBefore); + if (cleanup != NO) + { + switch (cleanup) + { + default: throw new UnhandledEnum(cleanup); + case EXPUNGE: + return null; + case ERASE: + return PartitionUpdate.fullPartitionDelete(AccordKeyspace.Journal, partitionKey, Long.MAX_VALUE, nowInSec).unfilteredIterator(); + + case TRUNCATE: + case TRUNCATE_WITH_OUTCOME: + case INVALIDATE: + case VESTIGIAL: + if (!entries.isEmpty()) + { + { + AccordCommandRowEntry entry = entries.get(0); + entry.modified |= entry.builder.addCleanup(false, cleanup); + } + for (int i = 1, size = entries.size() ; i < size ; i++) + { + AccordCommandRowEntry entry = entries.get(i); + entry.modified |= entry.builder.cleanup(false, cleanup); Review Comment: I think this was a bad justification. The compiler should hoist it, and I think it probably is cleaner with an if inside. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org