This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c354195  HDDS-1016. Allow marking containers as unhealthy. Contributed 
by Arpit Agarwal.
c354195 is described below

commit c35419579b5c5b315c5b62d8b89149924416b480
Author: Arpit Agarwal <a...@apache.org>
AuthorDate: Wed Jan 30 11:40:50 2019 -0800

    HDDS-1016. Allow marking containers as unhealthy. Contributed by Arpit 
Agarwal.
---
 .../container/common/interfaces/Container.java     |   5 +
 .../container/keyvalue/KeyValueContainer.java      |  60 +++++-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  94 ++++++++-
 .../TestKeyValueContainerMarkUnhealthy.java        | 172 ++++++++++++++++
 .../TestKeyValueHandlerWithUnhealthyContainer.java | 227 +++++++++++++++++++++
 5 files changed, 538 insertions(+), 20 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 405cac3..58e3383 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -85,6 +85,11 @@ public interface Container<CONTAINERDATA extends 
ContainerData> extends RwLock {
   void markContainerForClose() throws StorageContainerException;
 
   /**
+   * Marks the container replica as unhealthy.
+   */
+  void markContainerUnhealthy() throws StorageContainerException;
+
+  /**
   * Quasi Closes an open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
    *
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index e737a53..ba559e9 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -64,6 +64,7 @@ import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_FILES_CREATE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_INTERNAL_ERROR;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.DISK_OUT_OF_SPACE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -72,6 +73,7 @@ import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.UNSUPPORTED_REQUEST;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,8 +111,8 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
 
     File containerMetaDataPath = null;
     //acquiring volumeset read lock
-    volumeSet.readLock();
     long maxSize = containerData.getMaxSize();
+    volumeSet.readLock();
     try {
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
           .getVolumesList(), maxSize);
@@ -270,28 +272,67 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
 
   @Override
   public void markContainerForClose() throws StorageContainerException {
-    updateContainerData(() ->
-        containerData.setState(ContainerDataProto.State.CLOSING));
+    writeLock();
+    try {
+      if (getContainerState() != ContainerDataProto.State.OPEN) {
+        throw new StorageContainerException(
+            "Attempting to close a " + getContainerState() + " container.",
+            CONTAINER_NOT_OPEN);
+      }
+      updateContainerData(() ->
+          containerData.setState(ContainerDataProto.State.CLOSING));
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  @Override
+  public void markContainerUnhealthy() throws StorageContainerException {
+    writeLock();
+    try {
+      updateContainerData(() ->
+          containerData.setState(ContainerDataProto.State.UNHEALTHY));
+    } finally {
+      writeUnlock();
+    }
   }
 
   @Override
   public void quasiClose() throws StorageContainerException {
-    updateContainerData(containerData::quasiCloseContainer);
+    writeLock();
+    try {
+      updateContainerData(containerData::quasiCloseContainer);
+    } finally {
+      writeUnlock();
+    }
   }
 
   @Override
   public void close() throws StorageContainerException {
-    updateContainerData(containerData::closeContainer);
+    writeLock();
+    try {
+      updateContainerData(containerData::closeContainer);
+    } finally {
+      writeUnlock();
+    }
+
     // It is ok if this operation takes a bit of time.
     // Close container is not expected to be instantaneous.
     compactDB();
   }
 
+  /**
+   * Applies the given update to the container data and persists the change
+   * to the .container file, reverting the in-memory state if persisting
+   * fails. Must be invoked with the writeLock held.
+   *
+   * @param update the action that mutates the container state.
+   * @throws StorageContainerException if the updated state cannot be
+   *         persisted.
+   */
   private void updateContainerData(Runnable update)
       throws StorageContainerException {
+    Preconditions.checkState(hasWriteLock());
     ContainerDataProto.State oldState = null;
     try {
-      writeLock();
       oldState = containerData.getState();
       update.run();
       File containerFile = getContainerFile();
@@ -304,12 +345,10 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
         containerData.setState(oldState);
       }
       throw ex;
-    } finally {
-      writeUnlock();
     }
   }
 
