jonmeredith commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1065229104


##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
     @Test
     public void test() throws IOException
     {
+        ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
         try (Cluster cluster = init(Cluster.build(2)
                                            .withConfig(c -> 
c.with(Feature.values()).set("stream_entire_sstables", false))
                                            .start()))
         {

Review Comment:
   Can we make this a parametrized tests and check with entire sstable 
streaming active too? The only change you have to make is the expected number 
of files increases by 7x in the assertion as entire sstable streaming counts 
each component it sends. Alternately we could modify the accounting to only 
increment the file count on the DATA file?



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent 
event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+        Pair<InetAddressAndPort, String> key = Pair.create(info.peer, 
info.fileName);
+        long seenBytes = activeFiles.getOrDefault(key, 0);
+        long delta = info.currentBytes - seenBytes;
+        if (info.direction == ProgressInfo.Direction.IN)
         {
-            info.updateProgress(event.progress);
+            // receiving
+            sessions.bytesReceived += delta;
+            if (info.isCompleted())
+                sessions.filesReceived++;
         }
         else
         {
-            logger.warn("[Stream #{}} ID#{}] Recieved stream progress before 
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+            // sending
+            sessions.bytesSent += delta;
+            if (info.isCompleted())
+                sessions.filesSent++;
         }
+        activeFiles.put(key, info.currentBytes);

Review Comment:
   Is it ok that `activeFiles` gradually grows to the total size of streams 
transferred and includes completed files once they are transferred. I suppose 
we need this if we want to check for non-negative deltas (or even better 
positive, but I'm not sure that's guaranteed).



##########
src/java/org/apache/cassandra/streaming/StreamResultFuture.java:
##########
@@ -219,6 +226,10 @@ synchronized void fireStreamEvent(StreamEvent event)
                 logger.warn("Unexpected exception in listern while calling 
handleStreamEvent", t);
             }
         }
+        long totalNanos = nanoTime() - startNanos;
+        if (totalNanos > slowEventsLogTimeoutNanos)
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",

Review Comment:
   nit: include unit of nanos in the message?  Very low level nit as anybody 
investigating thing will be able to work it out from code.



##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
     @Test
     public void test() throws IOException
     {
+        ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
         try (Cluster cluster = init(Cluster.build(2)
                                            .withConfig(c -> 
c.with(Feature.values()).set("stream_entire_sstables", false))
                                            .start()))
         {

Review Comment:
   also, is it worth lowering `streaming_slow_events_timeout` so we check that 
messages get logged if slow?



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent 
event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+        Pair<InetAddressAndPort, String> key = Pair.create(info.peer, 
info.fileName);
+        long seenBytes = activeFiles.getOrDefault(key, 0);
+        long delta = info.currentBytes - seenBytes;

Review Comment:
   nit: is it worth a check for a non-negative delta?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to