sarankk commented on code in PR #275:
URL: https://github.com/apache/cassandra-sidecar/pull/275#discussion_r2641757904
##########
client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java:
##########
@@ -610,6 +611,18 @@ public Builder nodeDrainRequest()
return request(NODE_DRAIN_REQUEST);
}
+ /**
+ * Sets the {@code request} to be a {@link NodeMoveRequest} and
returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @param newToken the new token for the node to move to
+ * @return a reference to this Builder
+ */
+ public Builder nodeMoveRequest(String newToken)
Review Comment:
Nit: add a `@NotNull` annotation to the `newToken` parameter.
##########
server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandlerTest.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeMoveHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeMoveHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeMoveHandlerTest.class);
+ public static final String MOVE_ROUTE =
"/api/v1/cassandra/operations/move";
+ public static final String LOCAL_HOST = "127.0.0.1";
+ public static final String OPERATION_MODE_MOVING = "MOVING";
+ public static final String OPERATION_MODE_NORMAL = "NORMAL";
+ Vertx vertx;
+ Server server;
+ StorageOperations mockStorageOperations = mock(StorageOperations.class);
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ Module testOverride = Modules.override(new TestModule())
+ .with(new
NodeMoveHandlerTest.NodeMoveTestModule());
+ injector = Guice.createInjector(Modules.override(SidecarModules.all())
+ .with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testMoveLongRunning(VertxTestContext context) throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+ .when(mockStorageOperations).move(anyString());
+
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_ACCEPTED)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(RUNNING);
+ assertThat(moveResponse.operation()).isEqualTo("move");
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testMoveCompleted(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Move Response: {}", response.bodyAsString());
+
+ OperationalJobResponse moveResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(moveResponse).isNotNull();
+ assertThat(moveResponse.status()).isEqualTo(SUCCEEDED);
+ assertThat(moveResponse.operation()).isEqualTo("move");
+ try
+ {
+ verify(mockStorageOperations).move("123456789");
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testMoveFailed(VertxTestContext context) throws IOException
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doThrow(new RuntimeException("Simulated
failure")).when(mockStorageOperations).move(anyString());
+ WebClient client = WebClient.create(vertx);
+ String requestBody = "{\"newToken\":\"123456789\"}";
+ client.put(server.actualPort(), LOCAL_HOST, MOVE_ROUTE)
+ .expect(ResponsePredicate.SC_OK)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(io.vertx.core.buffer.Buffer.buffer(requestBody),
context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
Review Comment:
Nit: also verify that the response body contains `"jobStatus":"FAILED"` and `"reason":"Simulated failure"`.
##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java:
##########
@@ -88,17 +100,160 @@ void testNodeDrainOperationSuccess()
// Validate the operational job status using the OperationalJobHandler
String jobId = responseBody.getString("jobId");
- validateOperationalJobStatus(jobId, "drain");
+ validateOperationalJobStatus(jobId, "drain",
OperationalJobStatus.SUCCEEDED);
+ }
+
+
+ @Test
+ void testNodeMoveOperationSuccess()
+ {
+ // Use a test token - this is a valid token for Murmur3Partitioner
+ String testToken = "123456789";
+ String requestBody = "{\"newToken\":\"" + testToken + "\"}";
+
+ // Validate that the node owns a different token than testToken
+ String currentToken = getCurrentTokenForNode("localhost");
+ assertThat(currentToken).isNotEqualTo(testToken);
+
+ // Initiate move operation
+ HttpResponse<Buffer> moveResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.NODE_MOVE_ROUTE)
+ .putHeader("content-type", "application/json")
+ .sendBuffer(Buffer.buffer(requestBody)));
+
+ assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code());
+
+ JsonObject responseBody = moveResponse.bodyAsJsonObject();
+ assertThat(responseBody).isNotNull();
+ assertThat(responseBody.getString("jobId")).isNotNull();
+ assertThat(responseBody.getString("operation")).isEqualTo("move");
+ assertThat(responseBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.CREATED.name(),
+ OperationalJobStatus.RUNNING.name(),
+ OperationalJobStatus.SUCCEEDED.name()
+ );
+
+ // Verify the job eventually completes (or at least gets processed)
+ loopAssert(30, 500, () -> {
+ HttpResponse<Buffer> streamStatsResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.STREAM_STATS_ROUTE)
+ .send());
+
+ assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject streamStats = streamStatsResponse.bodyAsJsonObject();
+ assertThat(streamStats).isNotNull();
+ // The operationMode should be either NORMAL (completed) or MOVING
(in progress)
+ assertThat(streamStats.getString("operationMode")).isIn("NORMAL",
"MOVING");
+ });
+
+ // Validate the operational job status using the OperationalJobHandler
+ String jobId = responseBody.getString("jobId");
+ validateOperationalJobStatus(jobId, "move",
OperationalJobStatus.SUCCEEDED);
Review Comment:
Clarification question: does the `storageOps.move` call succeed even if the node
is in `MOVING` status?
##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1468,6 +1468,31 @@ public void testNodeDrain() throws Exception
validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE);
}
+ @Test
+ public void testNodeMove() throws Exception
+ {
+ UUID jobId = UUID.randomUUID();
+ String newToken = "123456789";
+ String nodeMoveString = "{\"jobId\":\"" + jobId +
"\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}";
Review Comment:
Nit: Why do we return `instance` in the response here? `OperationalJobResponse`
seems to return only these 4 fields: `jobId`, `jobStatus`, `operation`, and `reason`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]