smiklosovic commented on code in PR #2058:
URL: https://github.com/apache/cassandra/pull/2058#discussion_r1061550568
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
public void progress(String filename, ProgressInfo.Direction direction,
long bytes, long total)
{
ProgressInfo progress = new ProgressInfo(peer, index, filename,
direction, bytes, total);
+ updateMetricsOnProgress(progress);
streamResult.handleProgress(progress);
}
+ private void updateMetricsOnProgress(ProgressInfo progress)
+ {
+ ProgressInfo.Direction direction = progress.direction;
+ long lastSeenBytesStreamedForProgress =
lastSeenBytesStreamed.getOrDefault(progress, 0L);
+ long newBytesStreamed = progress.currentBytes -
lastSeenBytesStreamedForProgress;
+ if (direction == ProgressInfo.Direction.OUT)
+ {
+ StreamingMetrics.totalOutgoingBytes.inc(newBytesStreamed);
+ metrics.outgoingBytes.inc(newBytesStreamed);
+ }
+
Review Comment:
nit: this empty line is redundant.
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
public void progress(String filename, ProgressInfo.Direction direction,
long bytes, long total)
{
ProgressInfo progress = new ProgressInfo(peer, index, filename,
direction, bytes, total);
+ updateMetricsOnProgress(progress);
streamResult.handleProgress(progress);
}
+ private void updateMetricsOnProgress(ProgressInfo progress)
+ {
+ ProgressInfo.Direction direction = progress.direction;
+ long lastSeenBytesStreamedForProgress =
lastSeenBytesStreamed.getOrDefault(progress, 0L);
Review Comment:
Does this mean the `lastSeenBytesStreamed` map will only ever grow? I don't
see any place where entries are removed from it.
##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
testMetricsWithStreamingToTwoNodes(true);
}
- private int getNumberOfSSTables(Cluster cluster, int node) {
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
Review Comment:
nit: there should be a space after `try`, i.e. `try (Cluster`.
##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
testMetricsWithStreamingToTwoNodes(true);
}
- private int getNumberOfSSTables(Cluster cluster, int node) {
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(2).nodetool("repair", "--full"));
+ }
+ }
+
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(2).nodetool("rebuild"));
+ }
+ }
+
+ /**
+ * Test to verify that streaming metrics are updated incrementally
+ * - Create 2 node cluster with RF=2
+ * - Create 1 sstable with 10MB on node1, while node2 is empty due to
message drop
+ * - Run repair OR rebuild on node2 to transfer sstable from node1
+ * - Collect metrics during streaming and check that at least 3 different
values are reported [0, partial1, .., final_size]
+ * - Check final transferred size is correct (~10MB bytes)
+ */
+ public void runStreamingOperationAndCheckIncrementalMetrics(Cluster
cluster, Callable<Integer> streamingOperation) throws Exception
+ {
+ assertThat(cluster.size())
+ .describedAs("The minimum cluster size to check streaming metrics
is 2 nodes.")
+ .isEqualTo(2);
+
+ // Create table with compression disabled so we can easily compute the
expected final sstable size
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY
KEY, c1 text) " +
+ "WITH compaction = {'class': '%s',
'enabled': 'false'} " +
+ "AND compression =
{'enabled':'false'};",
+ KEYSPACE,
"SizeTieredCompactionStrategy"));
+
+ // each row has 1KB payload
+ Random random = new Random(0);
+ StringBuilder random1kbString = new StringBuilder();
+ for (int i = 0; i < 1024; i++)
+ random1kbString.append((char)random.nextInt(127));
+
+ // Drop all messages from node1 to node2 so node2 will be empty
+ IMessageFilters.Filter drop1to2 =
cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop();
+
+ final int totalRows = 10000; // total size: 10K x 1KB ~= 10MB
+ for (int i = 0; i < totalRows; ++i)
+ {
+ // write rows with timestamp 1 to have deterministic transfer size
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k,
c1) VALUES (?, ?) USING TIMESTAMP 1;"),
+ ConsistencyLevel.ONE,
+ Integer.toString(i),
+ random1kbString.toString());
+ }
+
+ // Flush and compact all nodes to generate a single sstable
+ cluster.forEach(i -> {
+ i.flush(KEYSPACE);
+ i.forceCompact(KEYSPACE, "cf");
+ });
+
+ // Check that node 1 only has 1 sstable after flush + compaction
+ assertThat(getNumberOfSSTables(cluster, 1)).isEqualTo(1);
+ // Node 2 should have 0 sstables since messages from node1 were dropped
+ assertThat(getNumberOfSSTables(cluster, 2)).isEqualTo(0);
+
+ // Disable dropping of messages from node1 to node2
+ drop1to2.off();
+
+ ExecutorService nodetoolExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Review Comment:
Shouldn't we shut down this executor once we are done with it?
##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
testMetricsWithStreamingToTwoNodes(true);
}
- private int getNumberOfSSTables(Cluster cluster, int node) {
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(2).nodetool("repair", "--full"));
+ }
+ }
+
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(2).nodetool("rebuild"));
+ }
+ }
+
+ /**
+ * Test to verify that streaming metrics are updated incrementally
+ * - Create 2 node cluster with RF=2
+ * - Create 1 sstable with 10MB on node1, while node2 is empty due to
message drop
+ * - Run repair OR rebuild on node2 to transfer sstable from node1
+ * - Collect metrics during streaming and check that at least 3 different
values are reported [0, partial1, .., final_size]
+ * - Check final transferred size is correct (~10MB bytes)
+ */
+ public void runStreamingOperationAndCheckIncrementalMetrics(Cluster
cluster, Callable<Integer> streamingOperation) throws Exception
+ {
+ assertThat(cluster.size())
+ .describedAs("The minimum cluster size to check streaming metrics
is 2 nodes.")
+ .isEqualTo(2);
+
+ // Create table with compression disabled so we can easily compute the
expected final sstable size
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY
KEY, c1 text) " +
+ "WITH compaction = {'class': '%s',
'enabled': 'false'} " +
+ "AND compression =
{'enabled':'false'};",
+ KEYSPACE,
"SizeTieredCompactionStrategy"));
Review Comment:
I don't see why the compaction `class` uses a `%s` placeholder when the value
is a constant. Why is it extracted into a format argument?
##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
testMetricsWithStreamingToTwoNodes(true);
}
- private int getNumberOfSSTables(Cluster cluster, int node) {
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(2).nodetool("repair", "--full"));
+ }
+ }
+
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(2)
Review Comment:
nit: there should be a space after `try`, i.e. `try (Cluster`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]