aweisberg commented on code in PR #2982:
URL: https://github.com/apache/cassandra/pull/2982#discussion_r1440775859


##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -155,21 +158,23 @@ public void doRun(Interruptible.State state) throws InterruptedException
         private void doFlush()
         {
             journal.selectSegmentToFlush(segmentsToFlush);
+            segmentsToFlush.sort(comparing(s -> s.descriptor));
+
             // only schedule onSuccess callbacks for a segment if the preceding segments
             // have been fully flushed, to preserve 1:1 mapping between record's position
             // in the journal and onSuccess callback scheduling order
             boolean scheduleOnSuccessCallbacks = true;
             try
             {
-                for (ActiveSegment<K> segment : segmentsToFlush)
+                for (ActiveSegment<K, V> segment : segmentsToFlush)
                 {
                     try
                     {
                         scheduleOnSuccessCallbacks = doFlush(segment, scheduleOnSuccessCallbacks) && scheduleOnSuccessCallbacks;

Review Comment:
   If an earlier segment is not fully flushed (why would that happen?), when do we come back and run the callbacks that were skipped?
   
   If a segment wasn't fully flushed, does that imply it will be flushed again in the future? Just checking that this is handled.
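
   To make the question concrete, here is a minimal sketch of the behaviour being asked about. It assumes the flusher revisits a partially flushed segment on a later pass, which is exactly what the comment wants confirmed. `OrderedFlushSketch`, `Segment`, `flush`, and `pass` are hypothetical stand-ins, not the PR's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not the PR's classes.
final class OrderedFlushSketch
{
    static final class Segment
    {
        final int id;
        final int written;   // bytes written to the segment
        int flushed;         // bytes flushed so far
        int notifiedUpTo;    // callbacks already scheduled for bytes [0, notifiedUpTo)

        Segment(int id, int written)
        {
            this.id = id;
            this.written = written;
        }
    }

    // Flush up to `budget` more bytes; schedule callbacks only if allowed.
    // Returns whether the segment is now fully flushed.
    static boolean flush(Segment s, int budget, boolean scheduleCallbacks, List<String> scheduled)
    {
        s.flushed = Math.min(s.written, s.flushed + budget);
        if (scheduleCallbacks && s.notifiedUpTo < s.flushed)
        {
            scheduled.add("segment " + s.id + " up to " + s.flushed);
            s.notifiedUpTo = s.flushed;
        }
        return s.flushed == s.written;
    }

    // One flusher pass over segments that are already sorted in journal order.
    static List<String> pass(List<Segment> segments, int budget)
    {
        List<String> scheduled = new ArrayList<>();
        boolean schedule = true;
        for (Segment s : segments)
            schedule = flush(s, budget, schedule, scheduled) && schedule;
        return scheduled;
    }
}
```

   In this sketch the second segment's callbacks are withheld on the first pass and scheduled on the second, but only because the same segments are passed in again. If a fully flushed segment were never selected again, its withheld callbacks would never run.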



##########
src/java/org/apache/cassandra/net/Message.java:
##########
@@ -194,6 +197,58 @@ public TraceType traceType()
         return header.traceType();
     }
 
+    /*
+     * minimal response context extraction
+     */
+
+    public ResponseContext extractResponseContext()

Review Comment:
   Unused for now, but I assume the usage is coming?



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -194,6 +195,34 @@ public boolean awaitTermination(long timeout, TimeUnit units) throws Interrupted
         return false;
     }
 
+    /**
+     * Read an entry by its address (segment timestamp + offest)
+     *
+     * @return deserialized record if present, null otherwise
+     */
+    public V read(long segmentTimestamp, int offset)

Review Comment:
   Am I crazy or is this unused?



##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -423,14 +421,14 @@ private Runnable saveCommandsForKey(CommandsForKey before, CommandsForKey after)
     private Runnable saveTimestampsForKey(TimestampsForKey before, TimestampsForKey after)
     {
         Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id, before, after, nextSystemTimestampMicros());
-        return null != mutation ? mutation::apply : null;
+        return null != mutation ? mutation::applyUnsafe : null;

Review Comment:
   We need to remember to eventually apply the mutations that are part of transactions with `applyUnsafe` as well, and then link the discarding of segments to flushing.



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));
+                FrameContext context = new 
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+                appendAuxiliaryRecord(frame, context);
+            }
+        }
+
+        private final class PendingRequest
+        {
+            final Pointer pointer;
+            final RequestContext context;
+
+            PendingRequest(Pointer pointer, RequestContext context)
+            {
+                this.pointer = pointer;
+                this.context = context;
+            }
+        }
+    }
+
+    private final class FrameApplicator implements Runnable

Review Comment:
   Please add a description; it can reference `FrameAggregator` for most of the details. This one is pretty straightforward: it takes the aggregated frames containing previously written requests/messages, sorts them, and "applies" them once that part of the journal has been flushed.
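
   The behaviour described above can be sketched as follows. This is a simplified, single-threaded illustration under stated assumptions: `FrameApplicatorSketch`, its `Pointer`, and its `PendingFrame` are hypothetical stand-ins, the `String` payload replaces the real frame record and context, and the sketch sorts ascending and clears a prefix where the quoted code sorts descending and removes from the tail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not the PR's classes.
final class FrameApplicatorSketch
{
    // Journal address: segment id plus byte position, ordered by segment, then position.
    static final class Pointer implements Comparable<Pointer>
    {
        final long segment;
        final int position;

        Pointer(long segment, int position)
        {
            this.segment = segment;
            this.position = position;
        }

        @Override
        public int compareTo(Pointer that)
        {
            int c = Long.compare(segment, that.segment);
            return c != 0 ? c : Integer.compare(position, that.position);
        }
    }

    // A frame that has been written to the journal but not yet applied.
    static final class PendingFrame
    {
        final Pointer start;
        final Pointer end;
        final String payload;

        PendingFrame(Pointer start, Pointer end, String payload)
        {
            this.start = start;
            this.end = end;
            this.payload = payload;
        }
    }

    private final List<PendingFrame> pending = new ArrayList<>();
    final List<String> applied = new ArrayList<>();

    void onWrite(PendingFrame frame)
    {
        pending.add(frame);
    }

    // Apply, in journal order, every pending frame that ends at or before the flushed position.
    void onFlush(Pointer flushedUntil)
    {
        pending.sort((f1, f2) -> f1.start.compareTo(f2.start));
        int i = 0;
        while (i < pending.size() && pending.get(i).end.compareTo(flushedUntil) <= 0)
            applied.add(pending.get(i++).payload);
        pending.subList(0, i).clear();
    }
}
```

   Frames may arrive out of journal order, so sorting before the scan is what lets the loop stop at the first frame that extends past the flushed position.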



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));
+                FrameContext context = new 
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+                appendAuxiliaryRecord(frame, context);
+            }
+        }
+
+        private final class PendingRequest
+        {
+            final Pointer pointer;
+            final RequestContext context;
+
+            PendingRequest(Pointer pointer, RequestContext context)
+            {
+                this.pointer = pointer;
+                this.context = context;
+            }
+        }
+    }
+
+    private final class FrameApplicator implements Runnable
+    {
+        /** external SPSC written frame queue */
+        private final SpscLinkedQueue<PendingFrame> newFrames = new 
SpscLinkedQueue<>();
+
+        /* single-thread accessed internal frame buffer */
+        private final ArrayList<PendingFrame> pendingFrames = new 
ArrayList<>();
+
+        /* furthest flushed journal segment + position */
+        private volatile Pointer flushedUntil = null;
+
+        private volatile SequentialExecutorPlus executor;
+
+        /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call 
*/
+        void onWrite(Pointer start, int size, FrameContext context)
+        {
+            newFrames.add(new PendingFrame(start, new Pointer(start.segment, 
start.position + size), context));
+        }
+
+        /* invoked only from Journal Flusher thread (single) */
+        void onFlush(long segment, int position)
+        {
+            flushedUntil = new Pointer(segment, position);
+            executor.submit(this);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().sequential("AccordJournal#FrameApplicator");
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        @Override
+        public void run()
+        {
+            if (newFrames.drain(pendingFrames::add) > 0)
+            {
+                /* order by position in the journal, DESC */
+                pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+            }
+
+            Pointer flushedUntil = this.flushedUntil;
+            for (int i = pendingFrames.size() - 1; i >= 0; i--)
+            {
+                PendingFrame frame = pendingFrames.get(i);
+                if (frame.end.compareTo(flushedUntil) > 0)
+                    break;
+                applyFrame((FrameRecord) cachedRecords.remove(frame.start), 
frame.context);
+                pendingFrames.remove(i);
+            }
+        }
+
+        private void applyFrame(FrameRecord frame, FrameContext context)
+        {
+            Invariants.checkState(frame.pointers.length == 
context.requestContexts.length);
+            for (int i = 0; i < frame.pointers.length; i++)
+                applyRequest(frame.pointers[i], context.requestContexts[i]);
+        }
+
+        private void applyRequest(Pointer pointer, RequestContext context)
+        {
+            Request request = (Request) cachedRecords.remove(pointer);
+            Type type = Type.fromMessageType(request.type());
+
+            if (type.isRemoteRequest())
+            {
+                RemoteRequestContext ctx = (RemoteRequestContext) context;
+                Id from = endpointMapper.mappedId(ctx.from());
+                request.process(node, from, ctx);
+            }
+            else
+            {
+                Invariants.checkState(type.isLocalRequest());
+                LocalRequestContext ctx = (LocalRequestContext) context;
+                //noinspection unchecked,rawtypes
+                ((LocalRequest) request).process(node, ctx.callback);
+            }
+        }
+
+        private final class PendingFrame
+        {
+            final Pointer start;
+            final Pointer end;
+            final FrameContext context;
+
+            PendingFrame(Pointer start, Pointer end, FrameContext context)
+            {
+                this.start = start;
+                this.end = end;
+                this.context = context;
+            }
+        }
+    }
+
+    static final class FrameContext
+    {
+        final RequestContext[] requestContexts;
+
+        FrameContext(RequestContext[] requestContexts)
+        {
+            this.requestContexts = requestContexts;
+        }
+    }
+
     static abstract class AuxiliaryRecord

Review Comment:
   Could be private



##########
src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java:
##########
@@ -52,45 +48,34 @@ public void doVerb(Message<T> message) throws IOException
         // TODO (desired): need a non-blocking way to inform CMS of an unknown epoch and add callback to it's receipt
 //        ClusterMetadataService.instance().maybeCatchup(message.epoch());
         logger.debug("Receiving {} from {}", message.payload, message.from());
+        Node.Id fromNodeId = endpointMapper.mappedId(message.from());
         T request = message.payload;
+
         long knownEpoch = request.knownEpoch();
         if (!node.topology().hasEpoch(knownEpoch))
         {
             node.configService().fetchTopologyForEpoch(knownEpoch);
             long waitForEpoch = request.waitForEpoch();

Review Comment:
   Request really needs to document the difference between `knownEpoch` and 
`waitForEpoch`.
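
   For context while that documentation is missing, the `FrameAggregator` quoted elsewhere in this review parks requests until the topology has their `waitForEpoch`. The sketch below illustrates only that gating pattern; it does not claim to define what `knownEpoch` or `waitForEpoch` mean. `EpochGateSketch`, its `Request` class, and the `hasEpoch` predicate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongPredicate;

// Hypothetical sketch; names and types are illustrative, not the PR's API.
final class EpochGateSketch
{
    static final class Request
    {
        final String name;
        final long waitForEpoch;

        Request(String name, long waitForEpoch)
        {
            this.name = name;
            this.waitForEpoch = waitForEpoch;
        }
    }

    // Requests parked until their epoch is known, keyed by epoch in ascending order.
    private final TreeMap<Long, List<Request>> delayed = new TreeMap<>();

    // Returns the names of requests that can be processed now, previously parked ones first.
    List<String> drain(List<Request> incoming, LongPredicate hasEpoch)
    {
        List<String> ready = new ArrayList<>();

        // Release parked requests in epoch order, stopping at the first unknown epoch.
        while (!delayed.isEmpty() && hasEpoch.test(delayed.firstKey()))
            for (Request r : delayed.pollFirstEntry().getValue())
                ready.add(r.name);

        // Route new arrivals: ready if the epoch is known, parked otherwise.
        for (Request r : incoming)
        {
            if (hasEpoch.test(r.waitForEpoch))
                ready.add(r.name);
            else
                delayed.computeIfAbsent(r.waitForEpoch, ignore -> new ArrayList<>()).add(r);
        }
        return ready;
    }
}
```

   Whatever the documented distinction turns out to be, a reader of code like this needs to know which of the two epochs gates processing and which one only triggers a topology fetch.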



##########
src/java/org/apache/cassandra/net/Message.java:
##########
@@ -71,7 +70,7 @@
  *
  * @param <T> The type of the message payload.
  */
-public class Message<T> implements ReplyContext
+public class Message<T> implements ResponseContext

Review Comment:
   It's getting a little weird that `ResponseContext` and `ReplyContext` are basically the same thing, but one belongs to Accord and the other to Cassandra. They might be better named `CassandraResponseContext` and `AccordResponseContext`.



##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -181,11 +186,11 @@ private void doFlush()
         }
 
         // flush the segment, schedule write callbacks if requested, return whether the segment has been flushed fully
-        private boolean doFlush(ActiveSegment<K> segment, boolean scheduleCallbacks)
+        private boolean doFlush(ActiveSegment<K, V> segment, boolean scheduleCallbacks)

Review Comment:
   Maybe make `scheduleCallbacks` consistent with `scheduleOnSuccessCallbacks`. If the callbacks being scheduled are the onSuccess callbacks, the parameter name should match.
   
   I don't mind the wordiness: we have a lot of callbacks flying around, and knowing which one you are looking at makes the code more readable.



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -665,10 +698,10 @@ void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K> activeSegment)
      */
     public void replayStaticSegments(RecordConsumer<K> consumer)
     {
-        List<StaticSegment<K>> staticSegments = new ArrayList<>(segments().onlyStatic());
-        staticSegments.sort(comparing(segment -> segment.descriptor));
-
-        for (StaticSegment<K> segment : staticSegments)
+        ArrayList<StaticSegment<K, V>> staticSegments = new ArrayList<>();

Review Comment:
   Why switch to `ArrayList`?
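
   If nothing downstream needs `ArrayList`-specific methods such as `ensureCapacity` or `trimToSize`, the variable could stay declared as `List` with no change in behaviour. A minimal sketch, where `ListDeclarationSketch` and `sortedTimestamps` are hypothetical names and plain `Long` timestamps stand in for segments:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration; Long timestamps stand in for segment descriptors.
final class ListDeclarationSketch
{
    static List<Long> sortedTimestamps(Iterable<Long> timestamps)
    {
        // Declared as List; ArrayList remains an implementation detail.
        List<Long> result = new ArrayList<>();
        for (Long timestamp : timestamps)
            result.add(timestamp);
        // List.sort is available on the interface, so no ArrayList type is needed here.
        result.sort(Comparator.naturalOrder());
        return result;
    }
}
```

   Whether the PR has a reason to expose the concrete type, for example a callee that takes an `ArrayList` parameter, is not visible in the quoted hunk.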



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));
+                FrameContext context = new 
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+                appendAuxiliaryRecord(frame, context);
+            }
+        }
+
+        private final class PendingRequest
+        {
+            final Pointer pointer;
+            final RequestContext context;
+
+            PendingRequest(Pointer pointer, RequestContext context)
+            {
+                this.pointer = pointer;
+                this.context = context;
+            }
+        }
+    }
+
+    private final class FrameApplicator implements Runnable
+    {
+        /** external SPSC written frame queue */
+        private final SpscLinkedQueue<PendingFrame> newFrames = new 
SpscLinkedQueue<>();
+
+        /* single-thread accessed internal frame buffer */
+        private final ArrayList<PendingFrame> pendingFrames = new 
ArrayList<>();
+
+        /* furthest flushed journal segment + position */
+        private volatile Pointer flushedUntil = null;
+
+        private volatile SequentialExecutorPlus executor;
+
+        /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call 
*/
+        void onWrite(Pointer start, int size, FrameContext context)
+        {
+            newFrames.add(new PendingFrame(start, new Pointer(start.segment, 
start.position + size), context));
+        }
+
+        /* invoked only from Journal Flusher thread (single) */
+        void onFlush(long segment, int position)
+        {
+            flushedUntil = new Pointer(segment, position);
+            executor.submit(this);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().sequential("AccordJournal#FrameApplicator");
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        @Override
+        public void run()
+        {
+            if (newFrames.drain(pendingFrames::add) > 0)
+            {
+                /* order by position in the journal, DESC */
+                pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+            }
+
+            Pointer flushedUntil = this.flushedUntil;
+            for (int i = pendingFrames.size() - 1; i >= 0; i--)
+            {
+                PendingFrame frame = pendingFrames.get(i);
+                if (frame.end.compareTo(flushedUntil) > 0)
+                    break;
+                applyFrame((FrameRecord) cachedRecords.remove(frame.start), 
frame.context);
+                pendingFrames.remove(i);
+            }
+        }
+
+        private void applyFrame(FrameRecord frame, FrameContext context)
+        {
+            Invariants.checkState(frame.pointers.length == 
context.requestContexts.length);
+            for (int i = 0; i < frame.pointers.length; i++)
+                applyRequest(frame.pointers[i], context.requestContexts[i]);
+        }
+
+        private void applyRequest(Pointer pointer, RequestContext context)
+        {
+            Request request = (Request) cachedRecords.remove(pointer);
+            Type type = Type.fromMessageType(request.type());
+
+            if (type.isRemoteRequest())
+            {
+                RemoteRequestContext ctx = (RemoteRequestContext) context;
+                Id from = endpointMapper.mappedId(ctx.from());
+                request.process(node, from, ctx);
+            }
+            else
+            {
+                Invariants.checkState(type.isLocalRequest());
+                LocalRequestContext ctx = (LocalRequestContext) context;
+                //noinspection unchecked,rawtypes
+                ((LocalRequest) request).process(node, ctx.callback);
+            }
+        }
+
+        private final class PendingFrame

Review Comment:
   Please add a comment: a frame containing pointers to all of its requests that have been written to the journal but have not yet been processed by the frame applicator. It will be processed by the frame applicator once the journal has flushed the frame.



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
     @Override
     public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
     {
-        // TODO (expected)
+        // TODO (expected, other)
         return true;
     }
 
-    void appendAuxiliaryRecord(AuxiliaryRecord record)
+    void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)

Review Comment:
   Can we get a one-sentence description of what the auxiliary record being appended is?



##########
src/java/org/apache/cassandra/journal/Segments.java:
##########
@@ -17,123 +17,92 @@
  */
 package org.apache.cassandra.journal;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 
 import accord.utils.Invariants;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
-
 /**
  * Consistent, immutable view of active + static segments
  * <p/>
- * TODO: an interval/range structure for StaticSegment lookup based on min/max 
key bounds
+ * TODO (performance, expected): an interval/range structure for StaticSegment 
lookup based on min/max key bounds
  */
-class Segments<K>
+class Segments<K, V>
 {
-    // active segments, containing unflushed data; the tail of this queue is 
the one we allocate writes from
-    private final List<ActiveSegment<K>> activeSegments;
-
-    // finalised segments, no longer written to
-    private final Map<Descriptor, StaticSegment<K>> staticSegments;
+    private final Long2ObjectHashMap<Segment<K, V>> segments;
 
-    // cached Iterable of concatenated active and static segments
-    private final Iterable<Segment<K>> allSegments;
-
-    Segments(List<ActiveSegment<K>> activeSegments, Map<Descriptor, 
StaticSegment<K>> staticSegments)
+    Segments(Long2ObjectHashMap<Segment<K, V>> segments)
     {
-        this.activeSegments = activeSegments;
-        this.staticSegments = staticSegments;
-        this.allSegments = Iterables.concat(onlyActive(), onlyStatic());
+        this.segments = segments;
     }
 
-    static <K> Segments<K> ofStatic(Collection<StaticSegment<K>> segments)
+    static <K, V> Segments<K, V> of(Collection<Segment<K, V>> segments)
     {
-        HashMap<Descriptor, StaticSegment<K>> staticSegments =
-            Maps.newHashMapWithExpectedSize(segments.size());
-        for (StaticSegment<K> segment : segments)
-            staticSegments.put(segment.descriptor, segment);
-        return new Segments<>(new ArrayList<>(), staticSegments);
+        Long2ObjectHashMap<Segment<K, V>> newSegments = 
newMap(segments.size());
+        for (Segment<K, V> segment : segments)
+            newSegments.put(segment.descriptor.timestamp, segment);
+        return new Segments<>(newSegments);
     }
 
-    static <K> Segments<K> none()
+    static <K, V> Segments<K, V> none()
     {
-        return new Segments<>(Collections.emptyList(), Collections.emptyMap());
+        return new Segments<>(emptyMap());
     }
 
-    Segments<K> withNewActiveSegment(ActiveSegment<K> activeSegment)
+    Segments<K, V> withNewActiveSegment(ActiveSegment<K, V> activeSegment)
     {
-        ArrayList<ActiveSegment<K>> newActiveSegments =
-            new ArrayList<>(activeSegments.size() + 1);
-        newActiveSegments.addAll(activeSegments);
-        newActiveSegments.add(activeSegment);
-        return new Segments<>(newActiveSegments, staticSegments);
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new 
Long2ObjectHashMap<>(segments);
+        Segment<K, V> oldValue = 
newSegments.put(activeSegment.descriptor.timestamp, activeSegment);
+        Invariants.checkState(oldValue == null);
+        return new Segments<>(newSegments);
     }
 
-    Segments<K> withCompletedSegment(ActiveSegment<K> activeSegment, 
StaticSegment<K> staticSegment)
+    Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment, 
StaticSegment<K, V> staticSegment)
     {
         
Invariants.checkArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
-
-        ArrayList<ActiveSegment<K>> newActiveSegments =
-            new ArrayList<>(activeSegments.size() - 1);
-        for (ActiveSegment<K> segment : activeSegments)
-            if (segment != activeSegment)
-                newActiveSegments.add(segment);
-        Invariants.checkState(newActiveSegments.size() == 
activeSegments.size() - 1);
-
-        HashMap<Descriptor, StaticSegment<K>> newStaticSegments =
-            Maps.newHashMapWithExpectedSize(staticSegments.size() + 1);
-        newStaticSegments.putAll(staticSegments);
-        if (newStaticSegments.put(staticSegment.descriptor, staticSegment) != 
null)
-            throw new IllegalStateException();
-
-        return new Segments<>(newActiveSegments, newStaticSegments);
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new 
Long2ObjectHashMap<>(segments);
+        Segment<K, V> oldValue = 
newSegments.put(staticSegment.descriptor.timestamp, staticSegment);
+        Invariants.checkState(oldValue == activeSegment);
+        return new Segments<>(newSegments);
     }
 
-    Segments<K> withCompactedSegment(StaticSegment<K> oldSegment, 
StaticSegment<K> newSegment)
+    Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment, 
StaticSegment<K, V> newSegment)
     {
         Invariants.checkArgument(oldSegment.descriptor.timestamp == 
newSegment.descriptor.timestamp);
         Invariants.checkArgument(oldSegment.descriptor.generation < 
newSegment.descriptor.generation);
-
-        HashMap<Descriptor, StaticSegment<K>> newStaticSegments = new 
HashMap<>(staticSegments);
-        if (!newStaticSegments.remove(oldSegment.descriptor, oldSegment))
-            throw new IllegalStateException();
-        if (null != newStaticSegments.put(newSegment.descriptor, newSegment))
-            throw new IllegalStateException();
-
-        return new Segments<>(activeSegments, newStaticSegments);
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new 
Long2ObjectHashMap<>(segments);
+        Segment<K, V> oldValue = 
newSegments.put(newSegment.descriptor.timestamp, newSegment);
+        Invariants.checkState(oldValue == oldSegment);
+        return new Segments<>(newSegments);
     }
 
-    Segments<K> withoutInvalidatedSegment(StaticSegment<K> staticSegment)
+    Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
     {
-        HashMap<Descriptor, StaticSegment<K>> newStaticSegments = new 
HashMap<>(staticSegments);
-        if (!newStaticSegments.remove(staticSegment.descriptor, staticSegment))
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new 
Long2ObjectHashMap<>(segments);
+        if (!newSegments.remove(staticSegment.descriptor.timestamp, 
staticSegment))
             throw new IllegalStateException();
-        return new Segments<>(activeSegments, newStaticSegments);
+        return new Segments<>(newSegments);
     }
 
-    Iterable<Segment<K>> all()
+    Iterable<Segment<K, V>> all()
     {
-        return allSegments;
+        return segments.values();
     }
 
-    Collection<ActiveSegment<K>> onlyActive()
+    void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into)

Review Comment:
   This could be an `Iterables` transform and filter, but sure, filling the 
collection directly is probably slightly faster.
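   
   A minimal sketch of the two shapes being compared, using only the JDK. The 
`Segment` class, its `active` flag, and the `timestamp <= maxTimestamp` rule are 
assumptions made for illustration, not the journal's real types, and a stream 
filter stands in for the `Iterables` calls:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the journal's segment types; not the real classes.
final class SelectActiveSketch
{
    static final class Segment
    {
        final long timestamp;
        final boolean active;

        Segment(long timestamp, boolean active)
        {
            this.timestamp = timestamp;
            this.active = active;
        }
    }

    // Explicit loop that fills a caller-supplied collection, as in the diff.
    // The selection rule (active and timestamp <= maxTimestamp) is assumed.
    static void selectActive(Collection<Segment> all, long maxTimestamp, Collection<Segment> into)
    {
        for (Segment segment : all)
            if (segment.active && segment.timestamp <= maxTimestamp)
                into.add(segment);
    }

    // Filter-based alternative; builds a pipeline and a fresh list on every call.
    static List<Segment> selectActiveFiltered(Collection<Segment> all, long maxTimestamp)
    {
        return all.stream()
                  .filter(s -> s.active && s.timestamp <= maxTimestamp)
                  .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        List<Segment> all = List.of(new Segment(1, true), new Segment(2, false), new Segment(3, true));
        List<Segment> into = new ArrayList<>();
        selectActive(all, 2, into);
        System.out.println(into.size() + " " + selectActiveFiltered(all, 2).size());
    }
}
```

   The loop lets the caller reuse one collection across calls, which is 
presumably where the small speed advantage comes from.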



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
     @Override
     public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
     {
-        // TODO (expected)
+        // TODO (expected, other)
         return true;
     }
 
-    void appendAuxiliaryRecord(AuxiliaryRecord record)
+    void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)
     {
         Key key = new Key(record.timestamp, record.type());
-        journal.write(key, record, SENTINEL_HOSTS);
+        journal.asyncWrite(key, record, SENTINEL_HOSTS, context);
     }
 
-    public void appendMessage(Message message, Executor executor, 
AsyncWriteCallback callback)
+    public void appendRemoteRequest(Request request, ResponseContext context)

Review Comment:
   Same as above



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));

Review Comment:
   You could keep a list here instead of converting to an array, which is a 
copy; I'm also not sure whether escape analysis eliminates the new empty array.
   
   When deserializing you can use `Arrays.asList`, which is strictly cheaper 
than always having to do the copy.
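   
   To illustrate the difference with plain JDK calls (the class and method 
names below are made up for the example): `toArray` produces an independent 
copy, while `Arrays.asList` returns a fixed-size view backed by the array it is 
given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ArrayVersusListSketch
{
    // toArray always allocates a new array and copies every element into it.
    static String[] snapshot(List<String> buffer)
    {
        return buffer.toArray(new String[0]);
    }

    // Arrays.asList allocates only a small wrapper; the array itself is shared.
    static List<String> wrap(String[] decoded)
    {
        return Arrays.asList(decoded);
    }

    public static void main(String[] args)
    {
        List<String> buffer = new ArrayList<>(Arrays.asList("a", "b"));
        String[] copy = snapshot(buffer);
        buffer.clear(); // reusing the buffer does not disturb the copy
        System.out.println(copy.length);

        String[] decoded = { "x", "y" };
        List<String> view = wrap(decoded);
        decoded[0] = "z"; // visible through the list because nothing was copied
        System.out.println(view.get(0));
    }
}
```

   Note that a list handed to a long-lived record would still need its own 
storage if the source buffer is cleared and reused afterwards.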



##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -181,11 +186,11 @@ private void doFlush()
         }
 
         // flush the segment, schedule write callbacks if requested, return 
whether the segment has been flushed fully
-        private boolean doFlush(ActiveSegment<K> segment, boolean 
scheduleCallbacks)
+        private boolean doFlush(ActiveSegment<K, V> segment, boolean 
scheduleCallbacks)
         {
             int syncedOffset = segment.flush();
             if (scheduleCallbacks)
-                segment.scheduleOnSuccessCallbacks(syncedOffset);
+                callbacks.onFlush(segment.descriptor.timestamp, syncedOffset);
             return segment.isFullyFlushed(syncedOffset);

Review Comment:
   Add a quick comment with the definition of `FullyFlushed` to the 
`isFullyFlushed` method, since it depends on how `endOfBuffer` works.
   
   It seems like `endOfBuffer` is detecting the case where you see the next 
segment but haven't actually flushed all of the entries appended to the previous 
segment, so it's not safe to invoke callbacks for the next segment because you 
would skip some from the previous one?
   
   If we don't fully flush, should we flush the next segment at all, given that 
we can't invoke the callbacks for it anyway? Maybe break out of the loop and 
flush everything next time around?
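   
   A rough sketch of the break-out variant, under stated assumptions: the 
`Segment` model, its fields, and the definition of "fully flushed" (closed to 
writers and every appended byte synced) are hypothetical and only approximate 
what `endOfBuffer` tracks in the real `ActiveSegment`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlushLoopSketch
{
    // Hypothetical segment model; field names are illustrative only.
    static final class Segment
    {
        final long timestamp;
        final int written;     // bytes appended so far
        final int flushLimit;  // how far this simulated flush manages to sync
        final boolean closed;  // true once writers have moved on to the next segment
        int synced;

        Segment(long timestamp, int written, int flushLimit, boolean closed)
        {
            this.timestamp = timestamp;
            this.written = written;
            this.flushLimit = flushLimit;
            this.closed = closed;
        }

        int flush()
        {
            synced = Math.min(written, flushLimit);
            return synced;
        }

        // Assumed definition: no more appends can arrive and every appended byte is synced.
        boolean isFullyFlushed(int syncedOffset)
        {
            return closed && syncedOffset >= written;
        }
    }

    // Flushes segments in order and stops at the first one that is not fully flushed.
    // Returns the timestamps of the segments whose callbacks were scheduled.
    static List<Long> flushInOrder(List<Segment> ordered)
    {
        List<Long> notified = new ArrayList<>();
        for (Segment segment : ordered)
        {
            int syncedOffset = segment.flush();
            notified.add(segment.timestamp); // callbacks up to syncedOffset are safe here
            if (!segment.isFullyFlushed(syncedOffset))
                break; // later segments wait for the next pass
        }
        return notified;
    }

    public static void main(String[] args)
    {
        List<Segment> segments = Arrays.asList(new Segment(1, 100, 100, true),
                                               new Segment(2, 100, 50, true),
                                               new Segment(3, 10, 10, false));
        System.out.println(flushInOrder(segments));
    }
}
```

   The trade-off is that later segments stay unsynced for one more pass, in 
exchange for never flushing data whose callbacks cannot run yet.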



##########
src/java/org/apache/cassandra/journal/AsyncCallbacks.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+/**
+ * TODO (expected): document, implement

Review Comment:
   These methods are 90% obvious. Maybe define what flush is, and then 
contextually at what point each is called.
   
   It could probably be summarized as: `onWrite` is called after all in-memory 
and file contents are updated, but before the write is flushed.
   
   `onWriteFailed` could be anywhere from serialization to writing to the file, 
to requesting the flush.
   
   `onFlush` seems to imply the segment is flushed, but doesn't imply that the 
success callbacks will or have been run. Not sure it's relevant to mention in 
this interface.
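   
   Roughly what that documentation could look like. The method names come from 
the comment above; the interface name, the signatures, and the recording 
implementation are hypothetical and do not reflect the actual `AsyncCallbacks` 
declaration.

```java
import java.util.ArrayList;
import java.util.List;

final class CallbackDocsSketch
{
    // Hypothetical interface; signatures are assumed for illustration.
    interface Callbacks
    {
        /** Called after all in-memory and file contents are updated, but before the write is flushed. */
        void onWrite(long segment, int position);

        /** Called if the write fails anywhere from serialization, to writing to the file, to requesting the flush. */
        void onWriteFailed(Throwable cause);

        /** Called once the segment is flushed up to the given position. */
        void onFlush(long segment, int position);
    }

    // Records the order in which callbacks fire, to make the lifecycle visible.
    static final class Recording implements Callbacks
    {
        final List<String> events = new ArrayList<>();

        @Override
        public void onWrite(long segment, int position)
        {
            events.add("write@" + segment + ":" + position);
        }

        @Override
        public void onWriteFailed(Throwable cause)
        {
            events.add("writeFailed:" + cause.getMessage());
        }

        @Override
        public void onFlush(long segment, int position)
        {
            events.add("flush@" + segment + ":" + position);
        }
    }

    public static void main(String[] args)
    {
        Recording recording = new Recording();
        recording.onWrite(1L, 64);
        recording.onFlush(1L, 64);
        System.out.println(recording.events);
    }
}
```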
   
   



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
     @Override
     public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
     {
-        // TODO (expected)
+        // TODO (expected, other)
         return true;
     }
 
-    void appendAuxiliaryRecord(AuxiliaryRecord record)
+    void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)
     {
         Key key = new Key(record.timestamp, record.type());
-        journal.write(key, record, SENTINEL_HOSTS);
+        journal.asyncWrite(key, record, SENTINEL_HOSTS, context);
     }
 
-    public void appendMessage(Message message, Executor executor, 
AsyncWriteCallback callback)
+    public void appendRemoteRequest(Request request, ResponseContext context)
     {
-        Type type = Type.fromMessageType(message.type());
-        Key key = new Key(type.txnId(message), type);
-        journal.asyncWrite(key, message, SENTINEL_HOSTS, executor, callback);
+        Type type = Type.fromMessageType(request.type());
+        Key key = new Key(type.txnId(request), type);
+        journal.asyncWrite(key, request, SENTINEL_HOSTS, context);
+    }
+
+    public void appendLocalRequest(LocalRequest<?> request)

Review Comment:
   Same as above



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));
+                FrameContext context = new 
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+                appendAuxiliaryRecord(frame, context);
+            }
+        }
+
+        private final class PendingRequest
+        {
+            final Pointer pointer;
+            final RequestContext context;
+
+            PendingRequest(Pointer pointer, RequestContext context)
+            {
+                this.pointer = pointer;
+                this.context = context;
+            }
+        }
+    }
+
+    private final class FrameApplicator implements Runnable
+    {
+        /** external SPSC written frame queue */
+        private final SpscLinkedQueue<PendingFrame> newFrames = new 
SpscLinkedQueue<>();
+
+        /* single-thread accessed internal frame buffer */
+        private final ArrayList<PendingFrame> pendingFrames = new 
ArrayList<>();
+
+        /* furthest flushed journal segment + position */
+        private volatile Pointer flushedUntil = null;
+
+        private volatile SequentialExecutorPlus executor;
+
+        /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call 
*/
+        void onWrite(Pointer start, int size, FrameContext context)
+        {
+            newFrames.add(new PendingFrame(start, new Pointer(start.segment, 
start.position + size), context));
+        }
+
+        /* invoked only from Journal Flusher thread (single) */
+        void onFlush(long segment, int position)
+        {
+            flushedUntil = new Pointer(segment, position);
+            executor.submit(this);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().sequential("AccordJournal#FrameApplicator");
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        @Override
+        public void run()
+        {
+            if (newFrames.drain(pendingFrames::add) > 0)
+            {
+                /* order by position in the journal, DESC */
+                pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+            }
+
+            Pointer flushedUntil = this.flushedUntil;
+            for (int i = pendingFrames.size() - 1; i >= 0; i--)
+            {
+                PendingFrame frame = pendingFrames.get(i);
+                if (frame.end.compareTo(flushedUntil) > 0)
+                    break;
+                applyFrame((FrameRecord) cachedRecords.remove(frame.start), 
frame.context);
+                pendingFrames.remove(i);
+            }
+        }
+
+        private void applyFrame(FrameRecord frame, FrameContext context)
+        {
+            Invariants.checkState(frame.pointers.length == 
context.requestContexts.length);
+            for (int i = 0; i < frame.pointers.length; i++)
+                applyRequest(frame.pointers[i], context.requestContexts[i]);
+        }
+
+        private void applyRequest(Pointer pointer, RequestContext context)
+        {
+            Request request = (Request) cachedRecords.remove(pointer);

Review Comment:
   So right now this cache isn't really a cache, but in the future it will read 
the message back off of disk if necessary?



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task

Review Comment:
   Describe what a frame is, how and why we create them, and what we do with 
them once they are created, after they are written, and after they are flushed.



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))

Review Comment:
   A set would avoid long linear searches through large numbers of epochs, 
though if that happens maybe we have bigger problems.
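   
   One possible shape, sketched with the JDK's `TreeSet`, which would cover both 
the membership check and the sorted iteration. This assumes known epochs form a 
prefix (every epoch up to the latest known one is available); the class and 
method names are invented for the example, and a boxed `TreeSet<Long>` trades 
the primitive list's compactness for O(log n) lookups.

```java
import java.util.TreeSet;

final class EpochWaitSetSketch
{
    // Sorted set: no explicit sort and no linear containsLong scan needed.
    private final TreeSet<Long> waitForEpochs = new TreeSet<>();

    // Returns true only the first time an epoch is registered, which is the
    // point at which a notification would be requested for it.
    boolean register(long epoch)
    {
        return waitForEpochs.add(epoch);
    }

    // Removes every waited-for epoch that is now known; returns how many were removed.
    // Assumes every epoch <= latestKnown is available.
    int drainReady(long latestKnown)
    {
        int drained = 0;
        while (!waitForEpochs.isEmpty() && waitForEpochs.first() <= latestKnown)
        {
            waitForEpochs.pollFirst();
            drained++;
        }
        return drained;
    }

    int pending()
    {
        return waitForEpochs.size();
    }

    public static void main(String[] args)
    {
        EpochWaitSetSketch epochs = new EpochWaitSetSketch();
        epochs.register(5);
        epochs.register(3);
        epochs.register(9);
        System.out.println(epochs.drainReady(5) + " drained, " + epochs.pending() + " pending");
    }
}
```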



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));
+            haveWork.release(1);
+        }
+
+        void notifyOfEpoch()
+        {
+            haveWork.release(1);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE, 
NON_DAEMON, SYNCHRONIZED);
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        /* internal reusable buffers used for frame generation */
+        private final ArrayList<PendingRequest> requestBuffer = new 
ArrayList<>();
+        private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+        private final ArrayList<RequestContext> contextsBuffer = new 
ArrayList<>();
+
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
+            if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+                return;
+
+            try
+            {
+                doRun();
+            }
+            finally
+            {
+                requestBuffer.clear();
+                pointersBuffer.clear();
+                contextsBuffer.clear();
+            }
+
+            haveWork.acquire(1);
+        }
+
+        private void doRun()
+        {
+            /*
+             * Deal with delayed requests
+             */
+
+            waitForEpochs.sort(null);
+
+            for (int i = 0; i < waitForEpochs.size(); i++)
+            {
+                long waitForEpoch = waitForEpochs.getLong(i);
+                if (!node.topology().hasEpoch(waitForEpoch))
+                    break;
+                requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+            }
+
+            waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
+
+            /*
+             * Deal with regular pending requests
+             */
+
+            PendingRequest request;
+            while (null != (request = unframedRequests.poll()))
+            {
+                long waitForEpoch = request.context.waitForEpoch;
+                if (!node.topology().hasEpoch(waitForEpoch))
+                {
+                    delayedRequests.computeIfAbsent(waitForEpoch, ignore -> 
new ArrayList<>()).add(request);
+                    if (!waitForEpochs.containsLong(waitForEpoch))
+                    {
+                        waitForEpochs.addLong(waitForEpoch);
+                        node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+                    }
+                }
+                else
+                {
+                    requestBuffer.add(request);
+                }
+            }
+
+            for (PendingRequest req : requestBuffer)
+            {
+                pointersBuffer.add(req.pointer);
+                contextsBuffer.add(req.context);
+            }
+
+            if (!requestBuffer.isEmpty())
+            {
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointersBuffer.toArray(new Pointer[0]));
+                FrameContext context = new 
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+                appendAuxiliaryRecord(frame, context);
+            }
+        }
+
+        private final class PendingRequest
+        {
+            final Pointer pointer;
+            final RequestContext context;
+
+            PendingRequest(Pointer pointer, RequestContext context)
+            {
+                this.pointer = pointer;
+                this.context = context;
+            }
+        }
+    }
+
+    private final class FrameApplicator implements Runnable
+    {
+        /** external SPSC written frame queue */
+        private final SpscLinkedQueue<PendingFrame> newFrames = new 
SpscLinkedQueue<>();
+
+        /* single-thread accessed internal frame buffer */
+        private final ArrayList<PendingFrame> pendingFrames = new 
ArrayList<>();
+
+        /* furthest flushed journal segment + position */
+        private volatile Pointer flushedUntil = null;
+
+        private volatile SequentialExecutorPlus executor;
+
+        /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call 
*/
+        void onWrite(Pointer start, int size, FrameContext context)
+        {
+            newFrames.add(new PendingFrame(start, new Pointer(start.segment, 
start.position + size), context));
+        }
+
+        /* invoked only from Journal Flusher thread (single) */
+        void onFlush(long segment, int position)
+        {
+            flushedUntil = new Pointer(segment, position);
+            executor.submit(this);
+        }
+
+        void start()
+        {
+            executor = 
executorFactory().sequential("AccordJournal#FrameApplicator");
+        }
+
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        @Override
+        public void run()
+        {
+            if (newFrames.drain(pendingFrames::add) > 0)
+            {
+                /* order by position in the journal, DESC */
+                pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+            }
+
+            Pointer flushedUntil = this.flushedUntil;
+            for (int i = pendingFrames.size() - 1; i >= 0; i--)
+            {
+                PendingFrame frame = pendingFrames.get(i);
+                if (frame.end.compareTo(flushedUntil) > 0)
+                    break;
+                applyFrame((FrameRecord) cachedRecords.remove(frame.start), 
frame.context);
+                pendingFrames.remove(i);
+            }
+        }
+
+        private void applyFrame(FrameRecord frame, FrameContext context)
+        {
+            Invariants.checkState(frame.pointers.length == 
context.requestContexts.length);
+            for (int i = 0; i < frame.pointers.length; i++)
+                applyRequest(frame.pointers[i], context.requestContexts[i]);
+        }
+
+        private void applyRequest(Pointer pointer, RequestContext context)
+        {
+            Request request = (Request) cachedRecords.remove(pointer);
+            Type type = Type.fromMessageType(request.type());
+
+            if (type.isRemoteRequest())
+            {
+                RemoteRequestContext ctx = (RemoteRequestContext) context;
+                Id from = endpointMapper.mappedId(ctx.from());
+                request.process(node, from, ctx);
+            }
+            else
+            {
+                Invariants.checkState(type.isLocalRequest());
+                LocalRequestContext ctx = (LocalRequestContext) context;
+                //noinspection unchecked,rawtypes
+                ((LocalRequest) request).process(node, ctx.callback);
+            }
+        }
+
+        private final class PendingFrame
+        {
+            final Pointer start;
+            final Pointer end;
+            final FrameContext context;
+
+            PendingFrame(Pointer start, Pointer end, FrameContext context)
+            {
+                this.start = start;
+                this.end = end;
+                this.context = context;
+            }
+        }
+    }
+
+    static final class FrameContext

Review Comment:
   Could be private



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
         }
     }
 
+    /*
+     * Record framing logic
+     */
+
+    private final class FrameAggregator implements Interruptible.Task
+    {
+        /* external MPSC pending request queue */
+        private final ManyToOneConcurrentLinkedQueue<PendingRequest> 
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+        private final LongArrayList waitForEpochs = new LongArrayList();
+        private final Long2ObjectHashMap<ArrayList<PendingRequest>> 
delayedRequests = new Long2ObjectHashMap<>();
+
+        private volatile Interruptible executor;
+
+        // a signal and flag that callers outside the aggregator thread can use
+        // to signal they want the aggregator to run again
+        private final Semaphore haveWork = newSemaphore(1);
+
+        void onWrite(Pointer pointer, RequestContext context)
+        {
+            unframedRequests.add(new PendingRequest(pointer, context));

Review Comment:
   `RequestContext` could provide storage for `Pointer` instead of allocating a 
new object. A step further would be to store pointers in a columnar fashion so 
you don't have pointer objects, but that really depends on how hard you want to 
go at this. 
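
   A minimal sketch of the columnar idea, to make the suggestion concrete. 
   `PointerColumns` and all of its methods are hypothetical names, not part of 
   this PR; it assumes a single consumer thread (the aggregator) owns the 
   structure, and it mirrors the `(segment, position)` ordering of 
   `Pointer.compareTo`.

   ```java
   import java.util.Arrays;
   import java.util.Objects;

   // Hypothetical sketch: keeps (segment, position) pairs in two parallel
   // primitive arrays so that no per-pointer object is allocated.
   // Not thread-safe; assumed to be owned by a single thread.
   final class PointerColumns
   {
       private long[] segments = new long[16];
       private int[] positions = new int[16];
       private int size;

       // Appends one pointer, doubling both columns when they are full.
       void add(long segment, int position)
       {
           if (size == segments.length)
           {
               segments = Arrays.copyOf(segments, size * 2);
               positions = Arrays.copyOf(positions, size * 2);
           }
           segments[size] = segment;
           positions[size] = position;
           size++;
       }

       int size()
       {
           return size;
       }

       long segment(int index)
       {
           return segments[Objects.checkIndex(index, size)];
       }

       int position(int index)
       {
           return positions[Objects.checkIndex(index, size)];
       }

       // Orders two stored pointers by segment first, then by position.
       int compare(int left, int right)
       {
           int cmp = Long.compare(segment(left), segment(right));
           return cmp != 0 ? cmp : Integer.compare(position(left), position(right));
       }
   }
   ```

   The trade-off is that callers address pointers by index rather than by 
   reference, so this only pays off if the allocation actually shows up in 
   profiles.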



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -711,43 +1255,57 @@ static abstract class AuxiliaryRecord
         abstract Type type();
     }
 
-    /*
-     * Placeholder for future record.
-     */
-    static final class ReplayRecord extends AuxiliaryRecord
+    static final class FrameRecord extends AuxiliaryRecord
     {
-        ReplayRecord(Timestamp timestamp)
+        final Pointer[] pointers;
+
+        FrameRecord(Timestamp timestamp, Pointer[] pointers)
         {
             super(timestamp);
+            this.pointers = pointers;
         }
 
         @Override
         Type type()
         {
-            return Type.REPLAY;
+            return Type.FRAME;
         }
 
-        static final ValueSerializer<Key, ReplayRecord> SERIALIZER = new 
ValueSerializer<Key, ReplayRecord>()
+        static final ValueSerializer<Key, FrameRecord> SERIALIZER = new 
ValueSerializer<>()
         {
             @Override
-            public int serializedSize(Key key, ReplayRecord record, int 
userVersion)
+            public int serializedSize(Key key, FrameRecord frame, int 
userVersion)
             {
-                return 0;
+                int size = computeUnsignedVIntSize(frame.pointers.length);
+                for (Pointer pointer : frame.pointers)
+                    size += pointer.serializedSize();
+                return size;
             }
 
             @Override
-            public void serialize(Key key, ReplayRecord record, DataOutputPlus 
out, int userVersion)
+            public void serialize(Key key, FrameRecord frame, DataOutputPlus 
out, int userVersion) throws IOException
             {
+                out.writeUnsignedVInt32(frame.pointers.length);
+                for (Pointer pointer : frame.pointers)
+                    pointer.serialize(out);
             }
 
             @Override
-            public ReplayRecord deserialize(Key key, DataInputPlus in, int 
userVersion)
+            public FrameRecord deserialize(Key key, DataInputPlus in, int 
userVersion) throws IOException
             {
-                return new ReplayRecord(key.timestamp);
+                int count = in.readUnsignedVInt32();
+                Pointer[] pointers = new Pointer[count];
+                for (int i = 0; i < count; i++)
+                    pointers[i] = Pointer.deserialize(in);
+                return new FrameRecord(key.timestamp, pointers);
             }
         };
     }
 
+    /*
+     * Message provider implementation
+     */
+

Review Comment:
   Extra line break?



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -237,17 +287,241 @@ public void appendMessageBlocking(Message message)
     }
 
     @VisibleForTesting
-    public <M extends Message> M readMessage(TxnId txnId, Type type, Class<M> 
clazz)
+    public <M extends Message> M readMessage(TxnId txnId, MessageType 
messageType, Class<M> clazz)
+    {
+        for (Type type : Type.synonymousTypesFromMessageType(messageType))
+        {
+            M message = clazz.cast(journal.readFirst(new Key(txnId, type)));
+            if (null != message) return message;
+        }
+        return null;
+    }
+
+    private <M extends Message> M readMessage(TxnId txnId, MessageType 
messageType, Class<M> clazz, Predicate<Object> condition)
+    {
+        for (Type type : Type.synonymousTypesFromMessageType(messageType))
+        {
+            M message = clazz.cast(journal.readFirstMatching(new Key(txnId, 
type), condition));
+            if (null != message) return message;
+        }
+        return null;
+    }
+
+    private static class Pointer implements Comparable<Pointer>
+    {
+        final long segment; // unique segment id
+        final int position; // record start position within the segment
+
+        Pointer(long segment, int position)
+        {
+            this.segment = segment;
+            this.position = position;
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (this == other)
+                return true;
+            if (!(other instanceof Pointer))
+                return false;
+            Pointer that = (Pointer) other;
+            return this.segment == that.segment
+                && this.position == that.position;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Long.hashCode(segment) + position * 31;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "(" + segment + ", " + position + ')';
+        }
+
+        @Override
+        public int compareTo(Pointer that)
+        {
+            int cmp = Longs.compare(this.segment, that.segment);
+            return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+        }
+
+        int serializedSize()
+        {
+            return computeUnsignedVIntSize(segment) + 
computeUnsignedVIntSize(position);
+        }
+
+        void serialize(DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt(segment);
+            out.writeUnsignedVInt32(position);
+        }
+
+        static Pointer deserialize(DataInputPlus in) throws IOException
+        {
+            long segment = in.readUnsignedVInt();
+            int position = in.readUnsignedVInt32();
+            return new Pointer(segment, position);
+        }
+    }
+
+    private class JournalCallbacks implements AsyncCallbacks<Key, Object>
+    {
+        @Override
+        public void onWrite(long segment, int position, int size, Key key, 
Object value, Object writeContext)

Review Comment:
   Please add a one-sentence comment here: this routes the completion of a write 
   to its next step, which is frame application if the write to the log was a 
   frame, and frame aggregation otherwise.



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -765,6 +1323,7 @@ private MessageProvider(TxnId txnId)
         @Override
         public Set<MessageType> test(Set<MessageType> messages)
         {
+            // TODO (expected, correctness): fix other message provider 
methods to account for synonyms too

Review Comment:
   Which methods need to handle synonyms?



##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -711,43 +1255,57 @@ static abstract class AuxiliaryRecord
         abstract Type type();
     }
 
-    /*
-     * Placeholder for future record.
-     */
-    static final class ReplayRecord extends AuxiliaryRecord
+    static final class FrameRecord extends AuxiliaryRecord

Review Comment:
   Ok going to stop mentioning things that could be private :-)



##########
src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java:
##########
@@ -52,45 +48,34 @@ public void doVerb(Message<T> message) throws IOException
         // TODO (desired): need a non-blocking way to inform CMS of an unknown 
epoch and add callback to it's receipt
 //        ClusterMetadataService.instance().maybeCatchup(message.epoch());
         logger.debug("Receiving {} from {}", message.payload, message.from());
+        Node.Id fromNodeId = endpointMapper.mappedId(message.from());
         T request = message.payload;
+
         long knownEpoch = request.knownEpoch();
         if (!node.topology().hasEpoch(knownEpoch))
         {
             node.configService().fetchTopologyForEpoch(knownEpoch);
             long waitForEpoch = request.waitForEpoch();
             if (!node.topology().hasEpoch(waitForEpoch))

Review Comment:
   This seems like a property independent of `knownEpoch`, so could we move it 
   out, avoid copying the journal append part, and have less indentation? The 
   journal append does not depend on having `waitForEpoch`; it just depends on 
   side effects. It really just changes whether we process directly or 
   `withEpoch`.
   
   Maybe `request.process` should handle this check instead of the caller?
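
   Roughly the shape I have in mind, as a simplified sketch rather than a 
   drop-in change. `EpochGateSketch`, its `Request` interface, the `hasEpoch` 
   predicate, and the string log are all hypothetical stand-ins for the real 
   handler, topology, and journal; the log only records the order of side 
   effects.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongPredicate;

   // Hypothetical sketch: the waitForEpoch check is hoisted out of the
   // knownEpoch branch, so the journal append appears exactly once.
   final class EpochGateSketch
   {
       interface Request
       {
           long knownEpoch();
           long waitForEpoch();
       }

       // Records side effects in order; stands in for topology fetch,
       // journal append, and request processing.
       final List<String> log = new ArrayList<>();
       private final LongPredicate hasEpoch;

       EpochGateSketch(LongPredicate hasEpoch)
       {
           this.hasEpoch = hasEpoch;
       }

       static Request request(long knownEpoch, long waitForEpoch)
       {
           return new Request()
           {
               public long knownEpoch() { return knownEpoch; }
               public long waitForEpoch() { return waitForEpoch; }
           };
       }

       void handle(Request request)
       {
           // Side effect only: start fetching a topology we do not have yet.
           if (!hasEpoch.test(request.knownEpoch()))
               log.add("fetch " + request.knownEpoch());

           // Single append, independent of either epoch check.
           log.add("append");

           // The only thing waitForEpoch changes is how we process.
           if (hasEpoch.test(request.waitForEpoch()))
               log.add("process");
           else
               log.add("withEpoch " + request.waitForEpoch());
       }
   }
   ```

   If `request.process` took over the last check, the caller would shrink to 
   the fetch and the append.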



##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -165,35 +166,35 @@ public static class State
             }
         }
         private static final ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("name", 10);
-        private static final AccordJournal journal = new AccordJournal();
+        private static final AccordJournal journal = new AccordJournal(null);
         private static final int events = 100;
         private static final CountDownLatch eventsWritten = 
CountDownLatch.newCountDownLatch(events);
         private static final CountDownLatch eventsDurable = 
CountDownLatch.newCountDownLatch(events);
         private static final List<Throwable> exceptions = new 
CopyOnWriteArrayList<>();
 
         static
         {
-            journal.start();
+            journal.start(null);
         }
 
         public static void append(int event)
         {
             TxnRequest<?> request = toRequest(event);
-            journal.appendMessage(request, executor, new AsyncWriteCallback()
-            {
-                @Override
-                public void run()
-                {
-                    durable(event);
-                }
-
-                @Override
-                public void onFailure(Throwable error)
-                {
-                    eventsDurable.decrement(); // to make sure we don't block 
forever
-                    exceptions.add(error);
-                }
-            });
+//            journal.appendMessageTest(request, executor, new 
AsyncWriteCallback()

Review Comment:
   Test not passing?



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -482,35 +479,14 @@ public TopologyManager topology()
         }
     }
 
-    private void handleLocalMessage(LocalMessage message, Node node)
+    // TODO (expected): confirm that we don't need to worry about 
waitForEpoch() here being ahead of known;
+    //                  was not being handled by Accord pre-journal, likely a 
non-issue.
+    private void handleLocalRequest(LocalRequest<?> request, Node node)

Review Comment:
   Which `waitForEpoch`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

