jonmeredith commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1066439309


##########
src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java:
##########
@@ -89,11 +89,15 @@ public SSTableMultiWriter read(DataInputPlus inputPlus) 
throws Throwable
                 cis.position(section.lowerPosition);
                 in.reset(0);
 
+                long lastBytesRead = 0;
                 while (in.getBytesRead() < sectionLength)
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename + '-' + fileSeqNum, 
ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+                    long bytesRead = cis.chunkBytesRead();
+                    long bytesDelta = bytesRead - lastBytesRead;
+                    lastBytesRead = bytesRead;
+                    session.progress(filename + '-' + fileSeqNum, 
ProgressInfo.Direction.IN, bytesRead, bytesDelta, totalSize);

Review Comment:
   Could pull `filename + '-' + fileSeqNum` out of the while loop too rather 
than hope the compiler is smart enough. Looks true for most of the other call 
points too.



##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java:
##########
@@ -113,7 +113,7 @@ public void write(StreamingDataOutputPlus out) throws 
IOException
                     start += lastBytesRead;
                     bytesRead += lastBytesRead;
                     progress += (lastBytesRead - transferOffset);
-                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
+                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, lastBytesRead - transferOffset, 
totalSize);

Review Comment:
   Could DRY it up 
   ```
   long delta = lastBytesRead - transferOffset;
   progress += delta;
   session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, delta, totalSize);
   ```



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +237,59 @@ public synchronized void handleStreamEvent(StreamEvent 
event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+
+        long delta = info.deltaBytes;
+        if (delta < 0)
         {
-            info.updateProgress(event.progress);
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES,
+                             "[id={}, key={}] Stream event reported a negative 
delta ({})",
+                             this.id, Pair.create(info.peer, info.fileName), 
delta);

Review Comment:
   Not sure this is still helpful - if you keep, you can remove the 
Pair.create/import and just log the two components directly. 
   
   Could also move to the ProgressInfo constructor as it should never be 
created with a negative delta.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to