maedhroz commented on a change in pull request #1180:
URL: https://github.com/apache/cassandra/pull/1180#discussion_r701384298



##########
File path: src/java/org/apache/cassandra/service/reads/ReadCallback.java
##########
@@ -56,29 +59,104 @@
 public class ReadCallback<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( 
ReadCallback.class );
-    private class WarningCounter
+
+    private class WarnAbortCounter
     {
-        // the highest number of tombstones reported by a node's warning
-        final AtomicInteger tombstoneWarnings = new AtomicInteger();
-        final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
-        // the highest number of tombstones reported by a node's rejection. 
This should be the same as
-        // our configured limit, but including to aid in diagnosing 
misconfigurations
-        final AtomicInteger tombstoneAborts = new AtomicInteger();
-        final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
-
-        // TODO: take message as arg and return boolean for 'had warning' etc
-        void addTombstoneWarning(InetAddressAndPort from, int tombstones)
+        final AtomicInteger warnings = new AtomicInteger();
+        // the highest number reported by a node's warning
+        final AtomicLong maxWarningCount = new AtomicLong();
+
+        final AtomicInteger aborts = new AtomicInteger();
+        // the highest number reported by a node's rejection.
+        final AtomicLong maxAbortsCount = new AtomicLong();
+
+        void addWarning(InetAddressAndPort from, long value)
         {
             if (!waitingFor(from)) return;
-            tombstoneWarnings.incrementAndGet();
-            maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
+            warnings.incrementAndGet();
+            maxWarningCount.accumulateAndGet(value, Math::max);
         }
 
-        void addTombstoneAbort(InetAddressAndPort from, int tombstones)
+        void addAbort(InetAddressAndPort from, long value)
         {
             if (!waitingFor(from)) return;
-            tombstoneAborts.incrementAndGet();
-            maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
+            aborts.incrementAndGet();
+            maxAbortsCount.accumulateAndGet(value, Math::max);
+        }
+    }
+
+    private interface ToString
+    {
+        String apply(int count, long value, String cql);
+    }
+
+    private class WarningCounter
+    {
+        final WarnAbortCounter tombstones = new WarnAbortCounter();
+        final WarnAbortCounter localReadSizeTooLarge = new WarnAbortCounter();
+        final WarnAbortCounter rowIndexTooLarge = new WarnAbortCounter();
+
+        private String cql, loggableTokens;
+        private String cql()
+        {
+            if (cql == null)
+                cql = command.toCQLString();
+            return cql;
+        }
+        private String loggableTokens()
+        {
+            if (loggableTokens == null)
+                loggableTokens = command.loggableTokens();
+            return loggableTokens;
+        }
+
+        private void trackAborts(WarnAbortCounter counter, 
TableMetrics.TableMeter metric, ToString toString)
+        {
+            if (counter.aborts.get() > 0)
+            {
+                String msg = toString.apply(counter.aborts.get(), 
counter.maxAbortsCount.get(), cql());
+                ClientWarn.instance.warn(msg + " with " + loggableTokens());
+                logger.warn(msg);
+                metric.mark();
+            }
+        }
+
+        private void trackWarnings(WarnAbortCounter counter, 
TableMetrics.TableMeter metric, ToString toString)
+        {
+            if (counter.warnings.get() > 0)
+            {
+                String msg = toString.apply(counter.warnings.get(), 
counter.maxWarningCount.get(), cql());
+                ClientWarn.instance.warn(msg + " with " + loggableTokens());
+                logger.warn(msg);
+                metric.mark();
+            }
+        }
+
+        void track()
+        {
+            trackAborts(  tombstones, cfs().metric.clientTombstoneAborts,   
ReadCallback::tombstoneAbortMessage);
+            trackWarnings(tombstones, cfs().metric.clientTombstoneWarnings, 
ReadCallback::tombstoneWarnMessage);
+
+            trackAborts(  localReadSizeTooLarge, 
cfs().metric.clientLocalReadSizeTooLargeAborts,   
ReadCallback::localReadSizeTooLargeAbortMessage);
+            trackWarnings(localReadSizeTooLarge, 
cfs().metric.clientLocalReadSizeTooLargeWarnings, 
ReadCallback::localReadSizeTooLargeWarnMessage);
+
+            trackAborts(rowIndexTooLarge, 
cfs().metric.rowIndexSizeTooLargeAborts, 
ReadCallback::rowIndexTooLargeAbortMessage);
+            trackWarnings(rowIndexTooLarge, 
cfs().metric.rowIndexSizeTooLargeWarnings, 
ReadCallback::rowIndexTooLargeWarnMessage);
+        }
+
+        void mayAbort(int received)
+        {
+            if (tombstones.aborts.get() > 0)
+                throw new TombstoneAbortException(tombstones.aborts.get(), 
Math.toIntExact(tombstones.maxAbortsCount.get()), cql(), 
resolver.isDataPresent(),

Review comment:
       ```suggestion
                   throw new TombstoneAbortException(tombstones.aborts.get(), 
tombstones.maxAbortsCount.get(), cql(), resolver.isDataPresent(),
   ```
   ...assuming we change the argument type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to