-  private void compactDB() throws StorageContainerException {
+  void compactDB() throws StorageContainerException {
     try {
       MetadataStore db = BlockUtils.getDB(containerData, config);
       db.compactDB();
@@ -340,7 +379,8 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   }
 
   @Override
-  public void update(Map<String, String> metadata, boolean forceUpdate)
+  public void update(
+      Map<String, String> metadata, boolean forceUpdate)
       throws StorageContainerException {
 
     // TODO: Now, when writing the updated data to .container file, we are
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 261dbc4..3748966 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -111,7 +111,9 @@ public class KeyValueHandler extends Handler {
   private final BlockDeletingService blockDeletingService;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long maxContainerSize;
-  private final AutoCloseableLock handlerLock;
+
+  // A lock that is held during container creation.
+  private final AutoCloseableLock containerCreationLock;
   private final boolean doSyncWrite;
 
   public KeyValueHandler(Configuration config, StateContext context,
@@ -143,7 +145,7 @@ public class KeyValueHandler extends Handler {
             ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     // this handler lock is used for synchronizing createContainer Requests,
     // so using a fair lock here.
-    handlerLock = new AutoCloseableLock(new ReentrantLock(true));
+    containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
   }
 
   @VisibleForTesting
@@ -212,7 +214,7 @@ public class KeyValueHandler extends Handler {
 
   /**
    * Handles Create Container Request. If successful, adds the container to
-   * ContainerSet.
+   * ContainerSet and sends an ICR to the SCM.
    */
   ContainerCommandResponseProto handleCreateContainer(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
@@ -235,14 +237,12 @@ public class KeyValueHandler extends Handler {
     KeyValueContainer newContainer = new KeyValueContainer(
         newContainerData, conf);
 
-    try {
-      handlerLock.acquire();
+    boolean created = false;
+    try (AutoCloseableLock l = containerCreationLock.acquire()) {
       if (containerSet.getContainer(containerID) == null) {
         newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
-        containerSet.addContainer(newContainer);
-        sendICR(newContainer);
+        created = containerSet.addContainer(newContainer);
       } else {
-
         // The create container request for an already existing container can
         // arrive in case the ContainerStateMachine reapplies the transaction
         // on datanode restart. Just log a warning msg here.
@@ -251,10 +251,15 @@ public class KeyValueHandler extends Handler {
       }
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
-    } finally {
-      handlerLock.release();
     }
 
+    if (created) {
+      try {
+        sendICR(newContainer);
+      } catch (StorageContainerException ex) {
+        return ContainerUtils.logAndReturnError(LOG, ex, request);
+      }
+    }
     return ContainerUtils.getSuccessResponse(request);
   }
 
@@ -282,6 +287,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     KeyValueContainerData containerData = kvContainer.getContainerData();
     return KeyValueContainerUtil.getReadContainerResponse(
         request, containerData);
@@ -420,6 +433,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     BlockData responseData;
     try {
       BlockID blockID = BlockID.getFromProtobuf(
@@ -451,6 +472,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     long blockLength;
     try {
       BlockID blockID = BlockID
@@ -510,6 +539,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     ChunkInfo chunkInfo;
     byte[] data;
     try {
@@ -538,6 +575,27 @@ public class KeyValueHandler extends Handler {
   }
 
   /**
+   * Throw an exception if the container is unhealthy.
+   *
+   * @param kvContainer the container replica to check.
+   * @throws StorageContainerException if the container is unhealthy.
+   */
+  @VisibleForTesting
+  void checkContainerIsHealthy(KeyValueContainer kvContainer)
+      throws StorageContainerException {
+    kvContainer.readLock();
+    try {
+      if (kvContainer.getContainerData().getState() == State.UNHEALTHY) {
+        throw new StorageContainerException(
+            "The container replica is unhealthy.",
+            CONTAINER_UNHEALTHY);
+      }
+    } finally {
+      kvContainer.readUnlock();
+    }
+  }
+
+  /**
    * Handle Delete Chunk operation. Calls ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleDeleteChunk(
@@ -549,6 +607,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     try {
       checkContainerOpen(kvContainer);
 
@@ -697,6 +763,14 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    // The container can become unhealthy after the lock is released.
+    // The operation will likely fail/timeout if that happens.
+    try {
+      checkContainerIsHealthy(kvContainer);
+    } catch (StorageContainerException sce) {
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+
     GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
 
     try {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
new file mode 100644
index 0000000..e11bca5
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests unhealthy container functionality in the {@link KeyValueContainer}
+ * class.
+ */
+public class TestKeyValueContainerMarkUnhealthy {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestKeyValueContainerMarkUnhealthy.class);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Rule
+  public Timeout timeout = new Timeout(600_000);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private OzoneConfiguration conf;
+  private String scmId = UUID.randomUUID().toString();
+  private VolumeSet volumeSet;
+  private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+  private KeyValueContainerData keyValueContainerData;
+  private KeyValueContainer keyValueContainer;
+  private UUID datanodeId;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new OzoneConfiguration();
+    datanodeId = UUID.randomUUID();
+    HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+        .getAbsolutePath()).conf(conf).datanodeUuid(datanodeId
+        .toString()).build();
+
+    volumeSet = mock(VolumeSet.class);
+    volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+    Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+        .thenReturn(hddsVolume);
+
+    keyValueContainerData = new KeyValueContainerData(1L,
+        (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
+        datanodeId.toString());
+    final File metaDir = GenericTestUtils.getRandomizedTestDir();
+    metaDir.mkdirs();
+    keyValueContainerData.setMetadataPath(metaDir.getPath());
+
+
+    keyValueContainer = new KeyValueContainer(
+        keyValueContainerData, conf);
+  }
+
+  @After
+  public void teardown() {
+    volumeSet = null;
+    keyValueContainer = null;
+    keyValueContainerData = null;
+  }
+
+  /**
+   * Verify that the .container file is correctly updated when a
+   * container is marked as unhealthy.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testMarkContainerUnhealthy() throws IOException {
+    assertThat(keyValueContainerData.getState(), is(OPEN));
+    keyValueContainer.markContainerUnhealthy();
+    assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+
+    // Check metadata in the .container file
+    File containerFile = keyValueContainer.getContainerFile();
+
+    keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
+        .readContainerFile(containerFile);
+    assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+  }
+
+  /**
+   * Attempting to close an unhealthy container should fail.
+   * @throws IOException
+   */
+  @Test
+  public void testCloseUnhealthyContainer() throws IOException {
+    keyValueContainer.markContainerUnhealthy();
+    thrown.expect(StorageContainerException.class);
+    keyValueContainer.markContainerForClose();
+  }
+
+  /**
+   * Attempting to mark a closed container as unhealthy should succeed.
+   */
+  @Test
+  public void testMarkClosedContainerAsUnhealthy() throws IOException {
+    // We need to create the container so the compact-on-close operation
+    // does not NPE.
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+    keyValueContainer.close();
+    keyValueContainer.markContainerUnhealthy();
+    assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+  }
+
+  /**
+   * Attempting to mark a quasi-closed container as unhealthy should succeed.
+   */
+  @Test
+  public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException {
+    keyValueContainer.quasiClose();
+    keyValueContainer.markContainerUnhealthy();
+    assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+  }
+
+  /**
+   * Attempting to mark a closing container as unhealthy should succeed.
+   */
+  @Test
+  public void testMarkClosingContainerAsUnhealthy() throws IOException {
+    keyValueContainer.markContainerForClose();
+    keyValueContainer.markContainerUnhealthy();
+    assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
new file mode 100644
index 0000000..e9443b1
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test that KeyValueHandler fails certain operations when the
+ * container is unhealthy.
+ */
+public class TestKeyValueHandlerWithUnhealthyContainer {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestKeyValueHandlerWithUnhealthyContainer.class);
+
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final long DUMMY_CONTAINER_ID = 9999;
+
+  @Test
+  public void testRead() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleReadContainer(
+            getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer),
+            container);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  @Test
+  public void testGetBlock() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleGetBlock(
+            getDummyCommandRequestProto(ContainerProtos.Type.GetBlock),
+            container);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  @Test
+  public void testGetCommittedBlockLength() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleGetCommittedBlockLength(
+            getDummyCommandRequestProto(
+                ContainerProtos.Type.GetCommittedBlockLength),
+            container);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  @Test
+  public void testReadChunk() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleReadChunk(
+            getDummyCommandRequestProto(
+                ContainerProtos.Type.ReadChunk),
+            container, null);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  @Test
+  public void testDeleteChunk() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleDeleteChunk(
+            getDummyCommandRequestProto(
+                ContainerProtos.Type.DeleteChunk),
+            container);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  @Test
+  public void testGetSmallFile() throws IOException {
+    KeyValueContainer container = getMockUnhealthyContainer();
+    KeyValueHandler handler = getDummyHandler();
+
+    ContainerProtos.ContainerCommandResponseProto response =
+        handler.handleGetSmallFile(
+            getDummyCommandRequestProto(
+                ContainerProtos.Type.GetSmallFile),
+            container);
+    assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+  }
+
+  // -- Helper methods below.
+
+  private KeyValueHandler getDummyHandler() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeDetails dnDetails = DatanodeDetails.newBuilder()
+        .setUuid(DATANODE_UUID)
+        .setHostName("dummyHost")
+        .setIpAddress("1.2.3.4")
+        .build();
+    DatanodeStateMachine stateMachine = mock(DatanodeStateMachine.class);
+    when(stateMachine.getDatanodeDetails()).thenReturn(dnDetails);
+
+    StateContext context = new StateContext(
+        conf, DatanodeStateMachine.DatanodeStates.RUNNING,
+        stateMachine);
+
+    return new KeyValueHandler(
+        new OzoneConfiguration(),
+        context,
+        mock(ContainerSet.class),
+        mock(VolumeSet.class),
+        mock(ContainerMetrics.class));
+  }
+
+  private KeyValueContainer getMockUnhealthyContainer() {
+    KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+    when(containerData.getState()).thenReturn(
+        ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    return new KeyValueContainer(containerData, new OzoneConfiguration());
+  }
+
+  /**
+   * Construct fake protobuf messages for various types of requests.
+   * This is tedious, however necessary to test. Protobuf classes are final
+   * and cannot be mocked by Mockito.
+   *
+   * @param cmdType type of the container command.
+   * @return
+   */
+  private ContainerProtos.ContainerCommandRequestProto 
getDummyCommandRequestProto(
+      ContainerProtos.Type cmdType) {
+    final ContainerProtos.ContainerCommandRequestProto.Builder builder =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(cmdType)
+            .setContainerID(DUMMY_CONTAINER_ID)
+            .setDatanodeUuid(DATANODE_UUID);
+
+    final ContainerProtos.DatanodeBlockID fakeBlockId =
+        ContainerProtos.DatanodeBlockID.newBuilder()
+            .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1).build();
+
+    final ContainerProtos.ChunkInfo fakeChunkInfo =
+        ContainerProtos.ChunkInfo.newBuilder()
+            .setChunkName("dummy")
+            .setOffset(0)
+            .setLen(100)
+            .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+                .setBytesPerChecksum(1)
+                .setType(ContainerProtos.ChecksumType.CRC32)
+                .build())
+            .build();
+
+    switch(cmdType) {
+      case ReadContainer:
+        
builder.setReadContainer(ContainerProtos.ReadContainerRequestProto.newBuilder().build());
+        break;
+      case GetBlock:
+        builder.setGetBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
+            .setBlockID(fakeBlockId).build());
+        break;
+      case GetCommittedBlockLength:
+        builder.setGetCommittedBlockLength(
+            ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder()
+                .setBlockID(fakeBlockId).build());
+        break;
+      case ReadChunk:
+        builder.setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder()
+            .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build());
+        break;
+      case DeleteChunk:
+        
builder.setDeleteChunk(ContainerProtos.DeleteChunkRequestProto.newBuilder()
+            .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build());
+        break;
+      case GetSmallFile:
+        
builder.setGetSmallFile(ContainerProtos.GetSmallFileRequestProto.newBuilder()
+            .setBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
+                .setBlockID(fakeBlockId)
+                .build())
+            .build());
+        break;
+
+      default:
+        Assert.fail("Unhandled request type " + cmdType + " in unit test");
+    }
+
+    return builder.build();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to