sklaha commented on code in PR #275: URL: https://github.com/apache/cassandra-sidecar/pull/275#discussion_r2619913283
########## server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; +import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.utils.StringUtils; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.job.NodeMoveJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token + */ +public class NodeMoveHandler extends AbstractHandler<String> implements AccessProtected +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected NodeMoveHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + @Override + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.MOVE_NODE.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String newToken) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + this.jobManager.trySubmitJob(job, + (completedJob, exception) -> + OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), + executorPools.service(), + config.operationalJobExecutionMaxWaitTime()); + } + + /** + * {@inheritDoc} + */ + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String body = context.body().asString(); + if (body == null || body.equalsIgnoreCase("null")) + { + logger.warn("Bad request. Received null payload."); Review Comment: done ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeMoveRequestPayload.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request.data; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Request payload for node move operations. + * + * <p>Valid JSON:</p> + * <pre> + * { "newToken": "123456789" } + * </pre> + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class NodeMoveRequestPayload +{ + private final String newToken; + + /** + * @param newToken the new token for the node to move to + */ + @JsonCreator + public NodeMoveRequestPayload(@JsonProperty(value = "newToken", required = true) String newToken) + { + this.newToken = newToken; Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; +import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.utils.StringUtils; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.job.NodeMoveJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token + */ +public class NodeMoveHandler extends AbstractHandler<String> implements AccessProtected +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected NodeMoveHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + @Override + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.MOVE_NODE.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String newToken) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + this.jobManager.trySubmitJob(job, + (completedJob, exception) -> + OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), + executorPools.service(), + config.operationalJobExecutionMaxWaitTime()); + } + + /** + * {@inheritDoc} + */ + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String body = context.body().asString(); + if (body == null || body.equalsIgnoreCase("null")) + { + logger.warn("Bad request. Received null payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Request body must be JSON with a non-null \"newToken\" field"); + } + try + { + NodeMoveRequestPayload payload = Json.decodeValue(body, NodeMoveRequestPayload.class); + String newToken = payload.newToken(); + if (StringUtils.isNullOrEmpty(newToken) || !DataTypeUtils.isValidBigInt(newToken)) Review Comment: I think, OrderPreservingPartitioner is deprecated and token is numeric for all other partitioners. Please correct me if I am wrong. ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeMoveHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.adapters.base.utils.DataTypeUtils; +import org.apache.cassandra.sidecar.common.request.data.NodeMoveRequestPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.utils.StringUtils; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.job.NodeMoveJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST API for asynchronously moving the corresponding Cassandra node to a new token + */ +public class NodeMoveHandler extends AbstractHandler<String> implements AccessProtected +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected NodeMoveHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + @Override + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.MOVE_NODE.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String newToken) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeMoveJob job = new NodeMoveJob(UUIDs.timeBased(), newToken, operations); + this.jobManager.trySubmitJob(job, + (completedJob, exception) -> + OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception), + executorPools.service(), + config.operationalJobExecutionMaxWaitTime()); + } + + /** + * {@inheritDoc} + */ + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String body = context.body().asString(); + if (body == null || body.equalsIgnoreCase("null")) + { + logger.warn("Bad request. Received null payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Request body must be JSON with a non-null \"newToken\" field"); + } + try + { + NodeMoveRequestPayload payload = Json.decodeValue(body, NodeMoveRequestPayload.class); + String newToken = payload.newToken(); + if (StringUtils.isNullOrEmpty(newToken) || !DataTypeUtils.isValidBigInt(newToken)) + { + throw new IllegalArgumentException( + String.format("newToken value must be a valid number. Provided value=%s", newToken)); + } + return newToken.trim(); + } + catch (DecodeException e) + { + logger.warn("Bad request. Received invalid JSON payload."); Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/job/NodeMoveJob.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.server.StorageOperations; + +/** + * Implementation of {@link OperationalJob} to perform node move operation. + */ +public class NodeMoveJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NodeMoveJob.class); + private static final String OPERATION = "move"; + private static final String OPERATION_MODE_MOVING = "MOVING"; Review Comment: We just need to check if a move operation is in progress or not. Check if the status is "MOVING" accomplishes that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

