jonmeredith commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1066439309
##########
src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java:
##########
@@ -89,11 +89,15 @@ public SSTableMultiWriter read(DataInputPlus inputPlus)
throws Throwable
cis.position(section.lowerPosition);
in.reset(0);
+ long lastBytesRead = 0;
while (in.getBytesRead() < sectionLength)
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed
chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename + '-' + fileSeqNum,
ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+ long bytesRead = cis.chunkBytesRead();
+ long bytesDelta = bytesRead - lastBytesRead;
+ lastBytesRead = bytesRead;
+ session.progress(filename + '-' + fileSeqNum,
ProgressInfo.Direction.IN, bytesRead, bytesDelta, totalSize);
Review Comment:
Could pull `filename + '-' + fileSeqNum` out of the while loop too rather
than hope the compiler is smart enough. Looks true for most of the other call
points too.
##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java:
##########
@@ -113,7 +113,7 @@ public void write(StreamingDataOutputPlus out) throws
IOException
start += lastBytesRead;
bytesRead += lastBytesRead;
progress += (lastBytesRead - transferOffset);
-
session.progress(sstable.descriptor.filenameFor(Component.DATA),
ProgressInfo.Direction.OUT, progress, totalSize);
+
session.progress(sstable.descriptor.filenameFor(Component.DATA),
ProgressInfo.Direction.OUT, progress, lastBytesRead - transferOffset,
totalSize);
Review Comment:
Could DRY it up
```
long delta = lastBytesRead - transferOffset;
progress += delta;
session.progress(sstable.descriptor.filenameFor(Component.DATA),
ProgressInfo.Direction.OUT, progress, delta, totalSize);
```
##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +237,59 @@ public synchronized void handleStreamEvent(StreamEvent
event)
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+
+ long delta = info.deltaBytes;
+ if (delta < 0)
{
- info.updateProgress(event.progress);
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES,
+ "[id={}, key={}] Stream event reported a negative
delta ({})",
+ this.id, Pair.create(info.peer, info.fileName),
delta);
Review Comment:
Not sure this is still helpful - if you keep, you can remove the
Pair.create/import and just log the two components directly.
Could also move to the ProgressInfo constructor as it should never be
created with a negative delta.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]