smiklosovic commented on code in PR #2058:
URL: https://github.com/apache/cassandra/pull/2058#discussion_r1061550568


##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
         ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
+        updateMetricsOnProgress(progress);
         streamResult.handleProgress(progress);
     }
 
+    private void updateMetricsOnProgress(ProgressInfo progress)
+    {
+        ProgressInfo.Direction direction = progress.direction;
+        long lastSeenBytesStreamedForProgress = lastSeenBytesStreamed.getOrDefault(progress, 0L);
+        long newBytesStreamed = progress.currentBytes - lastSeenBytesStreamedForProgress;
+        if (direction == ProgressInfo.Direction.OUT)
+        {
+            StreamingMetrics.totalOutgoingBytes.inc(newBytesStreamed);
+            metrics.outgoingBytes.inc(newBytesStreamed);
+        }
+

Review Comment:
   nit: this empty line is redundant.



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
         ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
+        updateMetricsOnProgress(progress);
         streamResult.handleProgress(progress);
     }
 
+    private void updateMetricsOnProgress(ProgressInfo progress)
+    {
+        ProgressInfo.Direction direction = progress.direction;
+        long lastSeenBytesStreamedForProgress = lastSeenBytesStreamed.getOrDefault(progress, 0L);

Review Comment:
   Does this mean the `lastSeenBytesStreamed` map will only ever grow? I don't
   see any place where entries are removed from it.
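
   To illustrate the concern, here is a minimal sketch of delta tracking that
   also drops entries once they are no longer needed. `ProgressDeltaTracker`
   and all of its members are hypothetical names, not part of the PR or of
   Cassandra. It assumes that no further progress event arrives for a file
   after the event that reports it complete.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the per-file tracking in the PR; not Cassandra code.
public class ProgressDeltaTracker
{
    // Last byte count reported for each file that is still in flight.
    private final Map<String, Long> lastSeenBytes = new ConcurrentHashMap<>();
    // Stand-in for a metrics counter such as an "outgoing bytes" metric.
    private final AtomicLong totalBytes = new AtomicLong();

    public void onProgress(String file, long currentBytes, long fileTotalBytes)
    {
        long previous = lastSeenBytes.getOrDefault(file, 0L);
        // Count only the bytes streamed since the previous event for this file.
        totalBytes.addAndGet(currentBytes - previous);

        if (currentBytes >= fileTotalBytes)
            lastSeenBytes.remove(file);            // file finished: forget it
        else
            lastSeenBytes.put(file, currentBytes); // still in flight: remember progress
    }

    // Safety net for files that never report completion (e.g. a failed session).
    public void onSessionClosed()
    {
        lastSeenBytes.clear();
    }

    public long totalBytes()
    {
        return totalBytes.get();
    }

    public int trackedFiles()
    {
        return lastSeenBytes.size();
    }
}
```

   With something along these lines the map is bounded by the number of files
   currently in flight rather than by everything the session has ever streamed.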



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void 
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)

Review Comment:
   nit: `try (Cluster` - there should be a space.



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void 
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                      
.set("stream_entire_sstables", false)
+                                                                      
.set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> 
cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                      
.set("stream_entire_sstables", false)
+                                                                      
.set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> 
cluster.get(2).nodetool("rebuild"));
+        }
+    }
+
+    /**
+     * Test to verify that streaming metrics are updated incrementally
+     * - Create 2 node cluster with RF=2
+     * - Create 1 sstable with 10MB on node1, while node2 is empty due to 
message drop
+     * - Run repair OR rebuild on node2 to transfer sstable from node1
+     * - Collect metrics during streaming and check that at least 3 different 
values are reported [0, partial1, .., final_size]
+     * - Check final transferred size is correct (~10MB bytes)
+     */
+    public void runStreamingOperationAndCheckIncrementalMetrics(Cluster 
cluster, Callable<Integer> streamingOperation) throws Exception
+    {
+        assertThat(cluster.size())
+            .describedAs("The minimum cluster size to check streaming metrics 
is 2 nodes.")
+            .isEqualTo(2);
+
+        // Create table with compression disabled so we can easily compute the 
expected final sstable size
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY 
KEY, c1 text) " +
+                                           "WITH compaction = {'class': '%s', 
'enabled': 'false'} " +
+                                           "AND compression = 
{'enabled':'false'};",
+                                           KEYSPACE, 
"SizeTieredCompactionStrategy"));
+
+        // each row has 1KB payload
+        Random random = new Random(0);
+        StringBuilder random1kbString = new StringBuilder();
+        for (int i = 0; i < 1024; i++)
+            random1kbString.append((char)random.nextInt(127));
+
+        // Drop all messages from node1 to node2 so node2 will be empty
+        IMessageFilters.Filter drop1to2 = 
cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop();
+
+        final int totalRows = 10000; // total size: 10K x 1KB ~= 10MB
+        for (int i = 0; i < totalRows; ++i)
+        {
+            // write rows with timestamp 1 to have deterministic transfer size
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, 
c1) VALUES (?, ?) USING TIMESTAMP 1;"),
+                                           ConsistencyLevel.ONE,
+                                           Integer.toString(i),
+                                           random1kbString.toString());
+        }
+
+        // Flush and compact all nodes to generate a single sstable
+        cluster.forEach(i -> {
+            i.flush(KEYSPACE);
+            i.forceCompact(KEYSPACE, "cf");
+        });
+
+        // Check that node 1 only has 1 sstable after flush + compaction
+        assertThat(getNumberOfSSTables(cluster, 1)).isEqualTo(1);
+        // Node 2 should have 0 sstables since messages from node1 were dropped
+        assertThat(getNumberOfSSTables(cluster, 2)).isEqualTo(0);
+
+        // Disable dropping of messages from node1 to node2
+        drop1to2.off();
+
+        ExecutorService nodetoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

Review Comment:
   Shouldn't we shut down this executor after we are done?
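
   A minimal sketch of what I have in mind, using only `java.util.concurrent`.
   The class and method names are hypothetical and the one-minute timeout is an
   arbitrary assumption; the point is only that the shutdown sits in a
   `finally` block so it also runs when the test fails.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the PR.
public class ExecutorShutdownSketch
{
    public static int runAndShutdown(ExecutorService executor, Callable<Integer> operation) throws Exception
    {
        try
        {
            Future<Integer> result = executor.submit(operation);
            // Wait for the operation to finish; the timeout value is an assumption.
            return result.get(1, TimeUnit.MINUTES);
        }
        finally
        {
            // Always release the worker thread, even if the operation threw or timed out.
            executor.shutdownNow();
        }
    }
}
```

   In the test this would wrap the submission of `streamingOperation` and the
   assertions that follow it.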



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void 
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                      
.set("stream_entire_sstables", false)
+                                                                      
.set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> 
cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                      
.set("stream_entire_sstables", false)
+                                                                      
.set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> 
cluster.get(2).nodetool("rebuild"));
+        }
+    }
+
+    /**
+     * Test to verify that streaming metrics are updated incrementally
+     * - Create 2 node cluster with RF=2
+     * - Create 1 sstable with 10MB on node1, while node2 is empty due to 
message drop
+     * - Run repair OR rebuild on node2 to transfer sstable from node1
+     * - Collect metrics during streaming and check that at least 3 different 
values are reported [0, partial1, .., final_size]
+     * - Check final transferred size is correct (~10MB bytes)
+     */
+    public void runStreamingOperationAndCheckIncrementalMetrics(Cluster 
cluster, Callable<Integer> streamingOperation) throws Exception
+    {
+        assertThat(cluster.size())
+            .describedAs("The minimum cluster size to check streaming metrics 
is 2 nodes.")
+            .isEqualTo(2);
+
+        // Create table with compression disabled so we can easily compute the 
expected final sstable size
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
+                                           "WITH compaction = {'class': '%s', 'enabled': 'false'} " +
+                                           "AND compression = {'enabled':'false'};",
+                                           KEYSPACE, "SizeTieredCompactionStrategy"));

Review Comment:
   I fail to see why there is a placeholder for `class` when it is a constant. 
Why is it extracted?
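
   For comparison, a small self-contained sketch showing that inlining the
   class name yields the same statement with one placeholder fewer.
   `SchemaStatementSketch` and its method names are hypothetical, and the
   keyspace is passed in as a plain string rather than taken from the test's
   `KEYSPACE` constant.

```java
// Hypothetical illustration; not part of the PR.
public class SchemaStatementSketch
{
    // Shape used in the PR: the constant class name is passed as a format argument.
    static String withPlaceholder(String keyspace)
    {
        return String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
                             "WITH compaction = {'class': '%s', 'enabled': 'false'} " +
                             "AND compression = {'enabled':'false'};",
                             keyspace, "SizeTieredCompactionStrategy");
    }

    // Suggested shape: the constant is written directly into the statement.
    static String inlined(String keyspace)
    {
        return String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
                             "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} " +
                             "AND compression = {'enabled':'false'};",
                             keyspace);
    }
}
```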



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void 
testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                      
.set("stream_entire_sstables", false)
+                                                                      
.set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> 
cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void 
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws 
Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)

Review Comment:
   nit: `try (Cluster` - there should be a space.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

