ifesdjeen commented on code in PR #4466:
URL: https://github.com/apache/cassandra/pull/4466#discussion_r2522527644


##########
src/java/org/apache/cassandra/service/accord/AccordCache.java:
##########
@@ -1131,13 +1136,30 @@ public Object fullShrink(RoutingKey key, CommandsForKey value)
             if (value.isLoadingPruned())
                 return value;
 
-            return Serialize.toBytesWithoutKey(value.maximalPrune());
+            TxnId last = value.size() == 0 ? null : value.get(value.size() - 1);
+            TxnId minUndecided = value.minUndecided();
+            int prefixBytes = (int) (  CommandSerializers.txnId.serializedSize(last)
+                                     + CommandSerializers.txnId.serializedSize(minUndecided));
+            ByteBuffer result = Serialize.toBytesWithoutKey(prefixBytes, value.maximalPrune());
+            CommandSerializers.txnId.serialize(last, result);
+            CommandSerializers.txnId.serialize(minUndecided, result);
+            result.position(0);
+            return result;
+        }
+
+        private static int prefixBytes(ByteBuffer bb)
+        {
+            int prefix = (int) CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb, 0));
+            prefix += (int)CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb, prefix));
+            return prefix;
         }
 
         @Override
         public CommandsForKey inflate(AccordCommandStore commandStore, RoutingKey key, Object shrunk)
         {
-            return Serialize.fromBytes(key, (ByteBuffer)shrunk);
+            ByteBuffer bb = (ByteBuffer)shrunk;
+            bb.position(prefixBytes(bb));

Review Comment:
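   Should we call `duplicate()` on the buffer here, maybe? `bb` is the same object as `shrunk`, so `bb.position(...)` changes the position of the `shrunk` buffer itself rather than of a copy.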



##########
src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java:
##########
@@ -106,64 +129,80 @@ public String toString()
         {
             throw new RuntimeException(e);
         }
-        builder.append("]");
+        builder.append(']');
         return builder.toString();
     }
 
     public static class Listener implements ReplicaEventListener
     {
-        private AccordReplicaMetrics forTransaction(TxnId txnId)
+        private SubShard forTransaction(SafeCommandStore safeStore, TxnId txnId)
         {
             if (txnId != null)
             {
+                Shard shard = ((AccordCommandStore) safeStore.commandStore()).executor().replicaMetrics;
                 if (txnId.isWrite())
-                    return writeMetrics;
+                    return shard.writes;
                 else if (txnId.isSomeRead())
-                    return readMetrics;
+                    return shard.reads;
+                else if (txnId.isSyncPoint())
+                    return shard.syncPoints;
             }
             return null;
         }
 
+        private static long unixNanos()
+        {
+            return currentTimeMillis() * 1_000_000;
+        }
+
+        private static long elapsed(TxnId txnId)
+        {
+            return elapsed(unixNanos(), txnId);
+        }
+
+        private static long elapsed(long unixNanos, TxnId txnId)
+        {
+            return Math.max(0, unixNanos - (txnId.hlc() * 1000));
+        }
+
+        private static LogLinearDecayingHistograms.Buffer buffer(SafeCommandStore safeStore)
+        {
+            return ((AccordSafeCommandStore) safeStore).histogramBuffer();
+        }
+
         @Override
         public void onStable(SafeCommandStore safeStore, Command cmd)
         {
             Tracing.trace("Stable {} on {}", cmd.txnId(), safeStore.commandStore());
-            long now = AccordTimeService.nowMicros();
-            AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+            SubShard metrics = forTransaction(safeStore, cmd.txnId());
             if (metrics != null)
-            {
-                long trxTimestamp = cmd.txnId().hlc();
-                metrics.stableLatency.update(now - trxTimestamp, TimeUnit.MICROSECONDS);
-            }
+                metrics.stableLatency.add(buffer(safeStore), elapsed(cmd.txnId()));
         }
 
         @Override
         public void onPreApplied(SafeCommandStore safeStore, Command cmd)
         {
             Tracing.trace("Preapplied {} on {}", cmd.txnId(), safeStore.commandStore());
-            long now = AccordTimeService.nowMicros();
-            AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+            SubShard metrics = forTransaction(safeStore, cmd.txnId());
             if (metrics != null)
             {
-                Timestamp trxTimestamp = cmd.txnId();
-                metrics.preapplyLatency.update(now - trxTimestamp.hlc(), TimeUnit.MICROSECONDS);
+                long elapsed = elapsed(cmd.txnId());
+                System.err.println("Recording preapply latency of " + elapsed + " for " + cmd.txnId());

Review Comment:
   nit: should this be a trace rather than a `System.err.println`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to