jonmeredith commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1065229104
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
@Test
public void test() throws IOException
{
+ ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
try (Cluster cluster = init(Cluster.build(2)
.withConfig(c ->
c.with(Feature.values()).set("stream_entire_sstables", false))
.start()))
{
Review Comment:
Can we make this a parameterized test and check with entire-SSTable
streaming enabled too? The only change needed is that the expected number of
files in the assertion increases 7x, because entire-SSTable streaming counts
each component it sends. Alternatively, we could change the accounting to
increment the file count only for the DATA file?
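For the second option, here is a rough sketch of data-only accounting. `DataOnlyFileCounter` is a hypothetical helper, not existing Cassandra code. It assumes the name reported for each streamed component ends in its component suffix (for example `nb-1-big-Data.db`). If `ProgressInfo.fileName` carries something different for entire-SSTable streaming, the predicate would need adjusting.

```java
import java.util.List;

// Hypothetical helper (not Cassandra's API): counts one file per SSTable by
// only counting the Data component, so entire-SSTable streaming and regular
// streaming would report the same file totals.
final class DataOnlyFileCounter
{
    private DataOnlyFileCounter()
    {
    }

    // Assumption: component names end with "-Data.db", e.g. "nb-1-big-Data.db".
    static boolean isDataComponent(String fileName)
    {
        return fileName.endsWith("-Data.db");
    }

    // Counts only the completed files that are Data components.
    static int countFiles(List<String> completedFileNames)
    {
        int count = 0;
        for (String name : completedFileNames)
        {
            if (isDataComponent(name))
                count++;
        }
        return count;
    }
}
```

With this, the test's expected file count would not need a per-mode multiplier, at the cost of the counters no longer reflecting every component actually sent.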
##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent event)
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+ Pair<InetAddressAndPort, String> key = Pair.create(info.peer, info.fileName);
+ long seenBytes = activeFiles.getOrDefault(key, 0);
+ long delta = info.currentBytes - seenBytes;
+ if (info.direction == ProgressInfo.Direction.IN)
{
- info.updateProgress(event.progress);
+ // receiving
+ sessions.bytesReceived += delta;
+ if (info.isCompleted())
+ sessions.filesReceived++;
}
else
{
- logger.warn("[Stream #{}} ID#{}] Recieved stream progress before prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+ // sending
+ sessions.bytesSent += delta;
+ if (info.isCompleted())
+ sessions.filesSent++;
}
+ activeFiles.put(key, info.currentBytes);
Review Comment:
Is it OK that `activeFiles` gradually grows to cover every file transferred,
keeping completed files after they finish? I suppose we need this if we want
to check for non-negative deltas (or, even better, positive ones, though I'm
not sure that's guaranteed).
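One way to bound `activeFiles` is to drop an entry as soon as its file completes, so the map only ever holds in-flight transfers. The sketch below is hypothetical: `InFlightProgress` and `FileKey` are illustrative names, with `FileKey` standing in for `Pair<InetAddressAndPort, String>`. It assumes `currentBytes` is cumulative per file, as the delta computation in the diff implies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's code: tracks bytes seen per in-flight
// (peer, file) pair and removes the entry once the file completes.
final class InFlightProgress
{
    // Stand-in for Pair<InetAddressAndPort, String> to keep this self-contained.
    record FileKey(String peer, String fileName)
    {
    }

    private final Map<FileKey, Long> activeFiles = new HashMap<>();
    long bytesTransferred;
    long filesCompleted;

    // currentBytes is assumed to be the cumulative byte count for this file.
    void onProgress(String peer, String fileName, long currentBytes, boolean completed)
    {
        FileKey key = new FileKey(peer, fileName);
        long seenBytes = activeFiles.getOrDefault(key, 0L);
        bytesTransferred += currentBytes - seenBytes;
        if (completed)
        {
            filesCompleted++;
            activeFiles.remove(key); // completed files no longer occupy the map
        }
        else
        {
            activeFiles.put(key, currentBytes);
        }
    }

    int trackedFiles()
    {
        return activeFiles.size();
    }
}
```

The trade-off is that a late or repeated progress event for a file that has already completed would find no entry and be counted again from zero, which may be why the PR keeps completed files around.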
##########
src/java/org/apache/cassandra/streaming/StreamResultFuture.java:
##########
@@ -219,6 +226,10 @@ synchronized void fireStreamEvent(StreamEvent event)
logger.warn("Unexpected exception in listern while calling handleStreamEvent", t);
}
}
+ long totalNanos = nanoTime() - startNanos;
+ if (totalNanos > slowEventsLogTimeoutNanos)
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",
Review Comment:
nit: include the unit (nanos) in the message? Very low-level nit, as anybody
investigating this will be able to work it out from the code.
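If the unit does get added, one option is to convert to a coarser unit before logging. This sketch uses `String.format` only to stay self-contained (the PR's `NoSpamLogger.log` call takes `{}` placeholders instead), and `SlowEventMessage` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: renders the slow-event warning with explicit units by
// converting the nanosecond values to milliseconds.
final class SlowEventMessage
{
    private SlowEventMessage()
    {
    }

    static String format(long thresholdNanos, long totalNanos)
    {
        return String.format("Handling streaming events took longer than %dms; took %dms",
                             TimeUnit.NANOSECONDS.toMillis(thresholdNanos),
                             TimeUnit.NANOSECONDS.toMillis(totalNanos));
    }
}
```

Millisecond conversion truncates, so a sub-millisecond threshold would print as `0ms`. Appending `ns` to the raw values is the simpler alternative.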
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
@Test
public void test() throws IOException
{
+ ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
try (Cluster cluster = init(Cluster.build(2)
.withConfig(c ->
c.with(Feature.values()).set("stream_entire_sstables", false))
.start()))
{
Review Comment:
Also, is it worth lowering `streaming_slow_events_timeout` so we can check
that the messages get logged when event handling is slow?
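Lowering the timeout makes the warning path deterministic. The check in the `StreamResultFuture` hunk fires only when `totalNanos` strictly exceeds `slowEventsLogTimeoutNanos`, so with the threshold at zero any measurable handling time triggers it. The predicate is restated below as a hypothetical standalone helper (`SlowEventCheck` is not Cassandra code). How the test would set the option and capture the log line is not shown.

```java
// Hypothetical restatement of the comparison in fireStreamEvent: warn only
// when the time spent handling events strictly exceeds the configured timeout.
final class SlowEventCheck
{
    private SlowEventCheck()
    {
    }

    static boolean shouldWarn(long totalNanos, long slowEventsLogTimeoutNanos)
    {
        return totalNanos > slowEventsLogTimeoutNanos;
    }
}
```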
##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent event)
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+ Pair<InetAddressAndPort, String> key = Pair.create(info.peer, info.fileName);
+ long seenBytes = activeFiles.getOrDefault(key, 0);
+ long delta = info.currentBytes - seenBytes;
Review Comment:
nit: is it worth a check for a non-negative delta?
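Here is a minimal sketch of such a guard, assuming a negative delta can only come from a stale or reordered progress event. `DeltaGuard` is hypothetical. Clamping to zero is only one choice; logging a warning (or failing in tests) would surface the problem instead of hiding it.

```java
// Hypothetical sketch of the suggested guard: compute the delta between the
// latest reported byte count and the last one seen, and never let a
// regression decrement the running totals.
final class DeltaGuard
{
    private DeltaGuard()
    {
    }

    static long nonNegativeDelta(long currentBytes, long seenBytes)
    {
        long delta = currentBytes - seenBytes;
        if (delta < 0)
        {
            // Real code would likely also warn here (e.g. via NoSpamLogger);
            // clamping keeps the byte counters monotonic.
            return 0;
        }
        return delta;
    }
}
```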
--
This is an automated message from the Apache Git Service.