belliottsmith commented on code in PR #3981:
URL: https://github.com/apache/cassandra/pull/3981#discussion_r1995607121


##########
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java:
##########
@@ -962,47 +913,223 @@ protected UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator partition
             }
         }
 
+        protected void collect(Row row) throws IOException
+        {
+            updateProgress();
+            ByteBuffer bytes = row.getCell(recordColumn).buffer();
+            Version userVersion = 
Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
+            compactor.collect(key, row, bytes, userVersion);
+        }
+    }
+
+    static abstract class AccordRowCompactor<T extends FlyweightImage>
+    {
+        final FlyweightSerializer<Object, T> serializer;
+
+        AccordRowCompactor(FlyweightSerializer<Object, T> serializer)
+        {
+            this.serializer = serializer;
+        }
+
+        abstract void reset(JournalKey key);
+        abstract void collect(JournalKey key, Row row, ByteBuffer bytes, 
Version userVersion) throws IOException;
+        abstract UnfilteredRowIterator result(JournalKey journalKey, 
DecoratedKey partitionKey) throws IOException;
+    }
+
+    static class AccordMergingCompactor<T extends FlyweightImage> extends 
AccordRowCompactor<T>
+    {
+        final T builder;
+        final Version userVersion;
+        Object[] highestClustering;
+        long lastDescriptor = -1;
+        int lastOffset = -1;
+
+        AccordMergingCompactor(FlyweightSerializer<Object, T> serializer, 
Version userVersion)
+        {
+            super(serializer);
+            this.builder = serializer.mergerFor();
+            this.userVersion = userVersion;
+        }
+
         @Override
-        protected Row applyToRow(Row row)
+        void reset(JournalKey key)
         {
-            return row;
+            builder.reset(key);
         }
 
-        protected void collect(Row row) throws IOException
+        @Override
+        protected void collect(JournalKey key, Row row, ByteBuffer bytes, 
Version userVersion) throws IOException
         {
-            updateProgress();
-            ByteBuffer record = row.getCell(recordColumn).buffer();
+            if (highestClustering == null)
+                highestClustering = row.clustering().getBufferArray();
+
             long descriptor = 
LongType.instance.compose(row.clustering().bufferAt(0));
             int offset = 
Int32Type.instance.compose(row.clustering().bufferAt(1));
 
             if (lastOffset != -1)
             {
                 Invariants.require(descriptor <= lastDescriptor,
-                                      "Descriptors were accessed out of order: 
%d was accessed after %d", descriptor, lastDescriptor);
+                                   "Descriptors were accessed out of order: %d 
was accessed after %d", descriptor, lastDescriptor);
                 Invariants.require(descriptor != lastDescriptor ||
-                                      offset < lastOffset,
-                                      "Offsets within %s were accessed out of 
order: %d was accessed after %s", offset, lastOffset);
+                                   offset < lastOffset,
+                                   "Offsets within %s were accessed out of 
order: %d was accessed after %s", offset, lastOffset);
             }
             lastDescriptor = descriptor;
             lastOffset = offset;
 
-            try (DataInputBuffer in = new DataInputBuffer(record, false))
+            try (DataInputBuffer in = new DataInputBuffer(bytes, false))
             {
-                Version userVersion = 
Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
-                if (key.type == JournalKey.Type.TOPOLOGY_UPDATE)
-                    topologySerializer.deserialize(key, builder, in, 
userVersion);
-                else
-                    serializer.deserialize(key, builder, in, userVersion);
-                if (highestClustering == null) // we iterate highest to lowest
-                    highestClustering = row.clustering().getBufferArray();
+                serializer.deserialize(key, builder, in, userVersion);
             }
         }
 
         @Override
-        protected Row applyToStatic(Row row)
+        UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey 
partitionKey) throws IOException
         {
-            checkState(row.isStatic() && row.isEmpty());
-            return row;
+            PartitionUpdate.SimpleBuilder newVersion = 
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partitionKey);
+            try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+            {
+                serializer.reserialize(journalKey, builder, out, userVersion);
+                newVersion.row(highestClustering)
+                          .add("record", out.asNewBuffer())
+                          .add("user_version", userVersion.version);
+            }
+
+            return newVersion.build().unfilteredIterator();
+        }
+    }
+
+    static class AccordCommandRowEntry
+    {
+        final AccordJournal.Builder builder = new AccordJournal.Builder();
+        Row row;
+        boolean modified;
+
+        void init(JournalKey key, Row row, ByteBuffer bytes, Version 
userVersion) throws IOException
+        {
+            this.row = row;
+            this.builder.reset(key);
+            try (DataInputBuffer in = new DataInputBuffer(bytes, false))
+            {
+                builder.deserializeNext(in, userVersion);
+            }
+        }
+
+        void clear()
+        {
+            row = null;
+            modified = false;
+            builder.clear();
+        }
+    }
+
+    static class AccordCommandRowCompactor extends 
AccordRowCompactor<AccordJournal.Builder>
+    {
+        static final Object[] rowTemplate = BTree.build(BulkIterator.of(new 
Object[2]), 2, UpdateFunction.noOp);
+        final long timestamp = ClientState.getTimestamp();
+        final AccordCompactionInfos infos;
+        final AccordAgent agent;
+        final Version userVersion;
+        final ColumnData userVersionCell;
+        final long nowInSec;
+
+        final AccordJournal.Builder mainBuilder = new AccordJournal.Builder();
+        final List<AccordCommandRowEntry> entries = new ArrayList<>();
+        final ArrayDeque<AccordCommandRowEntry> reuseEntries = new 
ArrayDeque<>();
+        AccordCompactionInfo info;
+
+        AccordCommandRowCompactor(AccordCompactionInfos infos, AccordAgent 
agent, Version userVersion, long nowInSec)
+        {
+            super((FlyweightSerializer<Object, AccordJournal.Builder>) 
JournalKey.Type.COMMAND_DIFF.serializer);
+            this.infos = infos;
+            this.agent = agent;
+            this.userVersion = userVersion;
+            this.userVersionCell = 
BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp, 
Int32Type.instance.decompose(userVersion.version));
+            this.nowInSec = nowInSec;
+        }
+
+        @Override
+        void reset(JournalKey key)
+        {
+            mainBuilder.reset(key);
+            reuseEntries.addAll(entries);
+            for (int i = 0; i < entries.size() ; ++i)
+                entries.get(i).clear();
+            entries.clear();
+        }
+
+        @Override
+        void collect(JournalKey key, Row row, ByteBuffer bytes, Version 
userVersion) throws IOException
+        {
+            AccordCommandRowEntry e = reuseEntries.pollLast();
+            if (e == null)
+                e = new AccordCommandRowEntry();
+            entries.add(e);
+            e.init(key, row, bytes, userVersion);
+            e.modified |= e.builder.clearSuperseded(false, mainBuilder);
+            mainBuilder.fillInMissingOrCleanup(false, e.builder);
+        }
+
+        @Override
+        UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey 
partitionKey) throws IOException
+        {
+            if (mainBuilder.isEmpty())
+                return null;
+
+            if (info != null && info.commandStoreId != 
journalKey.commandStoreId) info = null;
+            if (info == null) info = infos.get(journalKey.commandStoreId);
+            // TODO (required): should return null only if commandStore has 
been removed
+            if (info == null)
+                return null;
+
+            DurableBefore durableBefore = infos.durableBefore;
+            Cleanup cleanup = mainBuilder.maybeCleanup(false, PARTIAL, 
info.redundantBefore, durableBefore);
+            if (cleanup != NO)
+            {
+                switch (cleanup)
+                {
+                    default: throw new UnhandledEnum(cleanup);
+                    case EXPUNGE:
+                        return null;
+                    case ERASE:
+                        return 
PartitionUpdate.fullPartitionDelete(AccordKeyspace.Journal, partitionKey, 
Long.MAX_VALUE, nowInSec).unfilteredIterator();
+
+                    case TRUNCATE:
+                    case TRUNCATE_WITH_OUTCOME:
+                    case INVALIDATE:
+                    case VESTIGIAL:
+                        if (!entries.isEmpty())
+                        {
+                            {
+                                AccordCommandRowEntry entry = entries.get(0);
+                                entry.modified |= 
entry.builder.addCleanup(false, cleanup);
+                            }
+                            for (int i = 1, size = entries.size() ; i < size ; 
i++)
+                            {
+                                AccordCommandRowEntry entry = entries.get(i);
+                                entry.modified |= entry.builder.cleanup(false, 
cleanup);

Review Comment:
   I think this was a bad justification. The compiler should hoist it, and I 
think it is probably cleaner with an `if` inside the loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to