aweisberg commented on code in PR #2982:
URL: https://github.com/apache/cassandra/pull/2982#discussion_r1440775859
##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -155,21 +158,23 @@ public void doRun(Interruptible.State state) throws
InterruptedException
private void doFlush()
{
journal.selectSegmentToFlush(segmentsToFlush);
+ segmentsToFlush.sort(comparing(s -> s.descriptor));
+
// only schedule onSuccess callbacks for a segment if the
preceding segments
// have been fully flushed, to preserve 1:1 mapping between
record's position
// in the journal and onSuccess callback scheduling order
boolean scheduleOnSuccessCallbacks = true;
try
{
- for (ActiveSegment<K> segment : segmentsToFlush)
+ for (ActiveSegment<K, V> segment : segmentsToFlush)
{
try
{
scheduleOnSuccessCallbacks = doFlush(segment,
scheduleOnSuccessCallbacks) && scheduleOnSuccessCallbacks;
Review Comment:
If an earlier segment is not fully flushed (why?), when will we come back
later and run the necessary callbacks?
If we didn't fully flush, does that imply it will get flushed again in the
future? Just checking this is handled.
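Roughly the invariant I'd expect, as a hedged sketch (all names here are stand-ins, not the PR's actual types):

```java
import java.util.List;

// A minimal sketch, assuming a segment that is not fully flushed simply
// stays in the flush set: the next doFlush() pass flushes the remainder
// and only then resumes scheduling callbacks, preserving journal order.
final class FlushLoopSketch
{
    interface Segment
    {
        boolean flush();                   // flush buffered bytes; true once everything appended is on disk
        void scheduleOnSuccessCallbacks(); // for the portion just flushed
    }

    static void doFlush(List<Segment> toFlush)
    {
        boolean scheduleOnSuccessCallbacks = true;
        for (Segment segment : toFlush)
        {
            boolean fullyFlushed = segment.flush();
            if (scheduleOnSuccessCallbacks)
                segment.scheduleOnSuccessCallbacks();
            // once one segment lags, later segments' callbacks wait for a future pass
            scheduleOnSuccessCallbacks &= fullyFlushed;
        }
    }
}
```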
##########
src/java/org/apache/cassandra/net/Message.java:
##########
@@ -194,6 +197,58 @@ public TraceType traceType()
return header.traceType();
}
+ /*
+ * minimal response context extraction
+ */
+
+ public ResponseContext extractResponseContext()
Review Comment:
Unused, but I assume that is coming?
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -194,6 +195,34 @@ public boolean awaitTermination(long timeout, TimeUnit
units) throws Interrupted
return false;
}
+ /**
+ * Read an entry by its address (segment timestamp + offset)
+ *
+ * @return deserialized record if present, null otherwise
+ */
+ public V read(long segmentTimestamp, int offset)
Review Comment:
Am I crazy or is this unused?
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -423,14 +421,14 @@ private Runnable saveCommandsForKey(CommandsForKey
before, CommandsForKey after)
private Runnable saveTimestampsForKey(TimestampsForKey before,
TimestampsForKey after)
{
Mutation mutation = AccordKeyspace.getTimestampsForKeyMutation(id,
before, after, nextSystemTimestampMicros());
- return null != mutation ? mutation::apply : null;
+ return null != mutation ? mutation::applyUnsafe : null;
Review Comment:
We need to remember to eventually apply the mutations that are part of
transactions with applyUnsafe as well, and then tie discarding segments to
flushing.
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
+ FrameContext context = new
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+ appendAuxiliaryRecord(frame, context);
+ }
+ }
+
+ private final class PendingRequest
+ {
+ final Pointer pointer;
+ final RequestContext context;
+
+ PendingRequest(Pointer pointer, RequestContext context)
+ {
+ this.pointer = pointer;
+ this.context = context;
+ }
+ }
+ }
+
+ private final class FrameApplicator implements Runnable
Review Comment:
Description that can reference `FrameAggregator` for most of the details.
This one is pretty straightforward. It gets the aggregated frames containing
previously written requests/messages and sorts and "applies" them once that
part of the journal is flushed.
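A possible wording for that description (my phrasing, not the author's):

```java
/**
 * Takes frames of previously written requests/messages from FrameAggregator,
 * orders them by journal position, and "applies" them once the journal has
 * flushed past each frame's end; see FrameAggregator for the framing details.
 */
```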
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
+ FrameContext context = new
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+ appendAuxiliaryRecord(frame, context);
+ }
+ }
+
+ private final class PendingRequest
+ {
+ final Pointer pointer;
+ final RequestContext context;
+
+ PendingRequest(Pointer pointer, RequestContext context)
+ {
+ this.pointer = pointer;
+ this.context = context;
+ }
+ }
+ }
+
+ private final class FrameApplicator implements Runnable
+ {
+ /** external SPSC written frame queue */
+ private final SpscLinkedQueue<PendingFrame> newFrames = new
SpscLinkedQueue<>();
+
+ /* single-thread accessed internal frame buffer */
+ private final ArrayList<PendingFrame> pendingFrames = new
ArrayList<>();
+
+ /* furthest flushed journal segment + position */
+ private volatile Pointer flushedUntil = null;
+
+ private volatile SequentialExecutorPlus executor;
+
+ /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call
*/
+ void onWrite(Pointer start, int size, FrameContext context)
+ {
+ newFrames.add(new PendingFrame(start, new Pointer(start.segment,
start.position + size), context));
+ }
+
+ /* invoked only from Journal Flusher thread (single) */
+ void onFlush(long segment, int position)
+ {
+ flushedUntil = new Pointer(segment, position);
+ executor.submit(this);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().sequential("AccordJournal#FrameApplicator");
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ @Override
+ public void run()
+ {
+ if (newFrames.drain(pendingFrames::add) > 0)
+ {
+ /* order by position in the journal, DESC */
+ pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+ }
+
+ Pointer flushedUntil = this.flushedUntil;
+ for (int i = pendingFrames.size() - 1; i >= 0; i--)
+ {
+ PendingFrame frame = pendingFrames.get(i);
+ if (frame.end.compareTo(flushedUntil) > 0)
+ break;
+ applyFrame((FrameRecord) cachedRecords.remove(frame.start),
frame.context);
+ pendingFrames.remove(i);
+ }
+ }
+
+ private void applyFrame(FrameRecord frame, FrameContext context)
+ {
+ Invariants.checkState(frame.pointers.length ==
context.requestContexts.length);
+ for (int i = 0; i < frame.pointers.length; i++)
+ applyRequest(frame.pointers[i], context.requestContexts[i]);
+ }
+
+ private void applyRequest(Pointer pointer, RequestContext context)
+ {
+ Request request = (Request) cachedRecords.remove(pointer);
+ Type type = Type.fromMessageType(request.type());
+
+ if (type.isRemoteRequest())
+ {
+ RemoteRequestContext ctx = (RemoteRequestContext) context;
+ Id from = endpointMapper.mappedId(ctx.from());
+ request.process(node, from, ctx);
+ }
+ else
+ {
+ Invariants.checkState(type.isLocalRequest());
+ LocalRequestContext ctx = (LocalRequestContext) context;
+ //noinspection unchecked,rawtypes
+ ((LocalRequest) request).process(node, ctx.callback);
+ }
+ }
+
+ private final class PendingFrame
+ {
+ final Pointer start;
+ final Pointer end;
+ final FrameContext context;
+
+ PendingFrame(Pointer start, Pointer end, FrameContext context)
+ {
+ this.start = start;
+ this.end = end;
+ this.context = context;
+ }
+ }
+ }
+
+ static final class FrameContext
+ {
+ final RequestContext[] requestContexts;
+
+ FrameContext(RequestContext[] requestContexts)
+ {
+ this.requestContexts = requestContexts;
+ }
+ }
+
static abstract class AuxiliaryRecord
Review Comment:
Could be private
##########
src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java:
##########
@@ -52,45 +48,34 @@ public void doVerb(Message<T> message) throws IOException
// TODO (desired): need a non-blocking way to inform CMS of an unknown
epoch and add callback to its receipt
// ClusterMetadataService.instance().maybeCatchup(message.epoch());
logger.debug("Receiving {} from {}", message.payload, message.from());
+ Node.Id fromNodeId = endpointMapper.mappedId(message.from());
T request = message.payload;
+
long knownEpoch = request.knownEpoch();
if (!node.topology().hasEpoch(knownEpoch))
{
node.configService().fetchTopologyForEpoch(knownEpoch);
long waitForEpoch = request.waitForEpoch();
Review Comment:
Request really needs to document the difference between `knownEpoch` and
`waitForEpoch`.
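As a hedged sketch of the distinction, inferred from how the handlers use the two (my wording, not documentation from the PR):

```java
// Inferred semantics only; the real interface is accord's Request.
interface EpochedRequest
{
    /**
     * The latest epoch the sender knew of when sending. If the receiver
     * doesn't have it yet, it kicks off an async topology fetch, but may
     * not need the epoch to process this particular request.
     */
    long knownEpoch();

    /**
     * The minimum epoch the receiver must actually have before processing;
     * until then the request is deferred via withEpoch().
     */
    long waitForEpoch();
}
```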
##########
src/java/org/apache/cassandra/net/Message.java:
##########
@@ -71,7 +70,7 @@
*
* @param <T> The type of the message payload.
*/
-public class Message<T> implements ReplyContext
+public class Message<T> implements ResponseContext
Review Comment:
It's getting a little weird that `ResponseContext` and `ReplyContext` are
basically the same thing, but one is accord and one is cassandra. Might be
better as `CassandraResponseContext` and `AccordResponseContext`.
##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -181,11 +186,11 @@ private void doFlush()
}
// flush the segment, schedule write callbacks if requested, return
whether the segment has been flushed fully
- private boolean doFlush(ActiveSegment<K> segment, boolean
scheduleCallbacks)
+ private boolean doFlush(ActiveSegment<K, V> segment, boolean
scheduleCallbacks)
Review Comment:
Maybe be consistent between `scheduleCallbacks` and
`scheduleOnSuccessCallbacks`. If the `callbacks` are the success callbacks,
the names should probably match.
I don't mind the wordiness, since we have a lot of callbacks flying around
and knowing which one you are looking at makes it more readable.
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -665,10 +698,10 @@ void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K>
activeSegment)
*/
public void replayStaticSegments(RecordConsumer<K> consumer)
{
- List<StaticSegment<K>> staticSegments = new
ArrayList<>(segments().onlyStatic());
- staticSegments.sort(comparing(segment -> segment.descriptor));
-
- for (StaticSegment<K> segment : staticSegments)
+ ArrayList<StaticSegment<K, V>> staticSegments = new ArrayList<>();
Review Comment:
Why switch to `ArrayList`?
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
+ FrameContext context = new
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+ appendAuxiliaryRecord(frame, context);
+ }
+ }
+
+ private final class PendingRequest
+ {
+ final Pointer pointer;
+ final RequestContext context;
+
+ PendingRequest(Pointer pointer, RequestContext context)
+ {
+ this.pointer = pointer;
+ this.context = context;
+ }
+ }
+ }
+
+ private final class FrameApplicator implements Runnable
+ {
+ /** external SPSC written frame queue */
+ private final SpscLinkedQueue<PendingFrame> newFrames = new
SpscLinkedQueue<>();
+
+ /* single-thread accessed internal frame buffer */
+ private final ArrayList<PendingFrame> pendingFrames = new
ArrayList<>();
+
+ /* furthest flushed journal segment + position */
+ private volatile Pointer flushedUntil = null;
+
+ private volatile SequentialExecutorPlus executor;
+
+ /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call
*/
+ void onWrite(Pointer start, int size, FrameContext context)
+ {
+ newFrames.add(new PendingFrame(start, new Pointer(start.segment,
start.position + size), context));
+ }
+
+ /* invoked only from Journal Flusher thread (single) */
+ void onFlush(long segment, int position)
+ {
+ flushedUntil = new Pointer(segment, position);
+ executor.submit(this);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().sequential("AccordJournal#FrameApplicator");
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ @Override
+ public void run()
+ {
+ if (newFrames.drain(pendingFrames::add) > 0)
+ {
+ /* order by position in the journal, DESC */
+ pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+ }
+
+ Pointer flushedUntil = this.flushedUntil;
+ for (int i = pendingFrames.size() - 1; i >= 0; i--)
+ {
+ PendingFrame frame = pendingFrames.get(i);
+ if (frame.end.compareTo(flushedUntil) > 0)
+ break;
+ applyFrame((FrameRecord) cachedRecords.remove(frame.start),
frame.context);
+ pendingFrames.remove(i);
+ }
+ }
+
+ private void applyFrame(FrameRecord frame, FrameContext context)
+ {
+ Invariants.checkState(frame.pointers.length ==
context.requestContexts.length);
+ for (int i = 0; i < frame.pointers.length; i++)
+ applyRequest(frame.pointers[i], context.requestContexts[i]);
+ }
+
+ private void applyRequest(Pointer pointer, RequestContext context)
+ {
+ Request request = (Request) cachedRecords.remove(pointer);
+ Type type = Type.fromMessageType(request.type());
+
+ if (type.isRemoteRequest())
+ {
+ RemoteRequestContext ctx = (RemoteRequestContext) context;
+ Id from = endpointMapper.mappedId(ctx.from());
+ request.process(node, from, ctx);
+ }
+ else
+ {
+ Invariants.checkState(type.isLocalRequest());
+ LocalRequestContext ctx = (LocalRequestContext) context;
+ //noinspection unchecked,rawtypes
+ ((LocalRequest) request).process(node, ctx.callback);
+ }
+ }
+
+ private final class PendingFrame
Review Comment:
Add a comment: a frame containing pointers to all the requests in the journal
contained by the frame that have been written to the journal but have not been
processed by the frame applicator yet. It will be processed by the frame
applicator once the journal has flushed the frame.
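Possible wording:

```java
/**
 * A frame whose requests have already been written to the journal but not
 * yet processed by the frame applicator; it gets applied once the journal
 * has flushed past the frame's end pointer.
 */
```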
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
@Override
public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
{
- // TODO (expected)
+ // TODO (expected, other)
return true;
}
- void appendAuxiliaryRecord(AuxiliaryRecord record)
+ void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)
Review Comment:
Could we get a one-sentence description of what an auxiliary record, the
thing being appended here, is?
##########
src/java/org/apache/cassandra/journal/Segments.java:
##########
@@ -17,123 +17,92 @@
*/
package org.apache.cassandra.journal;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import accord.utils.Invariants;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
-
/**
* Consistent, immutable view of active + static segments
* <p/>
- * TODO: an interval/range structure for StaticSegment lookup based on min/max
key bounds
+ * TODO (performance, expected): an interval/range structure for StaticSegment
lookup based on min/max key bounds
*/
-class Segments<K>
+class Segments<K, V>
{
- // active segments, containing unflushed data; the tail of this queue is
the one we allocate writes from
- private final List<ActiveSegment<K>> activeSegments;
-
- // finalised segments, no longer written to
- private final Map<Descriptor, StaticSegment<K>> staticSegments;
+ private final Long2ObjectHashMap<Segment<K, V>> segments;
- // cached Iterable of concatenated active and static segments
- private final Iterable<Segment<K>> allSegments;
-
- Segments(List<ActiveSegment<K>> activeSegments, Map<Descriptor,
StaticSegment<K>> staticSegments)
+ Segments(Long2ObjectHashMap<Segment<K, V>> segments)
{
- this.activeSegments = activeSegments;
- this.staticSegments = staticSegments;
- this.allSegments = Iterables.concat(onlyActive(), onlyStatic());
+ this.segments = segments;
}
- static <K> Segments<K> ofStatic(Collection<StaticSegment<K>> segments)
+ static <K, V> Segments<K, V> of(Collection<Segment<K, V>> segments)
{
- HashMap<Descriptor, StaticSegment<K>> staticSegments =
- Maps.newHashMapWithExpectedSize(segments.size());
- for (StaticSegment<K> segment : segments)
- staticSegments.put(segment.descriptor, segment);
- return new Segments<>(new ArrayList<>(), staticSegments);
+ Long2ObjectHashMap<Segment<K, V>> newSegments =
newMap(segments.size());
+ for (Segment<K, V> segment : segments)
+ newSegments.put(segment.descriptor.timestamp, segment);
+ return new Segments<>(newSegments);
}
- static <K> Segments<K> none()
+ static <K, V> Segments<K, V> none()
{
- return new Segments<>(Collections.emptyList(), Collections.emptyMap());
+ return new Segments<>(emptyMap());
}
- Segments<K> withNewActiveSegment(ActiveSegment<K> activeSegment)
+ Segments<K, V> withNewActiveSegment(ActiveSegment<K, V> activeSegment)
{
- ArrayList<ActiveSegment<K>> newActiveSegments =
- new ArrayList<>(activeSegments.size() + 1);
- newActiveSegments.addAll(activeSegments);
- newActiveSegments.add(activeSegment);
- return new Segments<>(newActiveSegments, staticSegments);
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
+ Segment<K, V> oldValue =
newSegments.put(activeSegment.descriptor.timestamp, activeSegment);
+ Invariants.checkState(oldValue == null);
+ return new Segments<>(newSegments);
}
- Segments<K> withCompletedSegment(ActiveSegment<K> activeSegment,
StaticSegment<K> staticSegment)
+ Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment,
StaticSegment<K, V> staticSegment)
{
Invariants.checkArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
-
- ArrayList<ActiveSegment<K>> newActiveSegments =
- new ArrayList<>(activeSegments.size() - 1);
- for (ActiveSegment<K> segment : activeSegments)
- if (segment != activeSegment)
- newActiveSegments.add(segment);
- Invariants.checkState(newActiveSegments.size() ==
activeSegments.size() - 1);
-
- HashMap<Descriptor, StaticSegment<K>> newStaticSegments =
- Maps.newHashMapWithExpectedSize(staticSegments.size() + 1);
- newStaticSegments.putAll(staticSegments);
- if (newStaticSegments.put(staticSegment.descriptor, staticSegment) !=
null)
- throw new IllegalStateException();
-
- return new Segments<>(newActiveSegments, newStaticSegments);
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
+ Segment<K, V> oldValue =
newSegments.put(staticSegment.descriptor.timestamp, staticSegment);
+ Invariants.checkState(oldValue == activeSegment);
+ return new Segments<>(newSegments);
}
- Segments<K> withCompactedSegment(StaticSegment<K> oldSegment,
StaticSegment<K> newSegment)
+ Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment,
StaticSegment<K, V> newSegment)
{
Invariants.checkArgument(oldSegment.descriptor.timestamp ==
newSegment.descriptor.timestamp);
Invariants.checkArgument(oldSegment.descriptor.generation <
newSegment.descriptor.generation);
-
- HashMap<Descriptor, StaticSegment<K>> newStaticSegments = new
HashMap<>(staticSegments);
- if (!newStaticSegments.remove(oldSegment.descriptor, oldSegment))
- throw new IllegalStateException();
- if (null != newStaticSegments.put(newSegment.descriptor, newSegment))
- throw new IllegalStateException();
-
- return new Segments<>(activeSegments, newStaticSegments);
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
+ Segment<K, V> oldValue =
newSegments.put(newSegment.descriptor.timestamp, newSegment);
+ Invariants.checkState(oldValue == oldSegment);
+ return new Segments<>(newSegments);
}
- Segments<K> withoutInvalidatedSegment(StaticSegment<K> staticSegment)
+ Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
{
- HashMap<Descriptor, StaticSegment<K>> newStaticSegments = new
HashMap<>(staticSegments);
- if (!newStaticSegments.remove(staticSegment.descriptor, staticSegment))
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
+ if (!newSegments.remove(staticSegment.descriptor.timestamp,
staticSegment))
throw new IllegalStateException();
- return new Segments<>(activeSegments, newStaticSegments);
+ return new Segments<>(newSegments);
}
- Iterable<Segment<K>> all()
+ Iterable<Segment<K, V>> all()
{
- return allSegments;
+ return segments.values();
}
- Collection<ActiveSegment<K>> onlyActive()
+ void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into)
Review Comment:
Could be an iterables transform and filter, but sure, this is probably
slightly faster.
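For reference, the Iterables variant would look roughly like this (stand-in types, not the PR's real Segment hierarchy; the lazy wrapper views are the likely overhead):

```java
import java.util.List;

import com.google.common.collect.Iterables;

// Hedged sketch of the transform-and-filter alternative.
final class SelectActiveSketch
{
    static class Segment { long timestamp; }
    static final class ActiveSegment extends Segment {}

    static Iterable<ActiveSegment> selectActive(List<Segment> all, long maxTimestamp)
    {
        return Iterables.filter(
            Iterables.filter(all, ActiveSegment.class), // keep only active segments
            s -> s.timestamp <= maxTimestamp);          // within the timestamp bound
    }
}
```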
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
@Override
public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
{
- // TODO (expected)
+ // TODO (expected, other)
return true;
}
- void appendAuxiliaryRecord(AuxiliaryRecord record)
+ void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)
{
Key key = new Key(record.timestamp, record.type());
- journal.write(key, record, SENTINEL_HOSTS);
+ journal.asyncWrite(key, record, SENTINEL_HOSTS, context);
}
- public void appendMessage(Message message, Executor executor,
AsyncWriteCallback callback)
+ public void appendRemoteRequest(Request request, ResponseContext context)
Review Comment:
Same as above
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
Review Comment:
You could avoid converting to an array, which is a copy, and I'm not sure
Java's escape analysis elides the new empty array.
When deserializing you can do `Arrays.asList`, which is strictly cheaper than
always having to do the copy.
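Sketch of what that could look like (stand-in names only):

```java
import java.util.Arrays;
import java.util.List;

// Hedged sketch: keep a List on the write path (no toArray copy), and wrap
// the deserialized array with Arrays.asList, an O(1) fixed-size view.
final class FrameRecordSketch
{
    static final class Pointer {}

    final List<Pointer> pointers;

    FrameRecordSketch(List<Pointer> pointers)
    {
        this.pointers = pointers; // serialize by iterating the list directly
    }

    static FrameRecordSketch deserialize(Pointer[] read)
    {
        return new FrameRecordSketch(Arrays.asList(read));
    }
}
```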
##########
src/java/org/apache/cassandra/journal/Flusher.java:
##########
@@ -181,11 +186,11 @@ private void doFlush()
}
// flush the segment, schedule write callbacks if requested, return
whether the segment has been flushed fully
- private boolean doFlush(ActiveSegment<K> segment, boolean
scheduleCallbacks)
+ private boolean doFlush(ActiveSegment<K, V> segment, boolean
scheduleCallbacks)
{
int syncedOffset = segment.flush();
if (scheduleCallbacks)
- segment.scheduleOnSuccessCallbacks(syncedOffset);
+ callbacks.onFlush(segment.descriptor.timestamp, syncedOffset);
return segment.isFullyFlushed(syncedOffset);
Review Comment:
Add a quick comment with the definition of "fully flushed" to the
`isFullyFlushed` method, given how `endOfBuffer` works.
It seems like `endOfBuffer` is detecting the case where you see the next
segment but don't actually flush all of the entries appended to the previous
segment, so it's not safe to invoke callbacks for the next segment since you
skipped some from the previous one?
If we don't fully flush, should we flush the next segment if we can't invoke
its callbacks anyway? Maybe break out of the loop and we can flush everything
next time around?
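If my reading of `endOfBuffer` is right, the requested comment might read roughly like this (an assumption about the semantics, not a statement of them):

```java
/**
 * A segment is fully flushed when the synced offset has reached the end of
 * everything ever appended to it (endOfBuffer is set once appends move on
 * to the next segment), i.e. no record in this segment still awaits a flush.
 */
```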
##########
src/java/org/apache/cassandra/journal/AsyncCallbacks.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+/**
+ * TODO (expected): document, implement
Review Comment:
These methods are 90% obvious. Maybe define what a flush is, and then,
contextually, at what point each is called.
Could probably be summarized as: `onWrite` is called after all in-memory and
file contents are updated, but before the write is flushed.
`onWriteFailed` could be anywhere from serialization to writing to the file
to requesting the flush.
`onFlush` seems to imply the segment is flushed, but doesn't imply that the
success callbacks will be or have been run. Not sure it's relevant to mention
in this interface.
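Concretely, that summary could become the javadoc; signatures follow the usages visible elsewhere in this PR (`JournalCallbacks.onWrite`, the flusher's `callbacks.onFlush`), while `onWriteFailed` is my assumption:

```java
public interface AsyncCallbacks<K, V>
{
    /** Invoked once in-memory and file contents are updated, before the write is flushed. */
    void onWrite(long segment, int position, int size, K key, V value, Object writeContext);

    /** Invoked on failure anywhere from serialization to writing the file to requesting the flush. */
    void onWriteFailed(K key, V value, Object writeContext, Throwable cause);

    /** Invoked after a flush: everything in segment up to position is durable (says nothing about success callbacks having run). */
    void onFlush(long segment, int position);
}
```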
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -211,21 +254,28 @@ public Object shutdownNow()
@Override
public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
{
- // TODO (expected)
+ // TODO (expected, other)
return true;
}
- void appendAuxiliaryRecord(AuxiliaryRecord record)
+ void appendAuxiliaryRecord(AuxiliaryRecord record, Object context)
{
Key key = new Key(record.timestamp, record.type());
- journal.write(key, record, SENTINEL_HOSTS);
+ journal.asyncWrite(key, record, SENTINEL_HOSTS, context);
}
- public void appendMessage(Message message, Executor executor,
AsyncWriteCallback callback)
+ public void appendRemoteRequest(Request request, ResponseContext context)
{
- Type type = Type.fromMessageType(message.type());
- Key key = new Key(type.txnId(message), type);
- journal.asyncWrite(key, message, SENTINEL_HOSTS, executor, callback);
+ Type type = Type.fromMessageType(request.type());
+ Key key = new Key(type.txnId(request), type);
+ journal.asyncWrite(key, request, SENTINEL_HOSTS, context);
+ }
+
+ public void appendLocalRequest(LocalRequest<?> request)
Review Comment:
Same as above
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
+ FrameContext context = new
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+ appendAuxiliaryRecord(frame, context);
+ }
+ }
+
+ private final class PendingRequest
+ {
+ final Pointer pointer;
+ final RequestContext context;
+
+ PendingRequest(Pointer pointer, RequestContext context)
+ {
+ this.pointer = pointer;
+ this.context = context;
+ }
+ }
+ }
+
+ private final class FrameApplicator implements Runnable
+ {
+ /** external SPSC written frame queue */
+ private final SpscLinkedQueue<PendingFrame> newFrames = new
SpscLinkedQueue<>();
+
+ /* single-thread accessed internal frame buffer */
+ private final ArrayList<PendingFrame> pendingFrames = new
ArrayList<>();
+
+ /* furthest flushed journal segment + position */
+ private volatile Pointer flushedUntil = null;
+
+ private volatile SequentialExecutorPlus executor;
+
+ /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call
*/
+ void onWrite(Pointer start, int size, FrameContext context)
+ {
+ newFrames.add(new PendingFrame(start, new Pointer(start.segment,
start.position + size), context));
+ }
+
+ /* invoked only from Journal Flusher thread (single) */
+ void onFlush(long segment, int position)
+ {
+ flushedUntil = new Pointer(segment, position);
+ executor.submit(this);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().sequential("AccordJournal#FrameApplicator");
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ @Override
+ public void run()
+ {
+ if (newFrames.drain(pendingFrames::add) > 0)
+ {
+ /* order by position in the journal, DESC */
+ pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+ }
+
+ Pointer flushedUntil = this.flushedUntil;
+ for (int i = pendingFrames.size() - 1; i >= 0; i--)
+ {
+ PendingFrame frame = pendingFrames.get(i);
+ if (frame.end.compareTo(flushedUntil) > 0)
+ break;
+ applyFrame((FrameRecord) cachedRecords.remove(frame.start),
frame.context);
+ pendingFrames.remove(i);
+ }
+ }
+
+ private void applyFrame(FrameRecord frame, FrameContext context)
+ {
+ Invariants.checkState(frame.pointers.length ==
context.requestContexts.length);
+ for (int i = 0; i < frame.pointers.length; i++)
+ applyRequest(frame.pointers[i], context.requestContexts[i]);
+ }
+
+ private void applyRequest(Pointer pointer, RequestContext context)
+ {
+ Request request = (Request) cachedRecords.remove(pointer);
Review Comment:
So right now this cache isn't really a cache, but in the future it will read
the message back off of disk if necessary?
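Presumably something like the following, which would also give the currently unused `Journal.read(segmentTimestamp, offset)` a caller (hedged sketch against the surrounding class, with the fallback path assumed):

```java
// Hedged sketch: consult the cache first, then fall back to reading the
// record back off disk by its pointer via Journal.read(segment, offset).
Request load(Pointer pointer)
{
    Request cached = (Request) cachedRecords.remove(pointer);
    if (cached != null)
        return cached;
    return (Request) journal.read(pointer.segment, pointer.position);
}
```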
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
Review Comment:
Describe what a frame is, how/why we create them, and what we do with them
once created, after they are written, and after they are flushed.
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
Review Comment:
A set would avoid long searches through large numbers of pending epochs.
Though if that happens, maybe we have bigger problems.
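E.g. agrona's primitive set (note the sorted iteration in `doRun` would then need a different source of ordering):

```java
import org.agrona.collections.LongHashSet;

// Hedged sketch: O(1) membership instead of containsLong's linear scan.
final class PendingEpochsSketch
{
    private final LongHashSet waitForEpochs = new LongHashSet();

    /** true only the first time this epoch is registered */
    boolean markWaiting(long epoch)
    {
        return waitForEpochs.add(epoch);
    }
}
```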
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
+ haveWork.release(1);
+ }
+
+ void notifyOfEpoch()
+ {
+ haveWork.release(1);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().infiniteLoop("AccordJournal#FrameAggregator", this, SAFE,
NON_DAEMON, SYNCHRONIZED);
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ /* internal reusable buffers used for frame generation */
+ private final ArrayList<PendingRequest> requestBuffer = new
ArrayList<>();
+ private final ArrayList<Pointer> pointersBuffer = new ArrayList<>();
+ private final ArrayList<RequestContext> contextsBuffer = new
ArrayList<>();
+
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
+ if (unframedRequests.isEmpty() && delayedRequests.isEmpty())
+ return;
+
+ try
+ {
+ doRun();
+ }
+ finally
+ {
+ requestBuffer.clear();
+ pointersBuffer.clear();
+ contextsBuffer.clear();
+ }
+
+ haveWork.acquire(1);
+ }
+
+ private void doRun()
+ {
+ /*
+ * Deal with delayed requests
+ */
+
+ waitForEpochs.sort(null);
+
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ requestBuffer.addAll(delayedRequests.remove(waitForEpoch));
+ }
+
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ /*
+ * Deal with regular pending requests
+ */
+
+ PendingRequest request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.context.waitForEpoch;
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore ->
new ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ {
+ waitForEpochs.addLong(waitForEpoch);
+ node.withEpoch(waitForEpoch, this::notifyOfEpoch);
+ }
+ }
+ else
+ {
+ requestBuffer.add(request);
+ }
+ }
+
+ for (PendingRequest req : requestBuffer)
+ {
+ pointersBuffer.add(req.pointer);
+ contextsBuffer.add(req.context);
+ }
+
+ if (!requestBuffer.isEmpty())
+ {
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointersBuffer.toArray(new Pointer[0]));
+ FrameContext context = new
FrameContext(contextsBuffer.toArray(new RequestContext[0]));
+ appendAuxiliaryRecord(frame, context);
+ }
+ }
+
+ private final class PendingRequest
+ {
+ final Pointer pointer;
+ final RequestContext context;
+
+ PendingRequest(Pointer pointer, RequestContext context)
+ {
+ this.pointer = pointer;
+ this.context = context;
+ }
+ }
+ }
+
+ private final class FrameApplicator implements Runnable
+ {
+ /** external SPSC written frame queue */
+ private final SpscLinkedQueue<PendingFrame> newFrames = new
SpscLinkedQueue<>();
+
+ /* single-thread accessed internal frame buffer */
+ private final ArrayList<PendingFrame> pendingFrames = new
ArrayList<>();
+
+ /* furthest flushed journal segment + position */
+ private volatile Pointer flushedUntil = null;
+
+ private volatile SequentialExecutorPlus executor;
+
+ /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call
*/
+ void onWrite(Pointer start, int size, FrameContext context)
+ {
+ newFrames.add(new PendingFrame(start, new Pointer(start.segment,
start.position + size), context));
+ }
+
+ /* invoked only from Journal Flusher thread (single) */
+ void onFlush(long segment, int position)
+ {
+ flushedUntil = new Pointer(segment, position);
+ executor.submit(this);
+ }
+
+ void start()
+ {
+ executor =
executorFactory().sequential("AccordJournal#FrameApplicator");
+ }
+
+ void shutdown()
+ {
+ executor.shutdown();
+ }
+
+ @Override
+ public void run()
+ {
+ if (newFrames.drain(pendingFrames::add) > 0)
+ {
+ /* order by position in the journal, DESC */
+ pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
+ }
+
+ Pointer flushedUntil = this.flushedUntil;
+ for (int i = pendingFrames.size() - 1; i >= 0; i--)
+ {
+ PendingFrame frame = pendingFrames.get(i);
+ if (frame.end.compareTo(flushedUntil) > 0)
+ break;
+ applyFrame((FrameRecord) cachedRecords.remove(frame.start),
frame.context);
+ pendingFrames.remove(i);
+ }
+ }
+
+ private void applyFrame(FrameRecord frame, FrameContext context)
+ {
+ Invariants.checkState(frame.pointers.length ==
context.requestContexts.length);
+ for (int i = 0; i < frame.pointers.length; i++)
+ applyRequest(frame.pointers[i], context.requestContexts[i]);
+ }
+
+ private void applyRequest(Pointer pointer, RequestContext context)
+ {
+ Request request = (Request) cachedRecords.remove(pointer);
+ Type type = Type.fromMessageType(request.type());
+
+ if (type.isRemoteRequest())
+ {
+ RemoteRequestContext ctx = (RemoteRequestContext) context;
+ Id from = endpointMapper.mappedId(ctx.from());
+ request.process(node, from, ctx);
+ }
+ else
+ {
+ Invariants.checkState(type.isLocalRequest());
+ LocalRequestContext ctx = (LocalRequestContext) context;
+ //noinspection unchecked,rawtypes
+ ((LocalRequest) request).process(node, ctx.callback);
+ }
+ }
+
+ private final class PendingFrame
+ {
+ final Pointer start;
+ final Pointer end;
+ final FrameContext context;
+
+ PendingFrame(Pointer start, Pointer end, FrameContext context)
+ {
+ this.start = start;
+ this.end = end;
+ this.context = context;
+ }
+ }
+ }
+
+ static final class FrameContext
Review Comment:
Could be private
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -699,6 +1003,246 @@ private static int msVersion(int version)
}
}
+ /*
+ * Record framing logic
+ */
+
+ private final class FrameAggregator implements Interruptible.Task
+ {
+ /* external MPSC pending request queue */
+ private final ManyToOneConcurrentLinkedQueue<PendingRequest>
unframedRequests = new ManyToOneConcurrentLinkedQueue<>();
+
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<PendingRequest>>
delayedRequests = new Long2ObjectHashMap<>();
+
+ private volatile Interruptible executor;
+
+ // a signal and flag that callers outside the aggregator thread can use
+ // to signal they want the aggregator to run again
+ private final Semaphore haveWork = newSemaphore(1);
+
+ void onWrite(Pointer pointer, RequestContext context)
+ {
+ unframedRequests.add(new PendingRequest(pointer, context));
Review Comment:
`RequestContext` could provide storage for `Pointer` instead of allocating a
new object. A step further would be to store pointers in a columnar fashion so
you don't have pointer objects, but that really depends on how hard you want to
go at this.
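The columnar version would be roughly this (illustrative only, nothing here matches the PR's actual layout):

```java
import org.agrona.collections.IntArrayList;
import org.agrona.collections.LongArrayList;

// Hedged sketch: two parallel primitive lists instead of one Pointer
// object per record.
final class ColumnarPointers
{
    private final LongArrayList segments = new LongArrayList();
    private final IntArrayList positions = new IntArrayList();

    void add(long segment, int position)
    {
        segments.addLong(segment);
        positions.addInt(position);
    }

    long segmentAt(int i) { return segments.getLong(i); }
    int positionAt(int i) { return positions.getInt(i); }
    int size()            { return segments.size(); }
}
```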
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -711,43 +1255,57 @@ static abstract class AuxiliaryRecord
abstract Type type();
}
- /*
- * Placeholder for future record.
- */
- static final class ReplayRecord extends AuxiliaryRecord
+ static final class FrameRecord extends AuxiliaryRecord
{
- ReplayRecord(Timestamp timestamp)
+ final Pointer[] pointers;
+
+ FrameRecord(Timestamp timestamp, Pointer[] pointers)
{
super(timestamp);
+ this.pointers = pointers;
}
@Override
Type type()
{
- return Type.REPLAY;
+ return Type.FRAME;
}
- static final ValueSerializer<Key, ReplayRecord> SERIALIZER = new
ValueSerializer<Key, ReplayRecord>()
+ static final ValueSerializer<Key, FrameRecord> SERIALIZER = new
ValueSerializer<>()
{
@Override
- public int serializedSize(Key key, ReplayRecord record, int
userVersion)
+ public int serializedSize(Key key, FrameRecord frame, int
userVersion)
{
- return 0;
+ int size = computeUnsignedVIntSize(frame.pointers.length);
+ for (Pointer pointer : frame.pointers)
+ size += pointer.serializedSize();
+ return size;
}
@Override
- public void serialize(Key key, ReplayRecord record, DataOutputPlus
out, int userVersion)
+ public void serialize(Key key, FrameRecord frame, DataOutputPlus
out, int userVersion) throws IOException
{
+ out.writeUnsignedVInt32(frame.pointers.length);
+ for (Pointer pointer : frame.pointers)
+ pointer.serialize(out);
}
@Override
- public ReplayRecord deserialize(Key key, DataInputPlus in, int
userVersion)
+ public FrameRecord deserialize(Key key, DataInputPlus in, int
userVersion) throws IOException
{
- return new ReplayRecord(key.timestamp);
+ int count = in.readUnsignedVInt32();
+ Pointer[] pointers = new Pointer[count];
+ for (int i = 0; i < count; i++)
+ pointers[i] = Pointer.deserialize(in);
+ return new FrameRecord(key.timestamp, pointers);
}
};
}
+ /*
+ * Message provider implementation
+ */
+
Review Comment:
Extra line break?
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -237,17 +287,241 @@ public void appendMessageBlocking(Message message)
}
@VisibleForTesting
- public <M extends Message> M readMessage(TxnId txnId, Type type, Class<M>
clazz)
+ public <M extends Message> M readMessage(TxnId txnId, MessageType
messageType, Class<M> clazz)
+ {
+ for (Type type : Type.synonymousTypesFromMessageType(messageType))
+ {
+ M message = clazz.cast(journal.readFirst(new Key(txnId, type)));
+ if (null != message) return message;
+ }
+ return null;
+ }
+
+ private <M extends Message> M readMessage(TxnId txnId, MessageType
messageType, Class<M> clazz, Predicate<Object> condition)
+ {
+ for (Type type : Type.synonymousTypesFromMessageType(messageType))
+ {
+ M message = clazz.cast(journal.readFirstMatching(new Key(txnId,
type), condition));
+ if (null != message) return message;
+ }
+ return null;
+ }
+
+ private static class Pointer implements Comparable<Pointer>
+ {
+ final long segment; // unique segment id
+ final int position; // record start position within the segment
+
+ Pointer(long segment, int position)
+ {
+ this.segment = segment;
+ this.position = position;
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (this == other)
+ return true;
+ if (!(other instanceof Pointer))
+ return false;
+ Pointer that = (Pointer) other;
+ return this.segment == that.segment
+ && this.position == that.position;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Long.hashCode(segment) + position * 31;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + segment + ", " + position + ')';
+ }
+
+ @Override
+ public int compareTo(Pointer that)
+ {
+ int cmp = Longs.compare(this.segment, that.segment);
+ return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+ }
+
+ int serializedSize()
+ {
+ return computeUnsignedVIntSize(segment) +
computeUnsignedVIntSize(position);
+ }
+
+ void serialize(DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt(segment);
+ out.writeUnsignedVInt32(position);
+ }
+
+ static Pointer deserialize(DataInputPlus in) throws IOException
+ {
+ long segment = in.readUnsignedVInt();
+ int position = in.readUnsignedVInt32();
+ return new Pointer(segment, position);
+ }
+ }
+
+ private class JournalCallbacks implements AsyncCallbacks<Key, Object>
+ {
+ @Override
+ public void onWrite(long segment, int position, int size, Key key,
Object value, Object writeContext)
Review Comment:
One-sentence comment: route the completion of writes to the next step, which
is either frame aggregation or, if the write to the log was itself a frame,
frame application.
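Possible wording:

```java
// Routes write completions to the next stage: frame aggregation for
// request records, or frame application if the record written was
// itself a frame.
```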
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -765,6 +1323,7 @@ private MessageProvider(TxnId txnId)
@Override
public Set<MessageType> test(Set<MessageType> messages)
{
+ // TODO (expected, correctness): fix other message provider
methods to account for synonyms too
Review Comment:
Which methods need to handle synonyms?
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -711,43 +1255,57 @@ static abstract class AuxiliaryRecord
abstract Type type();
}
- /*
- * Placeholder for future record.
- */
- static final class ReplayRecord extends AuxiliaryRecord
+ static final class FrameRecord extends AuxiliaryRecord
Review Comment:
OK, going to stop mentioning things that could be private :-)
##########
src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java:
##########
@@ -52,45 +48,34 @@ public void doVerb(Message<T> message) throws IOException
// TODO (desired): need a non-blocking way to inform CMS of an unknown
epoch and add callback to its receipt
// ClusterMetadataService.instance().maybeCatchup(message.epoch());
logger.debug("Receiving {} from {}", message.payload, message.from());
+ Node.Id fromNodeId = endpointMapper.mappedId(message.from());
T request = message.payload;
+
long knownEpoch = request.knownEpoch();
if (!node.topology().hasEpoch(knownEpoch))
{
node.configService().fetchTopologyForEpoch(knownEpoch);
long waitForEpoch = request.waitForEpoch();
if (!node.topology().hasEpoch(waitForEpoch))
Review Comment:
This seems like it's a property independent of `knownEpoch`, so we could move
it out, avoid copying the journal append part, and have less indentation? The
journal append does not depend on having `waitForEpoch`; it just depends on
side effects. It really just changes whether we process directly or via
`withEpoch`.
Maybe `request.process` should handle this check instead of the caller?
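Roughly (a sketch of the suggested shape only; `process` stands in for the existing request-processing path):

```java
public void doVerb(Message<T> message)
{
    T request = message.payload;

    long knownEpoch = request.knownEpoch();
    if (!node.topology().hasEpoch(knownEpoch))
        node.configService().fetchTopologyForEpoch(knownEpoch); // async catch-up only

    // journal append would go here, unconditionally, exactly once

    long waitForEpoch = request.waitForEpoch();
    if (node.topology().hasEpoch(waitForEpoch))
        process(message);
    else
        node.withEpoch(waitForEpoch, () -> process(message)); // defer until epoch is known
}
```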
##########
test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java:
##########
@@ -165,35 +166,35 @@ public static class State
}
}
private static final ExecutorPlus executor =
ExecutorFactory.Global.executorFactory().pooled("name", 10);
- private static final AccordJournal journal = new AccordJournal();
+ private static final AccordJournal journal = new AccordJournal(null);
private static final int events = 100;
private static final CountDownLatch eventsWritten =
CountDownLatch.newCountDownLatch(events);
private static final CountDownLatch eventsDurable =
CountDownLatch.newCountDownLatch(events);
private static final List<Throwable> exceptions = new
CopyOnWriteArrayList<>();
static
{
- journal.start();
+ journal.start(null);
}
public static void append(int event)
{
TxnRequest<?> request = toRequest(event);
- journal.appendMessage(request, executor, new AsyncWriteCallback()
- {
- @Override
- public void run()
- {
- durable(event);
- }
-
- @Override
- public void onFailure(Throwable error)
- {
- eventsDurable.decrement(); // to make sure we don't block
forever
- exceptions.add(error);
- }
- });
+// journal.appendMessageTest(request, executor, new
AsyncWriteCallback()
Review Comment:
Test not passing?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -482,35 +479,14 @@ public TopologyManager topology()
}
}
- private void handleLocalMessage(LocalMessage message, Node node)
+ // TODO (expected): confirm that we don't need to worry about
waitForEpoch() here being ahead of known;
+ // was not being handled by Accord pre-journal, likely a
non-issue.
+ private void handleLocalRequest(LocalRequest<?> request, Node node)
Review Comment:
Which `waitForEpoch`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]