HDFS-10794. [SPS]: Provide storage policy satisfy worker at DN for co-ordinating the block storage movement work. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8aa5e5ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8aa5e5ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8aa5e5ac

Branch: refs/heads/HDFS-10285
Commit: 8aa5e5ac1ab380298ed0f2d3aabafd3020bae530
Parents: f5d9235
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Wed Sep 14 17:02:11 2016 +0800
Committer: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Committed: Mon Oct 17 14:47:39 2016 -0700

----------------------------------------------------------------------
 .../datanode/StoragePolicySatisfyWorker.java    | 258 +++++++++++++++++++
 .../protocol/BlockStorageMovementCommand.java   | 101 ++++++++
 .../TestStoragePolicySatisfyWorker.java         | 159 ++++++++++++
 3 files changed, 518 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8aa5e5ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
new file mode 100644
index 0000000..6df4e81
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
+ * These commands will be issued by the NameNode as part of the DataNode's
+ * heartbeat response. BPOfferService delegates the work to this class for
+ * handling BlockStorageMovement commands.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfyWorker {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StoragePolicySatisfyWorker.class);
+
+  private final DataNode datanode;
+  private final int ioFileBufferSize;
+
+  private final int moverThreads;
+  private final ExecutorService moveExecutor;
+  private final CompletionService<Void> moverExecutorCompletionService;
+  private final List<Future<Void>> moverTaskFutures;
+
+  public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
+    this.datanode = datanode;
+    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+
+    moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+    moveExecutor = initializeBlockMoverThreadPool(moverThreads);
+    moverExecutorCompletionService = new ExecutorCompletionService<>(
+        moveExecutor);
+    moverTaskFutures = new ArrayList<>();
+    // TODO: Manage the number of concurrent moves allowed per DataNode.
+  }
+
+  private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
+    LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
+
+    ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
+            return t;
+          }
+        }, new ThreadPoolExecutor.CallerRunsPolicy() {
+          @Override
+          public void rejectedExecution(Runnable runnable,
+              ThreadPoolExecutor e) {
+            LOG.info("Execution for block movement to satisfy storage policy"
+                + " got rejected, Executing in current thread");
+            // will run in the current thread.
+            super.rejectedExecution(runnable, e);
+          }
+        });
+
+    moverThreadPool.allowCoreThreadTimeOut(true);
+    return moverThreadPool;
+  }
+
+  public void processBlockMovingTasks(long trackID,
+      List<BlockMovingInfo> blockMovingInfos) {
+    Future<Void> moveCallable = null;
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      assert blkMovingInfo
+          .getSources().length == blkMovingInfo.getTargets().length;
+
+      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
+        BlockMovingTask blockMovingTask =
+            new BlockMovingTask(blkMovingInfo.getBlock(),
+            blkMovingInfo.getSources()[i],
+            blkMovingInfo.getTargets()[i],
+            blkMovingInfo.getTargetStorageTypes()[i]);
+        moveCallable = moverExecutorCompletionService
+            .submit(blockMovingTask);
+        moverTaskFutures.add(moveCallable);
+      }
+    }
+
+    // TODO: Presently this function acts as a blocking call; this has to be
+    // refined by moving the tracking logic to a separate tracker thread.
+    for (int i = 0; i < moverTaskFutures.size(); i++) {
+      try {
+        moveCallable = moverExecutorCompletionService.take();
+        moveCallable.get();
+      } catch (InterruptedException | ExecutionException e) {
+        // TODO: Failure retries and report back the error to NameNode.
+        LOG.error("Exception while moving block replica to target storage 
type",
+            e);
+      }
+    }
+  }
+
+  /**
+   * This class encapsulates the process of moving the block replica to the
+   * given target.
+   */
+  private class BlockMovingTask implements Callable<Void> {
+    private final ExtendedBlock block;
+    private final DatanodeInfo source;
+    private final DatanodeInfo target;
+    private final StorageType targetStorageType;
+
+    BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+        DatanodeInfo target, StorageType targetStorageType) {
+      this.block = block;
+      this.source = source;
+      this.target = target;
+      this.targetStorageType = targetStorageType;
+    }
+
+    @Override
+    public Void call() {
+      moveBlock();
+      return null;
+    }
+
+    private void moveBlock() {
+      LOG.info("Start moving block {}", block);
+
+      LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
+          + "storageType:{}", block, source, target, targetStorageType);
+      Socket sock = null;
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        DNConf dnConf = datanode.getDnConf();
+        String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+        sock = datanode.newSocket();
+        NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
+            dnConf.getSocketTimeout());
+        sock.setSoTimeout(2 * dnConf.getSocketTimeout());
+        LOG.debug("Connecting to datanode {}", dnAddr);
+
+        OutputStream unbufOut = sock.getOutputStream();
+        InputStream unbufIn = sock.getInputStream();
+
+        Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
+            block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+        DataEncryptionKeyFactory keyFactory = datanode
+            .getDataEncryptionKeyFactoryForBlock(block);
+        IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
+            unbufOut, unbufIn, keyFactory, accessToken, target);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
+        out = new DataOutputStream(
+            new BufferedOutputStream(unbufOut, ioFileBufferSize));
+        in = new DataInputStream(
+            new BufferedInputStream(unbufIn, ioFileBufferSize));
+        sendRequest(out, block, accessToken, source, targetStorageType);
+        receiveResponse(in);
+
+        LOG.debug(
+            "Successfully moved block:{} from src:{} to destin:{} for"
+                + " satisfying storageType:{}",
+            block, source, target, targetStorageType);
+      } catch (IOException e) {
+        // TODO: handle failure retries
+        LOG.warn(
+            "Failed to move block:{} from src:{} to destin:{} to satisfy "
+                + "storageType:{}",
+            block, source, target, targetStorageType, e);
+      } finally {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
+        IOUtils.closeSocket(sock);
+      }
+    }
+
+    /** Send a block replacement request to the output stream. */
+    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+        Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
+        StorageType destinStorageType) throws IOException {
+      new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
+          srcDn.getDatanodeUuid(), srcDn);
+    }
+
+    /** Receive a block copy response from the input stream. */
+    private void receiveResponse(DataInputStream in) throws IOException {
+      BlockOpResponseProto response = BlockOpResponseProto
+          .parseFrom(vintPrefixed(in));
+      while (response.getStatus() == Status.IN_PROGRESS) {
+        // read intermediate responses
+        response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+      }
+      String logInfo = "block move failed";
+      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+    }
+  }
+}
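
A note on the concurrency pattern above: the worker pairs a bounded
ThreadPoolExecutor that does no task buffering (a SynchronousQueue hands each
task straight to a thread) with CallerRunsPolicy, so overflow moves degrade to
running in the submitting thread instead of being rejected, and it drains
results through an ExecutorCompletionService in completion order, as
processBlockMovingTasks does. The following self-contained sketch shows the
same pattern outside HDFS; the class name and the maxThreads constant are
illustrative stand-ins, not part of this patch.

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MoverPoolSketch {
  public static void main(String[] args) throws Exception {
    final int maxThreads = 4; // illustrative stand-in for dfs.mover.moverThreads
    final AtomicInteger idx = new AtomicInteger(0);

    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60,
        TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),           // direct hand-off, no queueing
        r -> {
          Thread t = new Thread(r);
          t.setDaemon(true);                        // like Daemon.DaemonFactory
          t.setName("BlockMoverTask-" + idx.getAndIncrement());
          return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs in the caller
    pool.allowCoreThreadTimeOut(true);              // idle pool shrinks to zero

    CompletionService<Integer> moves = new ExecutorCompletionService<>(pool);
    int submitted = 8;                              // e.g. one task per replica move
    for (int i = 0; i < submitted; i++) {
      final int blockId = i;
      moves.submit(() -> {
        Thread.sleep(50);                           // stand-in for the socket transfer
        return blockId;
      });
    }
    // Like processBlockMovingTasks: take() returns futures in completion
    // order, blocking until the next move finishes.
    for (int i = 0; i < submitted; i++) {
      System.out.println("finished move of block " + moves.take().get());
    }
    pool.shutdown();
  }
}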

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8aa5e5ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
new file mode 100644
index 0000000..42ba265
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * A BlockStorageMovementCommand is an instruction to a DataNode to move the
+ * given set of blocks to specified target DataNodes to fulfill the block
+ * storage policy.
+ *
+ * Upon receiving this command, the DataNode coordinates the block movements
+ * by passing the details to the
+ * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
+ * service. After the block movement, the DataNode sends a response back to
+ * the NameNode with the movement status.
+ */
+public class BlockStorageMovementCommand extends DatanodeCommand {
+
+  // TODO: constructor needs to be refined based on the block movement data
+  // structure.
+  BlockStorageMovementCommand(int action) {
+    super(action);
+  }
+
+  /**
+   * Stores the block and storage-type details needed for a block movement.
+   */
+  public static class BlockMovingInfo {
+    private ExtendedBlock blk;
+    private DatanodeInfo[] sourceNodes;
+    private StorageType[] sourceStorageTypes;
+    private DatanodeInfo[] targetNodes;
+    private StorageType[] targetStorageTypes;
+
+    public BlockMovingInfo(ExtendedBlock block,
+        DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
+        StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
+      this.blk = block;
+      this.sourceNodes = sourceDnInfos;
+      this.targetNodes = targetDnInfos;
+      this.sourceStorageTypes = srcStorageTypes;
+      this.targetStorageTypes = targetStorageTypes;
+    }
+
+    public void addBlock(ExtendedBlock block) {
+      this.blk = block;
+    }
+
+    public ExtendedBlock getBlock() {
+      return this.blk;
+    }
+
+    public DatanodeInfo[] getSources() {
+      return sourceNodes;
+    }
+
+    public DatanodeInfo[] getTargets() {
+      return targetNodes;
+    }
+
+    public StorageType[] getTargetStorageTypes() {
+      return targetStorageTypes;
+    }
+
+    public StorageType[] getSourceStorageTypes() {
+      return sourceStorageTypes;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("BlockMovingInfo(\n  ")
+          .append("Moving block: ").append(blk).append(" From: ")
+          .append(Arrays.asList(sourceNodes)).append(" To: ")
+          .append(Arrays.asList(targetNodes)).append("\n  ")
+          .append("sourceStorageTypes: ")
+          .append(Arrays.toString(sourceStorageTypes))
+          .append(" targetStorageTypes: ")
+          .append(Arrays.toString(targetStorageTypes)).append(")").toString();
+    }
+  }
+}
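
BlockMovingInfo models each command as parallel arrays: index i across the
source, target, and storage-type arrays fully describes one replica move,
which is the invariant the worker's assert in processBlockMovingTasks relies
on. Below is a minimal standalone sketch of that layout with a
constructor-time check one might add; the class and field names are
illustrative, not the patch's API.

import java.util.Arrays;

public class BlockMoveSpecSketch {
  private final String block;
  private final String[] sources;
  private final String[] targets;
  private final String[] targetStorageTypes;

  BlockMoveSpecSketch(String block, String[] sources, String[] targets,
      String[] targetStorageTypes) {
    // Enforce the parallel-array invariant that the worker's assert checks.
    if (sources.length != targets.length
        || targets.length != targetStorageTypes.length) {
      throw new IllegalArgumentException("Parallel arrays differ in length: "
          + Arrays.asList(sources.length, targets.length,
              targetStorageTypes.length));
    }
    this.block = block;
    this.sources = sources;
    this.targets = targets;
    this.targetStorageTypes = targetStorageTypes;
  }

  public static void main(String[] args) {
    BlockMoveSpecSketch spec = new BlockMoveSpecSketch("blk_1073741825",
        new String[] {"dn1:9866"}, new String[] {"dn4:9866"},
        new String[] {"ARCHIVE"});
    // Index i fully describes one replica move: source -> target as type.
    for (int i = 0; i < spec.sources.length; i++) {
      System.out.println("move " + spec.block + " from " + spec.sources[i]
          + " to " + spec.targets[i] + " as " + spec.targetStorageTypes[i]);
    }
  }
}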

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8aa5e5ac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
new file mode 100644
index 0000000..692847d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests the behavior of moving a block replica to the given
+ * storage type to fulfill the storage policy requirement.
+ */
+public class TestStoragePolicySatisfyWorker {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestStoragePolicySatisfyWorker.class);
+
+  private static final int DEFAULT_BLOCK_SIZE = 100;
+
+  private static void initConf(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+  }
+
+  /**
+   * Tests to verify that the block replica is moved to ARCHIVE storage type
+   * to fulfill the storage policy requirement.
+   */
+  @Test(timeout = 120000)
+  public void testMoveSingleBlockToAnotherDatanode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE}})
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoveSingleBlockToAnotherDatanode";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+      out.writeChars("testMoveSingleBlockToAnotherDatanode");
+      out.close();
+
+      // verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertEquals(StorageType.DISK, storageType);
+      }
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      DataNode src = cluster.getDataNodes().get(3);
+      DatanodeInfo targetDnInfo = DFSTestUtil
+          .getLocalDatanodeInfo(src.getXferPort());
+
+      // TODO: Revisit this once the NN implementation is able to send
+      // block moving commands.
+      StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
+          src);
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+          lb.getBlock(), lb.getLocations()[0], targetDnInfo,
+          lb.getStorageTypes()[0], StorageType.ARCHIVE);
+      blockMovingInfos.add(blockMovingInfo);
+      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+      worker.processBlockMovingTasks(inode.getId(),
+          blockMovingInfos);
+      cluster.triggerHeartbeats();
+
+      // Wait till the NameNode is notified about the new block location details
+      waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void waitForLocatedBlockWithArchiveStorageType(
+      final DistributedFileSystem dfs, final String file,
+      int expectedArchiveCount, int timeout) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+
+        int archiveCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (StorageType.ARCHIVE == storageType) {
+            archiveCount++;
+          }
+        }
+        LOG.info("Archive replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, timeout);
+  }
+
+  BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
+      DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
+      StorageType targetStorageType) {
+    return new BlockMovingInfo(block, new DatanodeInfo[] {src},
+        new DatanodeInfo[] {destin}, new StorageType[] {storageType},
+        new StorageType[] {targetStorageType});
+  }
+}
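
The test's waitForLocatedBlockWithArchiveStorageType leans on
GenericTestUtils.waitFor, which simply re-polls a boolean condition at a
fixed interval until it holds or a timeout expires. Below is a rough
standalone sketch of that polling pattern, assuming a hypothetical waitFor
helper rather than Hadoop's actual implementation.

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class WaitForSketch {
  // Hypothetical helper mirroring the GenericTestUtils.waitFor contract:
  // poll 'check' every checkEveryMillis until true or timeoutMillis passes.
  static void waitFor(Supplier<Boolean> check, long checkEveryMillis,
      long timeoutMillis) throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "Condition not met within " + timeoutMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // The condition flips to true after ~500 ms, standing in for the replica
    // count on ARCHIVE reaching the expected value.
    waitFor(() -> System.currentTimeMillis() - start > 500, 100, 30000);
    System.out.println("condition satisfied");
  }
}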

