iamaleksey commented on code in PR #2982:
URL: https://github.com/apache/cassandra/pull/2982#discussion_r1450556889


##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -237,17 +287,241 @@ public void appendMessageBlocking(Message message)
     }
 
     @VisibleForTesting
-    public <M extends Message> M readMessage(TxnId txnId, Type type, Class<M> 
clazz)
+    public <M extends Message> M readMessage(TxnId txnId, MessageType 
messageType, Class<M> clazz)
+    {
+        for (Type type : Type.synonymousTypesFromMessageType(messageType))
+        {
+            M message = clazz.cast(journal.readFirst(new Key(txnId, type)));
+            if (null != message) return message;
+        }
+        return null;
+    }
+
+    private <M extends Message> M readMessage(TxnId txnId, MessageType 
messageType, Class<M> clazz, Predicate<Object> condition)
+    {
+        for (Type type : Type.synonymousTypesFromMessageType(messageType))
+        {
+            M message = clazz.cast(journal.readFirstMatching(new Key(txnId, 
type), condition));
+            if (null != message) return message;
+        }
+        return null;
+    }
+
+    private static class Pointer implements Comparable<Pointer>
+    {
+        final long segment; // unique segment id
+        final int position; // record start position within the segment
+
+        Pointer(long segment, int position)
+        {
+            this.segment = segment;
+            this.position = position;
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (this == other)
+                return true;
+            if (!(other instanceof Pointer))
+                return false;
+            Pointer that = (Pointer) other;
+            return this.segment == that.segment
+                && this.position == that.position;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Long.hashCode(segment) + position * 31;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "(" + segment + ", " + position + ')';
+        }
+
+        @Override
+        public int compareTo(Pointer that)
+        {
+            int cmp = Longs.compare(this.segment, that.segment);
+            return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+        }
+
+        int serializedSize()
+        {
+            return computeUnsignedVIntSize(segment) + 
computeUnsignedVIntSize(position);
+        }
+
+        void serialize(DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt(segment);
+            out.writeUnsignedVInt32(position);
+        }
+
+        static Pointer deserialize(DataInputPlus in) throws IOException
+        {
+            long segment = in.readUnsignedVInt();
+            int position = in.readUnsignedVInt32();
+            return new Pointer(segment, position);
+        }
+    }
+
+    private class JournalCallbacks implements AsyncCallbacks<Key, Object>
+    {
+        @Override
+        public void onWrite(long segment, int position, int size, Key key, 
Object value, Object writeContext)

Review Comment:
   Addressed, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to