dcapwell commented on code in PR #2104:
URL: https://github.com/apache/cassandra/pull/2104#discussion_r1120899953


##########
src/java/org/apache/cassandra/locator/InetAddressAndPort.java:
##########
@@ -41,6 +41,7 @@
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.ObjectSizes;

Review Comment:
   your build is failing 
https://app.circleci.com/pipelines/github/NateAdere/cassandra/77/workflows/1c5a9c50-d8b0-4f34-91a2-f591816b86d9/jobs/1733



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -39,11 +40,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
+import org.apache.cassandra.utils.ObjectSizes;

Review Comment:
   your build is failing



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -252,18 +255,36 @@ public void onRegister(StreamResultFuture result)
         }
     };
 
+    protected void addStreamingStateAgain(StreamingState state)
+    {
+        if (!DatabaseDescriptor.getStreamingStatsEnabled())
+            return;
+        states.put(state.id(), state);
+    }
+
     public StreamManager()
     {
         DurationSpec.LongNanosecondsBound duration = DatabaseDescriptor.getStreamingStateExpires();
         long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
-        long numElements = sizeBytes / StreamingState.ELEMENT_SIZE;
-        logger.info("Storing streaming state for {} or for {} elements", duration, numElements);
+        logger.info("Storing streaming state for {} or for weight {}", duration, sizeBytes);

Review Comment:
   replace `weight` with `size`



##########
src/java/org/apache/cassandra/utils/ObjectSizes.java:
##########
@@ -39,6 +42,8 @@
 
     private static final long DIRECT_BUFFER_HEAP_SIZE = measure(ByteBuffer.allocateDirect(0));
 
+    public static final long IPV6_SIZE = ObjectSizes.measureDeep(new InetSocketAddress(getIpvAddress(16), 42));

Review Comment:
   should be something like `IPV6_SOCKET_ADDRESS_SIZE`; the current name 
doesn't convey that the measured object includes the port



##########
src/java/org/apache/cassandra/utils/ObjectSizes.java:
##########
@@ -236,4 +241,20 @@ public static long measure(Object pojo)
     {
         return meter.measure(pojo);
     }
+
+    public static InetAddress getIpvAddress(int size)

Review Comment:
   make this `private`; it is for internal use only



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -39,11 +40,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.IMeasurableMemory;

Review Comment:
   your build is failing 
https://app.circleci.com/pipelines/github/NateAdere/cassandra/77/workflows/1c5a9c50-d8b0-4f34-91a2-f591816b86d9/jobs/1733



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -34,7 +34,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.locator.InetAddressAndPort;

Review Comment:
   your build is failing 
https://app.circleci.com/pipelines/github/NateAdere/cassandra/77/workflows/1c5a9c50-d8b0-4f34-91a2-f591816b86d9/jobs/1733



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -694,8 +701,8 @@ public Future<?> onError(Throwable e)
             state(State.FAILED); // make sure subsequent error handling sees the session in a final state 
             channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
         }
-
-        return closeSession(State.FAILED);
+        String exception = Throwables.getStackTraceAsString(e);
+        return closeSession(State.FAILED, "Failed because of an unkown exception\n" + exception.substring(0,exception.length() -143)); //bound the exception by removing internal line logs

Review Comment:
   I don't feel this addresses my feedback... I was asking that you define a 
solution to bound memory that you could justify; can you justify why you chose 
this solution?
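
   For illustration, one bound that is easy to justify is an explicit cap on 
the message length. Trimming a fixed 143 characters from the end, as in 
`exception.substring(0, exception.length() - 143)`, still lets the message grow 
with the stack trace, and it would throw `StringIndexOutOfBoundsException` if 
the trace were ever shorter than 143 characters. The following is a minimal 
sketch only: the class `BoundedFailureMessage`, its methods, and the `MAX_CHARS` 
value are hypothetical names and numbers that are not part of the PR, and it 
uses plain JDK classes rather than Guava's `Throwables`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper, not part of the PR: caps a failure message at a fixed number of characters.
public final class BoundedFailureMessage
{
    // Assumed cap for illustration; a real value would have to be justified
    // against the configured streaming state size.
    public static final int MAX_CHARS = 4096;

    private static final String TRUNCATED = "... (truncated)";

    private BoundedFailureMessage() {}

    // Returns text unchanged if it fits, otherwise a prefix plus a marker,
    // so the result never exceeds maxChars characters.
    public static String bound(String text, int maxChars)
    {
        if (maxChars < TRUNCATED.length())
            throw new IllegalArgumentException("maxChars must be at least " + TRUNCATED.length());
        if (text.length() <= maxChars)
            return text;
        return text.substring(0, maxChars - TRUNCATED.length()) + TRUNCATED;
    }

    // Renders the stack trace with the JDK only, then applies the cap.
    public static String describe(Throwable e, int maxChars)
    {
        StringWriter out = new StringWriter();
        e.printStackTrace(new PrintWriter(out, true));
        return bound("Failed because of an unknown exception\n" + out, maxChars);
    }
}
```

   With a cap like this, each stored failure message holds at most `maxChars` 
characters whatever the depth of the stack trace, which gives a concrete 
number to reason about when sizing the streaming state.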



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

