HDFS-9595. DiskBalancer: Add cancelPlan RPC. (Contributed by Anu Engineer)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0501d430 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0501d430 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0501d430 Branch: refs/heads/HDFS-1312 Commit: 0501d430e2f6111ad8b65dc36f4a98d94cb9589b Parents: 7100c0d Author: Arpit Agarwal <a...@apache.org> Authored: Fri Jan 15 16:08:49 2016 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Jun 23 18:18:48 2016 -0700 ---------------------------------------------------------------------- .../hdfs/protocol/ClientDatanodeProtocol.java | 7 +++ .../ClientDatanodeProtocolTranslatorPB.java | 19 +++++++ .../src/main/proto/ClientDatanodeProtocol.proto | 19 +++++++ .../hadoop-hdfs/HDFS-1312_CHANGES.txt | 3 ++ ...tDatanodeProtocolServerSideTranslatorPB.java | 22 ++++++++ .../hadoop/hdfs/server/datanode/DataNode.java | 7 +++ .../diskbalancer/planner/GreedyPlanner.java | 4 ++ .../diskbalancer/TestDiskBalancerRPC.java | 56 ++++++++++++++++---- 8 files changed, 127 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 6e9cef0..125a3c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -170,4 +170,11 @@ public interface ClientDatanodeProtocol { void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth, String plan) throws IOException; + /** + * Cancel an executing plan. + * + * @param planID - A SHA512 hash of the plan string. + */ + void cancelDiskBalancePlan(String planID) throws IOException; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index da8d962..e037fcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Shutdo import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -358,4 +359,22 @@ public class ClientDatanodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } + + /** + * Cancels an executing disk balancer plan. + * @param planID - A SHA512 hash of the plan string. + * + * @throws IOException on error + */ + @Override + public void cancelDiskBalancePlan(String planID) + throws IOException { + try { + CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder() + .setPlanID(planID).build(); + rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto index d11979b..b65766b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto @@ -167,6 +167,20 @@ message SubmitDiskBalancerPlanResponseProto { } /** + * This message describes a request to cancel an + * outstanding disk balancer plan + */ +message CancelPlanRequestProto { + required string planID = 1; +} + +/** + * This is the response for the cancellation request + */ +message CancelPlanResponseProto { +} + +/** * Protocol used from client to the Datanode. * See the request and response for details of rpc call. */ @@ -230,4 +244,9 @@ service ClientDatanodeProtocolService { */ rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto) returns (SubmitDiskBalancerPlanResponseProto); + /** + * Cancel an executing plan + */ + rpc cancelDiskBalancerPlan(CancelPlanRequestProto) + returns (CancelPlanResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index 6d8cde0..8ceb45b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -16,3 +16,6 @@ HDFS-1312 Change Log HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal) + HDFS-9595. DiskBalancer: Add cancelPlan RPC. (Anu Engineer via + Arpit Agarwal) + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 824f050..27fe803 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Trigge import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -259,4 +261,24 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } + + /** + * Cancel an executing plan. + * @param controller - RpcController + * @param request - Request + * @return Response. + * @throws ServiceException + */ + @Override + public CancelPlanResponseProto cancelDiskBalancerPlan( + RpcController controller, CancelPlanRequestProto request) + throws ServiceException { + try { + impl.cancelDiskBalancePlan(request.getPlanID()); + return CancelPlanResponseProto.newBuilder().build(); + }catch (Exception e) { + throw new ServiceException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index e06555f..b2d9994 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3310,4 +3310,11 @@ public class DataNode extends ReconfigurableBase throw new DiskbalancerException("Not Implemented", 0); } + @Override + public void cancelDiskBalancePlan(String planID) throws + IOException { + checkSuperuserPrivilege(); + throw new DiskbalancerException("Not Implemented", 0); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java index 43f9953..f0fc776 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer.planner; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel @@ -90,6 +91,9 @@ public class GreedyPlanner implements Planner { public void balanceVolumeSet(DiskBalancerDataNode node, DiskBalancerVolumeSet vSet, NodePlan plan) throws Exception { + Preconditions.checkNotNull(vSet); + Preconditions.checkNotNull(plan); + Preconditions.checkNotNull(node); DiskBalancerVolumeSet currentSet = new DiskBalancerVolumeSet(vSet); while (currentSet.isBalancingNeeded(this.threshold)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0501d430/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index e047d5a..35d3f91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -42,10 +42,10 @@ public class TestDiskBalancerRPC { public ExpectedException thrown = ExpectedException.none(); private MiniDFSCluster cluster; - + private Configuration conf; @Before public void setUp() throws Exception { - Configuration conf = new HdfsConfiguration(); + conf = new HdfsConfiguration(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); } @@ -59,21 +59,54 @@ public class TestDiskBalancerRPC { @Test public void TestSubmitTestRpc() throws Exception { - URI clusterJson = getClass() - .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI(); - ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson, - null); - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector); + final int dnIndex = 0; + cluster.restartDataNode(dnIndex); + cluster.waitActive(); + ClusterConnector nameNodeConnector = + ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + + DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(3, diskBalancerCluster.getNodes().size()); + Assert.assertEquals(cluster.getDataNodes().size(), + diskBalancerCluster.getNodes().size()); diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); + DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex); GreedyPlanner planner = new GreedyPlanner(10.0f, node); NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan); + planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); + final int planVersion = 0; // So far we support only one version. + DataNode dataNode = cluster.getDataNodes().get(dnIndex); + + String planHash = DigestUtils.sha512Hex(plan.toJson()); + + // Since submitDiskBalancerPlan is not implemented yet, it throws an + // Exception, this will be modified with the actual implementation. + thrown.expect(DiskbalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + + } + + @Test + public void TestCancelTestRpc() throws Exception { final int dnIndex = 0; + cluster.restartDataNode(dnIndex); + cluster.waitActive(); + ClusterConnector nameNodeConnector = + ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + + DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); + diskBalancerCluster.readClusterInfo(); + Assert.assertEquals(cluster.getDataNodes().size(), + diskBalancerCluster.getNodes().size()); + diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); + DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); + GreedyPlanner planner = new GreedyPlanner(10.0f, node); + NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort + ()); + planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); + final int planVersion = 0; // So far we support only one version. DataNode dataNode = cluster.getDataNodes().get(dnIndex); String planHash = DigestUtils.sha512Hex(plan.toJson()); @@ -83,5 +116,8 @@ public class TestDiskBalancerRPC { thrown.expect(DiskbalancerException.class); dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + thrown.expect(DiskbalancerException.class); + dataNode.cancelDiskBalancePlan(planHash); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org