yifan-c commented on code in PR #166:
URL: https://github.com/apache/cassandra-sidecar/pull/166#discussion_r1927947994

##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.Set;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * An interface that pulls methods from the Cassandra Stream manager Proxy
+ */
+public interface StreamManagerJmxOperations
+{
+
+    String STREAM_MANAGER_OBJ_NAME = "org.apache.cassandra.net:type=StreamManager";
+    Set<CompositeData> getCurrentStreams();

Review Comment:
add java doc.

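One possible shape for the requested Javadoc is sketched below. The wording is an assumption, based only on how `StreamState` in this PR reads a `sessions` entry from each `CompositeData`; it is not the text that was committed. The interface is declared package-private here purely so the snippet compiles on its own.

```java
import java.util.Set;
import javax.management.openmbean.CompositeData;

/**
 * JMX operations exposed by the Cassandra StreamManager MBean.
 */
interface StreamManagerJmxOperations
{
    /** JMX object name of the StreamManager MBean, as used in this PR */
    String STREAM_MANAGER_OBJ_NAME = "org.apache.cassandra.net:type=StreamManager";

    /**
     * Returns the state of the streams currently in progress on the node.
     * Assumed wording: each element is the composite form of one stream state
     * and carries a {@code sessions} array describing its stream sessions.
     *
     * @return the current stream states; expected to be empty when nothing is streaming
     */
    Set<CompositeData> getCurrentStreams();
}
```
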
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java:
##########
@@ -70,6 +88,52 @@ public ConnectedClientStatsResponse connectedClientDetails()
         return new ConnectedClientStatsResponse(entries, totalConnectedClients, connectionsByUser);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public StreamsProgressStats streamsProgressStats()
+    {
+        Set<CompositeData> streamData = jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME)
+                                                 .getCurrentStreams();
+
+        List<StreamState> streamStates = streamData.stream().map(StreamState::new).collect(Collectors.toList());
+        return computeStats(streamStates);
+    }
+
+    private StreamsProgressStats computeStats(List<StreamState> streamStates)
+    {
+        List<SessionInfo> sessions = streamStates.stream().map(s -> s.sessions()).flatMap(Collection::stream).collect(Collectors.toList());

Review Comment:
Nit: Since `streamStates` is turned into a stream immediately, you can avoid materializing the stream into a list at line#100. Just pass the `Stream<StreamState>` to `computeStats`.

Here is my suggestion to avoid materializing the lists at all.

```java
    return computeStats(streamData.stream().map(StreamState::new));
}

private StreamsProgressStats computeStats(Stream<StreamState> streamStates)
{
    Iterator<SessionInfo> sessions = streamStates.map(StreamState::sessions).flatMap(Collection::stream).iterator();

    long totalFilesToReceive = 0;
    long totalFilesReceived = 0;
    long totalBytesToReceive = 0;
    long totalBytesReceived = 0;

    long totalFilesToSend = 0;
    long totalFilesSent = 0;
    long totalBytesToSend = 0;
    long totalBytesSent = 0;

    while (sessions.hasNext())
    {
        SessionInfo sessionInfo = sessions.next();
        totalBytesToReceive += sessionInfo.totalSizeToReceive();
        totalBytesReceived += sessionInfo.totalSizeReceived();
        totalFilesToReceive += sessionInfo.totalFilesToReceive();
        totalFilesReceived += sessionInfo.totalFilesReceived();
        totalBytesToSend += sessionInfo.totalSizeToSend();
        totalBytesSent += sessionInfo.totalSizeSent();
        totalFilesToSend += sessionInfo.totalFilesToSend();
        totalFilesSent += sessionInfo.totalFilesSent();
    }
```

##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream state data
+ */
+public class StreamState
+{
+    Collection<SessionInfo> sessions;

Review Comment:
1. `private final`?
2. It does not buy anything with using `Collection`. How about changing it to `List`?

##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java:
##########
@@ -32,4 +33,9 @@ public interface MetricsOperations
      */
     ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
 
+    /**
+     * Retrieve the stream progress stats metrics from the cluster

Review Comment:
nit: `stats` == `metrics`. We can remove `metrics`

##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream state data
+ */
+public class StreamState
+{
+    Collection<SessionInfo> sessions;
+
+    public StreamState(CompositeData data)
+    {
+        this.sessions = parseSessions((CompositeData[]) data.get("sessions"));
+    }
+
+    /**
+     * @return the session info for the sessions in the stream stats data
+     */
+    public Collection<SessionInfo> sessions()
+    {
+        return sessions;
+    }
+
+    private Collection<SessionInfo> parseSessions(CompositeData[] sessions)

Review Comment:
Return `List` in both methods.

##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1622,6 +1624,42 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        String streamStatsResponseAsString = "{\"operationMode\":\"NORMAL\"," +
+                                             "\"streamsProgressStats\":{\"totalFilesToReceive\":7," +
+                                             "\"totalFilesReceived\":7,\"totalBytesToReceive\":15088," +
+                                             "\"totalBytesReceived\":15088,\"totalFilesToSend\":2,\"totalFilesSent\":2," +
+                                             "\"totalBytesToSend\":1024,\"totalBytesSent\":1024}}";
+
+        MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(streamStatsResponseAsString);
+        enqueue(response);
+
+        for (MockWebServer server : servers)
+        {
+            SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
+            StreamStatsResponse result = client.streamsStats(sidecarInstance).get(30, TimeUnit.SECONDS);
+            assertThat(result).isNotNull();
+            assertThat(result.operationMode()).isNotNull().isEqualTo("NORMAL");
+            StreamsProgressStats progressStats = result.streamsProgressStats();
+            assertThat(progressStats).isNotNull();
+            assertThat(progressStats.totalFilesToSend()).isNotNull()
+                                                        .isEqualTo(progressStats.totalFilesSent())
+                                                        .isEqualTo(2);
+            assertThat(progressStats.totalBytesToSend()).isNotNull()
+                                                        .isEqualTo(progressStats.totalBytesSent())
+                                                        .isEqualTo(1024);
+            assertThat(progressStats.totalBytesToReceive()).isNotNull()
+                                                           .isEqualTo(progressStats.totalBytesReceived())
+                                                           .isEqualTo(15088);
+            assertThat(progressStats.totalFilesToReceive()).isNotNull()
+                                                           .isEqualTo(progressStats.totalFilesReceived())
+                                                           .isEqualTo(7);

Review Comment:
nit: create `StreamsProgressStats` object and serialize it into string, instead of declaring `streamStatsResponseAsString`. For assertion, now it can check the received `StreamsProgressStats` has the same values as the original `StreamsProgressStats`

##########
server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.acl.authorization.VariableAwareResource;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+/**
+ * Handler for retrieving node streams stats
+ */
+public class StreamStatsHandler extends AbstractHandler<Void> implements AccessProtected
+{
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools executor pools for blocking executions
+     */
+    @Inject
+    protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher,
+                                 ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+    }
+
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        String resource = VariableAwareResource.CLUSTER.resource();
+        return Collections.singleton(BasicPermissions.READ_STATS.toAuthorization(resource));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+
+        StorageOperations storageOperations = delegate.storageOperations();
+        MetricsOperations metricsOperations = delegate.metricsOperations();

Review Comment:
nit: move those 2 lines into the lambda of `executeBlocking`

##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 5, network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws InterruptedException
+    {
+
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster(
+        builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(5);
+        IUpgradeableInstance seed = cluster.get(1);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> node.nodetoolResult("decommission").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart");
+
+        ClusterUtils.awaitRingState(seed, node, "Leaving");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        for (int i = 0; i < 20; i++)

Review Comment:
can we use `loopAssert` here?

##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java:
##########
@@ -32,4 +33,9 @@ public interface MetricsOperations
      */
     ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
 
+    /**
+     * Retrieve the stream progress stats metrics from the cluster
+     * @return the requested stream progress stats, in full or summary

Review Comment:
since `in full or summary` is mentioned, can you explain what is full and what is summary in the javadoc.

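For the fixed 20-iteration polling loop in `streamStatsTest` that the `loopAssert` comment refers to, the general idea is a deadline-based retry. The sketch below is a stand-alone illustration only: `Polling` and `pollUntil` are hypothetical names, and this is not the signature of the project's `loopAssert` helper, which should be preferred if it fits.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper illustrating deadline-based polling; not part of the PR. */
final class Polling
{
    private Polling()
    {
    }

    /**
     * Evaluates {@code condition} repeatedly until it returns true or the timeout elapses.
     *
     * @return true if the condition was met before the deadline, false otherwise
     */
    static boolean pollUntil(BooleanSupplier condition, long timeout, long delay, TimeUnit unit)
    {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true)
        {
            if (condition.getAsBoolean())
            {
                return true;
            }
            if (System.nanoTime() - deadline >= 0)
            {
                return false;
            }
            try
            {
                unit.sleep(delay);
            }
            catch (InterruptedException e)
            {
                // Preserve the interrupt flag and give up polling
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test, the condition would call `streamStats(hasStats, dataReceived)` and return `dataReceived.get()`, so the wait is bounded by elapsed time rather than by an iteration count.
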
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2, network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws Exception
+    {
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster(
+        builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(2);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> node.nodetoolResult("decommission", "--force").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart");
+
+        // optimal no. of attempts to poll for stats to capture streaming stats during node decommissioning
+        for (int i = 0; i < 20; i++)
+        {
+            streamStats(hasStats, dataReceived);
+            if (dataReceived.get())
+            {
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+        ClusterUtils.awaitGossipStatus(node, node, "LEFT");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        assertThat(hasStats).isTrue();
+        assertThat(dataReceived).isTrue();
+        context.completeNow();
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void streamStats(AtomicBoolean hasStats, AtomicBoolean dataReceived)
+    {
+        String testRoute = "/api/v1/cassandra/stats/streams";
+        HttpResponse<Buffer> resp;
+        try
+        {
+            resp = client.get(server.actualPort(), "127.0.0.1", testRoute)
+                         .send()
+                         .toCompletionStage()
+                         .toCompletableFuture()
+                         .get();
+            logger.info("Success Status Response code: {}", resp.statusCode());
+            logger.info("Status Response: {}", resp.bodyAsString());
+            if (resp.statusCode() == HttpResponseStatus.OK.code())
+            {
+                assertStreamStatsResponseOK(resp, hasStats, dataReceived);
+            }
+
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    void assertStreamStatsResponseOK(HttpResponse<Buffer> response, AtomicBoolean hasStats, AtomicBoolean dataReceived)
+    {
+        StreamStatsResponse streamStatsResponse = response.bodyAsJson(StreamStatsResponse.class);
+        assertThat(streamStatsResponse).isNotNull();
+        StreamsProgressStats streamProgress = streamStatsResponse.streamsProgressStats();
+        assertThat(streamProgress).isNotNull();
+        if (streamProgress.totalFilesToReceive() > 0)
+        {
+            hasStats.set(true);
+            if (streamProgress.totalFilesToReceive() == streamProgress.totalFilesReceived() &&
+                streamProgress.totalFilesReceived() > 0)
+            {
+                dataReceived.set(true);
+                assertThat(streamProgress.totalBytesToReceive()).isEqualTo(streamProgress.totalBytesReceived());
+                assertThat(streamProgress.totalBytesReceived()).isGreaterThan(0);
+            }
+        }
+    }
+
+    QualifiedTableName createTestTableAndPopulate()
+    {
+        QualifiedTableName tableName = createTestTable(
+        "CREATE TABLE %s ( \n" +
+        " race_year int, \n" +
+        " race_name text, \n" +
+        " cyclist_name text, \n" +
+        " rank int, \n" +
+        " PRIMARY KEY ((race_year, race_name), rank) \n" +
+        ");");
+        Session session = maybeGetSession();
+
+        session.execute("CREATE INDEX ryear ON " + tableName + " (race_year);");
+
+        for (int i = 1; i <= 1000; i++)
+        {
+            session.execute("INSERT INTO " + tableName + " (race_year, race_name, rank, cyclist_name) " +
+                            "VALUES (2015, 'Tour of Japan - Stage 4 - Minami > Shinshu', " + i + ", 'Benjamin PRADES');");
+        }
+        return tableName;
+    }
+
+    /**
+     * ByteBuddy Helper for decommissioning node
+     */
+    public static class BBHelperDecommissioningNode
+    {
+        static CountDownLatch transientStateStart = new CountDownLatch(1);
+        static CountDownLatch transientStateEnd = new CountDownLatch(1);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            if (nodeNumber == 2)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = typePool.describe("org.apache.cassandra.streaming.StreamCoordinator")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl))
+                               .method(named("connectAllStreamSessions"))
+//                               .method(named("onInitializationComplete"))
+//                               .method(named("start"))

Review Comment:
remove them?

##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+
+/**
+ * Class response for the StreamsStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
This is not json pojo. The annotation can be removed.

##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -129,6 +129,7 @@ public final class ApiEndpointsV1
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB;
     public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission";
+    public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams";

Review Comment:
should `streams` be `stream` instead? why it is in plural form? I am reading it as getting the `stream (stat)` from the `stats` (in plural) resource.

##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Representation of the stream progress info
+ */
+public class ProgressInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String fileName;
+    public final String direction;
+    public final long currentBytes;
+    public final long totalBytes;
+
+    public ProgressInfo(CompositeData data)
+    {
+        this.peer = (String) data.get("peer");
+        this.sessionIndex = (int) data.get("sessionIndex");
+        this.fileName = (String) data.get("fileName");
+        this.direction = (String) data.get("direction");
+        this.currentBytes = (long) data.get("currentBytes");
+        this.totalBytes = (long) data.get("totalBytes");

Review Comment:
How about having this utility method to provide richer info when value is absent or type mismatches.

```java
public static <T> T extractValue(CompositeData data, String key)
{
    Object value = data.get(key);
    if (value == null)
    {
        throw new NoSuchElementException("No value is present for key: " + key);
    }

    try
    {
        return (T) value;
    }
    catch (ClassCastException cce)
    {
        throw new RuntimeException("Value type mismatched of key: " + key, cce);
    }
}
```

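One caveat about the `extractValue` suggestion: because of type erasure, the unchecked `(T) value` cast does not fail inside the method, so the `ClassCastException` handler is unlikely to ever run; a mismatch would surface later at the call site. A variant that checks the type eagerly could look like the sketch below. The extra `Class<T>` parameter, the `CompositeDataValues` class name and the `sampleProgress()` demo data are assumptions made for illustration and are not part of the PR.

```java
import java.util.NoSuchElementException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;

/** Hypothetical utility class; the name and the type-token parameter are assumptions. */
final class CompositeDataValues
{
    private CompositeDataValues()
    {
    }

    /** Reads {@code key} from {@code data} and verifies the runtime type before returning it. */
    static <T> T extractValue(CompositeData data, String key, Class<T> type)
    {
        Object value = data.get(key);
        if (value == null)
        {
            throw new NoSuchElementException("No value is present for key: " + key);
        }
        if (!type.isInstance(value))
        {
            throw new IllegalStateException("Value type mismatched of key: " + key
                                            + ", expected " + type.getName()
                                            + " but found " + value.getClass().getName());
        }
        return type.cast(value);
    }

    /** Demo data only: a two-field composite with made-up values. */
    static CompositeData sampleProgress()
    {
        try
        {
            String[] names = { "peer", "totalBytes" };
            OpenType<?>[] types = { SimpleType.STRING, SimpleType.LONG };
            CompositeType compositeType = new CompositeType("ProgressInfo", "demo progress info", names, names, types);
            return new CompositeDataSupport(compositeType, names, new Object[]{ "127.0.0.1", 1024L });
        }
        catch (OpenDataException e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, a constructor such as `ProgressInfo`'s would pass the boxed type, for example `extractValue(data, "totalBytes", Long.class)`, and rely on auto-unboxing for the `long` field.
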