This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe8cdf0  HDDS-2076. Read fails because the block cannot be located in the container (#1410)
fe8cdf0 is described below

commit fe8cdf0ab846df9c2f3f59d1d4875185633a27ea
Author: Shashikant Banerjee <shashik...@apache.org>
AuthorDate: Thu Sep 12 21:16:39 2019 +0530

    HDDS-2076. Read fails because the block cannot be located in the container (#1410)
    
    Signed-off-by: Nanda kumar <na...@apache.org>
---
 .../keyvalue/helpers/KeyValueContainerUtil.java    |   7 +
 .../rpc/TestContainerReplicationEndToEnd.java      | 215 +++++++++++++++++++++
 2 files changed, 222 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index d5487b3..c0e7d78 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -24,12 +24,15 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -193,6 +196,10 @@ public final class KeyValueContainerUtil {
       }).sum();
       kvContainerData.setBytesUsed(bytesUsed);
       kvContainerData.setKeyCount(liveKeys.size());
+      byte[] bcsId = metadata.getStore().get(DFSUtil.string2Bytes(
+          OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX));
+      Preconditions.checkNotNull(bcsId);
+      kvContainerData.updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
     }
   }
 
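For context, the bcsId bytes read in the hunk above are the 8-byte big-endian
encoding produced by Guava's Longs utility. A minimal round-trip illustration
(not part of the patch; the class name is hypothetical):

    import com.google.common.primitives.Longs;

    public class BcsIdEncodingSketch {
      public static void main(String[] args) {
        // Guava encodes a long as exactly 8 big-endian bytes, matching how
        // the block commit sequence id is persisted in the container
        // metadata store and decoded by Longs.fromByteArray above.
        byte[] raw = Longs.toByteArray(42L);   // {0, 0, 0, 0, 0, 0, 0, 42}
        long bcsId = Longs.fromByteArray(raw); // 42
        System.out.println(bcsId);
      }
    }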
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
new file mode 100644
index 0000000..e5a3d2f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests end-to-end container replication: a closed container is
+ * re-replicated to a new datanode and the key is read back from it.
+ */
+public class TestContainerReplicationEndToEnd {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String path;
+  private static XceiverClientManager xceiverClientManager;
+  private static long containerReportInterval;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    path = GenericTestUtils
+        .getTempPath(TestContainerReplicationEndToEnd.class.getSimpleName());
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+    containerReportInterval = 2000;
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
+        containerReportInterval, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, containerReportInterval,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL,
+        2 * containerReportInterval, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1000,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        1000, TimeUnit.SECONDS);
+    conf.setLong("hdds.scm.replication.thread.interval",
+        containerReportInterval);
+
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+            .build();
+    cluster.waitForClusterToBeReady();
+    cluster.getStorageContainerManager().getReplicationManager().start();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
+    volumeName = "testcontainerstatemachinefailures";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * The test simulates end-to-end container replication.
+   */
+  @Test
+  public void testContainerReplication() throws Exception {
+    String keyName = "testContainerReplication";
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey(keyName, 0, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    // First write and flush creates a container in the datanode
+    key.write(testData);
+    key.flush();
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    long containerID = omKeyLocationInfo.getContainerID();
+    PipelineID pipelineID =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(new ContainerID(containerID)).getPipelineID();
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(pipelineID);
+    key.close();
+
+    cluster.getStorageContainerManager().getContainerManager()
+        .updateContainerState(new ContainerID(containerID),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+    // wait for container to move to CLOSING state in SCM
+    Thread.sleep(2 * containerReportInterval);
+    DatanodeDetails oldReplicaNode = pipeline.getFirstNode();
+    // now move the container to the CLOSED state on the datanode.
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    xceiverClient.sendCommand(request.build());
+    // wait for container to move to closed state in SCM
+    Thread.sleep(2 * containerReportInterval);
+    Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+        cluster.getStorageContainerManager().getContainerInfo(containerID)
+            .getState());
+    // shutdown the replica node
+    cluster.shutdownHddsDatanode(oldReplicaNode);
+    // now the container is under replicated and will be moved to a
+    // different dn
+    HddsDatanodeService dnService = null;
+
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      Predicate<DatanodeDetails> p =
+          i -> i.getUuid().equals(dn.getDatanodeDetails().getUuid());
+      if (pipeline.getNodes().stream().noneMatch(p)) {
+        dnService = dn;
+      }
+    }
+
+    Assert.assertNotNull(dnService);
+    final HddsDatanodeService newReplicaNode = dnService;
+    // wait for the container to get replicated
+    GenericTestUtils.waitFor(() -> {
+      return newReplicaNode.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID) != null;
+    }, 500, 100000);
+    Assert.assertTrue(newReplicaNode.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID).getContainerData()
+        .getBlockCommitSequenceId() > 0);
+    // wait for SCM to update the replica Map
+    Thread.sleep(5 * containerReportInterval);
+    // now shutdown the other two dns of the original pipeline and try reading
+    // the key again
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      cluster.shutdownHddsDatanode(dn);
+    }
+    // This will try to read the data from the dn to which the container got
+    // replicated after the container got closed.
+    ContainerTestHelper
+        .validateData(keyName, testData, objectStore, volumeName, bucketName);
+  }
+}
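The test above waits on fixed Thread.sleep intervals sized from the container
report interval. A hedged alternative for the CLOSED-state wait, using only
GenericTestUtils.waitFor as already imported by the test (a sketch, assuming
getContainerInfo is cheap and safe to call repeatedly):

    // Poll SCM until the container reaches CLOSED instead of sleeping for a
    // fixed multiple of the report interval; catch broadly so transient
    // lookup failures simply retry until the timeout.
    GenericTestUtils.waitFor(() -> {
      try {
        return cluster.getStorageContainerManager()
            .getContainerInfo(containerID).getState()
            == HddsProtos.LifeCycleState.CLOSED;
      } catch (Exception e) {
        return false;
      }
    }, 500, (int) (10 * containerReportInterval));

To run just this test from the repository root (assuming a standard Maven
setup): mvn -pl hadoop-ozone/integration-test test -Dtest=TestContainerReplicationEndToEnd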

