dcapwell commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1066462771
##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +237,59 @@ public synchronized void handleStreamEvent(StreamEvent
event)
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+
+ long delta = info.deltaBytes;
+ if (delta < 0)
{
- info.updateProgress(event.progress);
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES,
+ "[id={}, key={}] Stream event reported a negative
delta ({})",
+ this.id, Pair.create(info.peer, info.fileName),
delta);
Review Comment:
Moved to StreamSession.progress as it's a better fit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]