dcapwell commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1066356417


##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent 
event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+        Pair<InetAddressAndPort, String> key = Pair.create(info.peer, 
info.fileName);
+        long seenBytes = activeFiles.getOrDefault(key, 0);
+        long delta = info.currentBytes - seenBytes;
+        if (info.direction == ProgressInfo.Direction.IN)
         {
-            info.updateProgress(event.progress);
+            // receiving
+            sessions.bytesReceived += delta;
+            if (info.isCompleted())
+                sessions.filesReceived++;
         }
         else
         {
-            logger.warn("[Stream #{}} ID#{}] Recieved stream progress before 
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+            // sending
+            sessions.bytesSent += delta;
+            if (info.isCompleted())
+                sessions.filesSent++;
         }
+        activeFiles.put(key, info.currentBytes);

Review Comment:
   > Is it ok that activeFiles gradually grows to the total size
   
   Nope, I forgot to `clear` on success/failure... I used to do so in the old buffer 
state but forgot after changing it... I have now made that change.
   
   Now, whether we should remove the key when `info.isCompleted()` depends, I guess, on 
how defensive we want to be about duplicate events... if we get duplicate events, 
`delta` should be 0, so we wouldn't update the byte metrics, but we could double-count 
files...
   
   I am in favor of clearing the key when complete, as retaining it is not enough to block 
such corruption.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to