HDDS-161. Add functionality to queue ContainerClose command from SCM Heartbeat Response to Ratis.
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7547740e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7547740e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7547740e

Branch: refs/heads/HDDS-48
Commit: 7547740e5c65edaa6c6f8aa1c8debabbdfb0945e
Parents: 2299488
Author: Anu Engineer <aengin...@apache.org>
Authored: Wed Jun 13 17:50:42 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Wed Jun 13 18:48:59 2018 -0700

----------------------------------------------------------------------
 .../statemachine/DatanodeStateMachine.java      |   9 +
 .../CloseContainerCommandHandler.java           |  21 +-
 .../commandhandler/CommandDispatcher.java       |   4 +
 .../common/transport/server/XceiverServer.java  |   7 +
 .../transport/server/XceiverServerGrpc.java     |   9 +
 .../transport/server/XceiverServerSpi.java      |   7 +
 .../server/ratis/XceiverServerRatis.java        |  56 ++++-
 .../container/ozoneimpl/OzoneContainer.java     |  62 +++++-
 .../commands/CloseContainerCommand.java         |  12 +-
 .../StorageContainerDatanodeProtocol.proto      |   1 +
 .../container/CloseContainerEventHandler.java   |   3 +-
 .../scm/container/closer/ContainerCloser.java   |   7 +-
 .../TestCloseContainerByPipeline.java           | 221 +++++++++++++++++++
 .../TestCloseContainerHandler.java              |   7 +-
 14 files changed, 412 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index cb4319d..dc4e673 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -403,4 +403,13 @@ public class DatanodeStateMachine implements Closeable {
   public long getCommandHandled() {
     return commandsHandled;
   }
+
+  /**
+   * Returns the Command Dispatcher.
+   * @return CommandDispatcher
+   */
+  @VisibleForTesting
+  public CommandDispatcher getCommandDispatcher() {
+    return commandDispatcher;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index e8c602d..45f2bbd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -29,6 +31,8 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.UUID;
+
 /**
  * Handler for close container command received from SCM.
  */
@@ -67,8 +71,23 @@ public class CloseContainerCommandHandler implements CommandHandler {
           CloseContainerCommandProto
               .parseFrom(command.getProtoBufMessage());
       containerID = closeContainerProto.getContainerID();
+      HddsProtos.ReplicationType replicationType =
+          closeContainerProto.getReplicationType();
+
+      ContainerProtos.CloseContainerRequestProto.Builder closeRequest =
+          ContainerProtos.CloseContainerRequestProto.newBuilder();
+      closeRequest.setContainerID(containerID);
 
-      container.getContainerManager().closeContainer(containerID);
+      ContainerProtos.ContainerCommandRequestProto.Builder request =
+          ContainerProtos.ContainerCommandRequestProto.newBuilder();
+      request.setCmdType(ContainerProtos.Type.CloseContainer);
+      request.setCloseContainer(closeRequest);
+      request.setTraceID(UUID.randomUUID().toString());
+      request.setDatanodeUuid(
+          context.getParent().getDatanodeDetails().getUuidString());
+      // submit the close container request for the XceiverServer to handle
+      container.submitContainerRequest(
+          request.build(), replicationType);
 
     } catch (Exception e) {
       LOG.error("Can't close container " + containerID, e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index aedd78f..5163d98 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -77,6 +77,10 @@ public final class CommandDispatcher {
     }
   }
 
+  public CommandHandler getCloseContainerHandler() {
+    return handlerMap.get(Type.closeContainerCommand);
+  }
+
   /**
    * Dispatch the command to the correct handler.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 455df49..3a469de 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.transport.server;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.shaded.io.netty.channel.Channel;
 import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
@@ -129,4 +130,10 @@ public final class XceiverServer implements XceiverServerSpi {
       channel.close().awaitUninterruptibly();
     }
   }
+
+  @Override
+  public void submitRequest(
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+    storageContainer.dispatch(request);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 550fe41..0a9e1db 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -44,6 +45,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
   private int port;
   private Server server;
+  private final ContainerDispatcher storageContainer;
 
   /**
    * Constructs a Grpc server class.
@@ -77,6 +79,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
         .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
         .addService(new GrpcXceiverService(dispatcher))
         .build();
+    storageContainer = dispatcher;
   }
 
   @Override
@@ -103,4 +106,10 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   public void stop() {
     server.shutdown();
   }
+
+  @Override
+  public void submitRequest(
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+    storageContainer.dispatch(request);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index dad9e9f..49579f2 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.io.IOException;
@@ -40,4 +41,10 @@ public interface XceiverServerSpi {
    */
   HddsProtos.ReplicationType getServerType();
 
+  /**
+   * Submits a containerRequest to be performed by the replication pipeline.
+   * @param request ContainerCommandRequest
+   */
+  void submitRequest(ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 33c25ea..b9c7cae 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -33,10 +35,12 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -49,8 +53,10 @@ import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Creates a ratis server endpoint that acts as the communication layer for
@@ -58,6 +64,12 @@ import java.util.concurrent.TimeUnit;
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private static final AtomicLong callIdCounter = new AtomicLong();
+
+  private static long nextCallId() {
+    return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
+  }
+
   private final int port;
   private final RaftServer server;
   private ThreadPoolExecutor writeChunkExecutor;
@@ -241,4 +253,46 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public HddsProtos.ReplicationType getServerType() {
     return HddsProtos.ReplicationType.RATIS;
   }
-}
+
+  @VisibleForTesting
+  public RaftServer getServer() {
+    return server;
+  }
+
+  private void processReply(RaftClientReply reply) {
+
+    // NotLeader exception is thrown only when the raft server to which the
+    // request is submitted is not the leader. The request will be rejected
+    // and will eventually be executed once the request comes via the leader
+    // node.
+    NotLeaderException notLeaderException = reply.getNotLeaderException();
+    if (notLeaderException != null) {
+      LOG.info(reply.getNotLeaderException().getLocalizedMessage());
+    }
+    StateMachineException stateMachineException =
+        reply.getStateMachineException();
+    if (stateMachineException != null) {
+      // In case the request could not be completed, a StateMachineException
+      // will be thrown. For now, just log the message.
+      // If the container could not be closed, SCM will come to know
+      // via containerReports. CloseContainer should be retried via SCM.
+      LOG.error(stateMachineException.getLocalizedMessage());
+    }
+  }
+
+  @Override
+  public void submitRequest(
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
+    ClientId clientId = ClientId.randomId();
+    RaftClientRequest raftClientRequest =
+        new RaftClientRequest(clientId, server.getId(),
+            RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
+            Message.valueOf(request.toByteString()), RaftClientRequest
+            // ReplicationLevel.ALL ensures the transactions corresponding to
+            // the request here are applied on all the raft servers.
+            .writeRequestType(RaftProtos.ReplicationLevel.ALL));
+    CompletableFuture<RaftClientReply> reply =
+        server.submitClientRequestAsync(raftClientRequest);
+    reply.thenAccept(this::processReply);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b357fef..4156f5a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -72,7 +74,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
  * layer.
  */
 public class OzoneContainer {
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(OzoneContainer.class);
 
   private final Configuration ozoneConfig;
@@ -269,9 +271,65 @@ public class OzoneContainer {
     return this.manager.getClosedContainerReports();
   }
 
+  private XceiverServerSpi getRatisSerer() {
+    for (XceiverServerSpi serverInstance : server) {
+      if (serverInstance instanceof XceiverServerRatis) {
+        return serverInstance;
+      }
+    }
+    return null;
+  }
+
+  private XceiverServerSpi getStandaAloneSerer() {
+    for (XceiverServerSpi serverInstance : server) {
+      if (!(serverInstance instanceof XceiverServerRatis)) {
+        return serverInstance;
+      }
+    }
+    return null;
+  }
+
   @VisibleForTesting
   public ContainerManager getContainerManager() {
     return this.manager;
   }
 
-}
+  public void submitContainerRequest(
+      ContainerProtos.ContainerCommandRequestProto request,
+      HddsProtos.ReplicationType replicationType) throws IOException {
+    XceiverServerSpi serverInstance;
+    long containerId = getContainerIdForCmd(request);
+    if (replicationType == HddsProtos.ReplicationType.RATIS) {
+      serverInstance = getRatisSerer();
+      Preconditions.checkNotNull(serverInstance);
+      serverInstance.submitRequest(request);
+      LOG.info("submitting {} request over RATIS server for container {}",
+          request.getCmdType(), containerId);
+    } else {
+      serverInstance = getStandaAloneSerer();
+      Preconditions.checkNotNull(serverInstance);
+      getStandaAloneSerer().submitRequest(request);
+      LOG.info(
+          "submitting {} request over STAND_ALONE server for container {}",
+          request.getCmdType(), containerId);
+    }
+
+  }
+
+  private long getContainerIdForCmd(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws IllegalArgumentException {
+    ContainerProtos.Type type = request.getCmdType();
+    switch (type) {
+    case CloseContainer:
+      return request.getCloseContainer().getContainerID();
+      // Right now, we handle only closeContainer via queuing it over the
+      // XceiverServer. For all other commands we throw an
+      // IllegalArgumentException here. The switch cases will need to be
+      // extended if we want to add other commands here.
+    default:
+      throw new IllegalArgumentException("Cmd " + request.getCmdType()
+          + " not supported over HearBeat Response");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index 4f4f82b..c7d8df5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -31,9 +32,12 @@ public class CloseContainerCommand
     extends SCMCommand<CloseContainerCommandProto> {
 
   private long containerID;
+  private HddsProtos.ReplicationType replicationType;
 
-  public CloseContainerCommand(long containerID) {
+  public CloseContainerCommand(long containerID,
+      HddsProtos.ReplicationType replicationType) {
     this.containerID = containerID;
+    this.replicationType = replicationType;
   }
 
   /**
@@ -58,13 +62,15 @@ public class CloseContainerCommand
 
   public CloseContainerCommandProto getProto() {
     return CloseContainerCommandProto.newBuilder()
-        .setContainerID(containerID).build();
+        .setContainerID(containerID)
+        .setReplicationType(replicationType).build();
   }
 
   public static CloseContainerCommand getFromProtobuf(
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
-    return new CloseContainerCommand(closeContainerProto.getContainerID());
+    return new CloseContainerCommand(closeContainerProto.getContainerID(),
+        closeContainerProto.getReplicationType());
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 95e210e..f6aba05 100644
--- 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -223,6 +223,7 @@ This command asks the datanode to close a specific container.
 */
 message CloseContainerCommandProto {
   required int64 containerID = 1;
+  required hadoop.hdds.ReplicationType replicationType = 2;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index bc95b55..16e84a3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -63,7 +63,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
     if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
       for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
         containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
-            new CloseContainerCommand(containerID.getId()));
+            new CloseContainerCommand(containerID.getId(),
+                info.getPipeline().getType()));
       }
       try {
         // Finalize event will make sure the state of the container transitions

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
index 75ec8e1..937076c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java
@@ -127,11 +127,12 @@ public class ContainerCloser {
     // to SCM. In that case also, data node will ignore this command.
 
     HddsProtos.Pipeline pipeline = info.getPipeline();
-    for (HddsProtos.DatanodeDetailsProto datanodeDetails :
-        pipeline.getPipelineChannel().getMembersList()) {
+    for (HddsProtos.DatanodeDetailsProto datanodeDetails : pipeline
+        .getPipelineChannel().getMembersList()) {
       nodeManager.addDatanodeCommand(
           DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
-          new CloseContainerCommand(info.getContainerID()));
+          new CloseContainerCommand(info.getContainerID(),
+              pipeline.getPipelineChannel().getType()));
     }
     if (!commandIssued.containsKey(info.getContainerID())) {
       commandIssued.put(info.getContainerID(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
new file mode 100644
index 0000000..9e8cb46
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class TestCloseContainerByPipeline {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3).build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    objectStore.createVolume("test");
+    objectStore.getVolume("test").createBucket("test");
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testCloseContainerViaStandaAlone()
+      throws IOException, TimeoutException, InterruptedException {
+
+    OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
+        .createKey("standalone", 1024, ReplicationType.STAND_ALONE,
+            ReplicationFactor.ONE);
+    key.write("standalone".getBytes());
+    key.close();
+
+    //get the ID of a valid container
+    KsmKeyArgs keyArgs =
+        new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
+            .setType(HddsProtos.ReplicationType.STAND_ALONE)
+            .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
+            .setKeyName("standalone").build();
+
+    KsmKeyLocationInfo ksmKeyLocationInfo =
+        
cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
+            .get(0).getBlocksLatestVersionOnly().get(0);
+
+    long containerID = ksmKeyLocationInfo.getContainerID();
+    List<DatanodeDetails> datanodes =
+        cluster.getStorageContainerManager().getContainerInfo(containerID)
+            .getPipeline().getMachines();
+    Assert.assertTrue(datanodes.size() == 1);
+
+    DatanodeDetails datanodeDetails = datanodes.get(0);
+    Assert
+        .assertFalse(isContainerClosed(cluster, containerID, datanodeDetails));
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
+    //send the order to close the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(datanodeDetails.getUuid(),
+            new CloseContainerCommand(containerID,
+                HddsProtos.ReplicationType.STAND_ALONE));
+
+    GenericTestUtils
+        .waitFor(() -> isContainerClosed(cluster, containerID, 
datanodeDetails),
+            500, 5 * 1000);
+
+    //double check if it's really closed (waitFor also throws an exception)
+    Assert.assertTrue(isContainerClosed(cluster, containerID, 
datanodeDetails));
+    Assert.assertTrue(logCapturer.getOutput().contains(
+        "submitting CloseContainer request over STAND_ALONE server for"
+            + " container " + containerID));
+    // Make sure it was really closed via StandAlone not Ratis server
+    Assert.assertFalse((logCapturer.getOutput().contains(
+        "submitting CloseContainer request over RATIS server for container "
+            + containerID)));
+  }
+
+  @Test
+  public void testCloseContainerViaRatis() throws IOException,
+      TimeoutException, InterruptedException {
+
+    OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
+        .createKey("ratis", 1024, ReplicationType.RATIS,
+            ReplicationFactor.THREE);
+    key.write("ratis".getBytes());
+    key.close();
+
+    //get the ID of a valid container
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder().setVolumeName("test").
+        setBucketName("test").setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setDataSize(1024)
+        .setKeyName("ratis").build();
+
+    KsmKeyLocationInfo ksmKeyLocationInfo =
+        
cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
+            .get(0).getBlocksLatestVersionOnly().get(0);
+
+    long containerID = ksmKeyLocationInfo.getContainerID();
+    List<DatanodeDetails> datanodes =
+        cluster.getStorageContainerManager().getContainerInfo(containerID)
+            .getPipeline().getMachines();
+    Assert.assertTrue(datanodes.size() == 3);
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
+
+    for (DatanodeDetails details : datanodes) {
+      Assert.assertFalse(isContainerClosed(cluster, containerID, details));
+      //send the order to close the container
+      cluster.getStorageContainerManager().getScmNodeManager()
+          .addDatanodeCommand(details.getUuid(),
+              new CloseContainerCommand(containerID,
+                  HddsProtos.ReplicationType.RATIS));
+  }
+
+    for (DatanodeDetails datanodeDetails : datanodes) {
+      GenericTestUtils.waitFor(
+          () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
+          5 * 1000);
+      //double check if it's really closed (waitFor also throws an exception)
+      Assert.assertTrue(isContainerClosed(cluster, containerID, 
datanodeDetails));
+    }
+    Assert.assertFalse(logCapturer.getOutput().contains(
+        "submitting CloseContainer request over STAND_ALONE "
+            + "server for container " + containerID));
+    // Make sure it was really closed via Ratis not StandAlone server
+    Assert.assertTrue((logCapturer.getOutput().contains(
+        "submitting CloseContainer request over RATIS server for container "
+            + containerID)));
+  }
+
+  private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,
+      DatanodeDetails datanode) {
+    ContainerData containerData;
+    try {
+      for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes())
+        if (datanode.equals(datanodeService.getDatanodeDetails())) {
+          containerData =
+              datanodeService.getDatanodeStateMachine().getContainer()
+                  .getContainerManager().readContainer(containerID);
+          if (!containerData.isOpen()) {
+            // make sure the closeContainerHandler on the Datanode is invoked
+            Assert.assertTrue(
+                
datanodeService.getDatanodeStateMachine().getCommandDispatcher()
+                    .getCloseContainerHandler().getInvocationCount() > 0);
+            return true;
+          }
+        }
+    } catch (StorageContainerException e) {
+      throw new AssertionError(e);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index fbe43d7..efb7344 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -83,12 +83,13 @@ public class TestCloseContainerHandler {
 
     Assert.assertFalse(isContainerClosed(cluster, containerID));
 
-    DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
-        .getDatanodeDetails();
+    DatanodeDetails datanodeDetails =
+        cluster.getHddsDatanodes().get(0).getDatanodeDetails();
     //send the order to close the container
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(datanodeDetails.getUuid(),
-            new CloseContainerCommand(containerID));
+            new CloseContainerCommand(containerID,
+                HddsProtos.ReplicationType.STAND_ALONE));
 
     GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerID),
             500,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to