dcapwell commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1066356417
##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent
event)
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+ Pair<InetAddressAndPort, String> key = Pair.create(info.peer,
info.fileName);
+ long seenBytes = activeFiles.getOrDefault(key, 0);
+ long delta = info.currentBytes - seenBytes;
+ if (info.direction == ProgressInfo.Direction.IN)
{
- info.updateProgress(event.progress);
+ // receiving
+ sessions.bytesReceived += delta;
+ if (info.isCompleted())
+ sessions.filesReceived++;
}
else
{
- logger.warn("[Stream #{}} ID#{}] Recieved stream progress before
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+ // sending
+ sessions.bytesSent += delta;
+ if (info.isCompleted())
+ sessions.filesSent++;
}
+ activeFiles.put(key, info.currentBytes);
Review Comment:
> Is it ok that activeFiles gradually grows to the total size
Nope, I forgot to `clear` on success/failure... I used to do so in the old buffer
state but forgot after changing it... I have now made that change.
Now, whether we should remove the key when `info.isCompleted()` depends, I guess, on
how defensive we want to be about duplicate events... if we get duplicate events,
`delta` should be 0, so we wouldn't update those metrics but could double-count
files...
I am in favor of clearing the key when complete, as it's not enough to block
such corruption.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]