dcapwell commented on code in PR #2104:
URL: https://github.com/apache/cassandra/pull/2104#discussion_r1110195031


##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -260,10 +269,20 @@ public StreamManager()
         logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);

Review Comment:
   please cleanup `numElements` as you made this dead code



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -252,6 +254,13 @@ public void onRegister(StreamResultFuture result)
         }
     };
 
+    protected void readdStreamingState(StreamingState state)

Review Comment:
   this name is confusing, this looks like you want `readd`?



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -252,6 +254,13 @@ public void onRegister(StreamResultFuture result)
         }
     };
 
+    protected void readdStreamingState(StreamingState state)
+    {
+        if (!DatabaseDescriptor.getStreamingStatsEnabled())

Review Comment:
   I wonder about this, we know its already in the cache so why not update?



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -260,10 +269,20 @@ public StreamManager()
         logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
         states = CacheBuilder.newBuilder()
                              .expireAfterWrite(duration.quantity(), 
duration.unit())
-                             .maximumSize(numElements)
+                             .maximumWeight(10 * 1024 *1024)
+                             .weigher(new streamingStateWeigher())
                              .build();
     }
 
+    private class streamingStateWeigher implements 
Weigher<TimeUUID,StreamingState>

Review Comment:
   classes should be upper case for the first letter in the name.
   
   Also you should use `static` when you don't need access to 
`StreamManager.this`



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -260,10 +269,20 @@ public StreamManager()
         logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
         states = CacheBuilder.newBuilder()
                              .expireAfterWrite(duration.quantity(), 
duration.unit())
-                             .maximumSize(numElements)
+                             .maximumWeight(10 * 1024 *1024)
+                             .weigher(new streamingStateWeigher())
                              .build();
     }
 
+    private class streamingStateWeigher implements 
Weigher<TimeUUID,StreamingState>
+    {
+        public int weigh(TimeUUID key, StreamingState val) {

Review Comment:
   you need `@Override`



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -260,10 +269,20 @@ public StreamManager()
         logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
         states = CacheBuilder.newBuilder()
                              .expireAfterWrite(duration.quantity(), 
duration.unit())
-                             .maximumSize(numElements)
+                             .maximumWeight(10 * 1024 *1024)
+                             .weigher(new streamingStateWeigher())
                              .build();
     }
 
+    private class streamingStateWeigher implements 
Weigher<TimeUUID,StreamingState>
+    {
+        public int weigh(TimeUUID key, StreamingState val) {
+            long weight = ObjectSizes.measureDeep(val);

Review Comment:
   performance issue, this is a costly operation which is why we don't use it 
outside of static access.  You should look at the "empty" size we already 
computed and compute the "change" from empty.



##########
src/java/org/apache/cassandra/streaming/StreamManager.java:
##########
@@ -260,10 +269,20 @@ public StreamManager()
         logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
         states = CacheBuilder.newBuilder()
                              .expireAfterWrite(duration.quantity(), 
duration.unit())
-                             .maximumSize(numElements)
+                             .maximumWeight(10 * 1024 *1024)
+                             .weigher(new streamingStateWeigher())
                              .build();
     }
 
+    private class streamingStateWeigher implements 
Weigher<TimeUUID,StreamingState>
+    {
+        public int weigh(TimeUUID key, StreamingState val) {
+            long weight = ObjectSizes.measureDeep(val);
+            int finalWeight = Math.toIntExact(weight);

Review Comment:
   don't save, just return



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -283,6 +283,9 @@ public synchronized void onFailure(Throwable throwable)
     {
         completeMessage = Throwables.getStackTraceAsString(throwable);
         updateState(Status.FAILURE);
+        //we know the size is now very different from the estimate so 
recompute by adding again
+        StreamManager.instance.readdStreamingState(this);
+        //update cache

Review Comment:
   you can remove, your comment above covers this



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -701,7 +701,7 @@ public Future<?> onError(Throwable e)
             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state 
             channel.sendControlMessage(new 
SessionFailedMessage()).awaitUninterruptibly();
         }
-        return closeSession(State.FAILED, "Failed because of an exception that 
is not an EOF exception;\n" + Throwables.getStackTraceAsString(e));
+        return closeSession(State.FAILED, "Failed because of an unkown 
exception\n" + Throwables.getStackTraceAsString(e));

Review Comment:
   you still should try to bound the throwable, or is that another patch coming?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to