isaacreath commented on code in PR #2058:
URL: https://github.com/apache/cassandra/pull/2058#discussion_r1052539712
##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,6 +151,92 @@ public void testMetricsWithRepairAndStreamingToTwoNodes()
throws Exception
testMetricsWithStreamingToTwoNodes(true);
}
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(3)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(3).nodetool("repair", "--full"));
+ }
+ }
+
+ @Test
+ public void
testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws
Exception
+ {
+ try(Cluster cluster = init(Cluster.build(3)
+ .withDataDirCount(1)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP)
+
.set("stream_entire_sstables", false)
+
.set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ runStreamingOperationAndCheckIncrementalMetrics(cluster, () ->
cluster.get(3).nodetool("rebuild"));
+ }
+ }
+ public void runStreamingOperationAndCheckIncrementalMetrics(Cluster
cluster, Callable<Integer> streamingOperation) throws Exception
+ {
+ assertThat(cluster.size())
+ .describedAs("The minimum cluster size to check streaming metrics
is 3 nodes.")
+ .isEqualTo(3);
+
+ cluster.forEach(i -> i.runOnInstance(() ->
SystemKeyspace.forceBlockingFlush(SystemKeyspace.LOCAL)));
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1
text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled':
'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+ final int rowsPerFile = 10000;
+ cluster.forEach((node) -> node.nodetool("disableautocompaction",
KEYSPACE));
+ IMessageFilters.Filter drop1to3 =
cluster.filters().verbs(MUTATION_REQ.id).from(1).to(3).drop();
+
+ for (int k = 0; k < 3; k++)
+ {
+ for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile;
++i)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf
(k, c1, c2) VALUES (?, 'value1', 'value2');"),
+ ConsistencyLevel.ONE,
+ Integer.toString(i));
+ }
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).flush(KEYSPACE);
+ }
+
+ drop1to3.off();
+
+ ExecutorService nodetoolExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+ checkThatNoStreamingOccuredBetweenTheThreeNodes(cluster);
+ Future<Integer> streamingOperationExecution =
nodetoolExecutor.submit(streamingOperation);
+ Thread.sleep(500); // Wait for some streaming to happen.
Review Comment:
It would be nice if we could find a way to wait for a notification that
streaming has started, instead of relying on a fixed sleep.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]