http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
deleted file mode 100644
index 3bb284e..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license
- * agreements. See the NOTICE file distributed with this work for additional
- * information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
- * License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a
- * copy of the License at
- *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- *
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.server;
-
-import com.google.protobuf.BlockingService;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.protocolPB
-    .ScmBlockLocationProtocolServerSideTranslatorPB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
-    .startRpcServer;
-
-/**
- * SCM block protocol is the protocol used by Namenode and OzoneManager to get
- * blocks from the SCM.
- */
-public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMBlockProtocolServer.class);
-
-  private final StorageContainerManager scm;
-  private final OzoneConfiguration conf;
-  private final RPC.Server blockRpcServer;
-  private final InetSocketAddress blockRpcAddress;
-
-  /**
-   * The RPC server that listens to requests from block service clients.
-   */
-  public SCMBlockProtocolServer(OzoneConfiguration conf,
-      StorageContainerManager scm) throws IOException {
-    this.scm = scm;
-    this.conf = conf;
-    final int handlerCount =
-        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
-            OZONE_SCM_HANDLER_COUNT_DEFAULT);
-
-    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    // SCM Block Service RPC
-    BlockingService blockProtoPbService =
-        ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
-            .newReflectiveBlockingService(
-                new ScmBlockLocationProtocolServerSideTranslatorPB(this));
-
-    final InetSocketAddress scmBlockAddress = HddsServerUtil
-        .getScmBlockClientBindAddress(conf);
-    blockRpcServer =
-        startRpcServer(
-            conf,
-            scmBlockAddress,
-            ScmBlockLocationProtocolPB.class,
-            blockProtoPbService,
-            handlerCount);
-    blockRpcAddress =
-        updateRPCListenAddress(
-            conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress,
-            blockRpcServer);
-
-  }
-
-  public RPC.Server getBlockRpcServer() {
-    return blockRpcServer;
-  }
-
-  public InetSocketAddress getBlockRpcAddress() {
-    return blockRpcAddress;
-  }
-
-  public void start() {
-    LOG.info(
-        StorageContainerManager.buildRpcServerStartMessage(
-            "RPC server for Block Protocol", getBlockRpcAddress()));
-    getBlockRpcServer().start();
-  }
-
-  public void stop() {
-    try {
-      LOG.info("Stopping the RPC server for Block Protocol");
-      getBlockRpcServer().stop();
-    } catch (Exception ex) {
-      LOG.error("Block Protocol RPC stop failed.", ex);
-    }
-    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
-  }
-
-  public void join() throws InterruptedException {
-    LOG.trace("Join RPC server for Block Protocol");
-    getBlockRpcServer().join();
-  }
-
-  @Override
-  public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
-      type, HddsProtos.ReplicationFactor factor, String owner) throws
-      IOException {
-    return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
-  }
-
-  /**
-   * Delete blocks for a set of object keys.
-   *
-   * @param keyBlocksInfoList list of block keys with object keys to delete.
-   * @return deletion results.
-   */
-  @Override
-  public List<DeleteBlockGroupResult> deleteKeyBlocks(
-      List<BlockGroup> keyBlocksInfoList) throws IOException {
-    LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList
-        .size());
-    List<DeleteBlockGroupResult> results = new ArrayList<>();
-    for (BlockGroup keyBlocks : keyBlocksInfoList) {
-      ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
-      try {
-        // We delete blocks in an atomic operation to prevent getting
-        // into a state where only some of the blocks are deleted,
-        // which would leave the key in an inconsistent state.
-        scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
-        resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
-            .Result.success;
-      } catch (SCMException scmEx) {
-        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
-        switch (scmEx.getResult()) {
-        case CHILL_MODE_EXCEPTION:
-          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
-              .Result.chillMode;
-          break;
-        case FAILED_TO_FIND_BLOCK:
-          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
-              .Result.errorNotFound;
-          break;
-        default:
-          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
-              .Result.unknownFailure;
-        }
-      } catch (IOException ex) {
-        LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
-            .getGroupID(), ex);
-        resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
-            .Result.unknownFailure;
-      }
-      List<DeleteBlockResult> blockResultList = new ArrayList<>();
-      for (BlockID blockKey : keyBlocks.getBlockIDList()) {
-        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
-      }
-      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
-          blockResultList));
-    }
-    return results;
-  }
-
-  @Override
-  public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-            .setClusterId(scm.getScmStorage().getClusterID())
-            .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
-  }
-}

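A minimal sketch of how a caller could drive the ScmBlockLocationProtocol
interface this server implemented; the proxy variable, size, replication
settings and owner string are illustrative assumptions, not taken from the
patch:

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
    import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

    /** Hypothetical caller of the block protocol removed above. */
    public class BlockAllocationSketch {
      // "blockClient" can be any ScmBlockLocationProtocol proxy, e.g. the
      // client-side RPC translator; all argument values are placeholders.
      public static AllocatedBlock allocateOneBlock(
          ScmBlockLocationProtocol blockClient) throws IOException {
        return blockClient.allocateBlock(256L * 1024 * 1024,
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE,
            "example-owner");
      }
    }
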
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
deleted file mode 100644
index 3c1cc8f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
-    .NodeRegistrationContainerReport;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StorageContainerManager enters chill mode on startup to allow system to
- * reach a stable state before becoming fully functional. SCM will wait
- * for certain resources to be reported before coming out of chill mode.
- *
- * ChillModeExitRule defines the contract for new rules which must be
- * satisfied to exit chill mode.
- * ContainerChillModeRule defines the only exit criterion right now.
- * On every new datanode registration event this class adds replicas
- * for reported containers and validates whether the cutoff threshold
- * for containers is met.
- */
-public class SCMChillModeManager implements
-    EventHandler<NodeRegistrationContainerReport> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMChillModeManager.class);
-  private AtomicBoolean inChillMode = new AtomicBoolean(true);
-  private AtomicLong containerWithMinReplicas = new AtomicLong(0);
-  private Map<String, ChillModeExitRule> exitRules = new HashMap(1);
-  private Configuration config;
-  private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
-  private final EventQueue eventPublisher;
-
-  SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers,
-      EventQueue eventQueue) {
-    this.config = conf;
-    this.eventPublisher = eventQueue;
-    exitRules
-        .put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers));
-    if (!conf.getBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
-        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT)) {
-      exitChillMode(eventQueue);
-    }
-    emitChillModeStatus();
-  }
-
-  /**
-   * Emit Chill mode status.
-   */
-  @VisibleForTesting
-  public void emitChillModeStatus() {
-    eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, inChillMode.get());
-  }
-
-  private void validateChillModeExitRules(EventPublisher eventQueue) {
-    for (ChillModeExitRule exitRule : exitRules.values()) {
-      if (!exitRule.validate()) {
-        return;
-      }
-    }
-    exitChillMode(eventQueue);
-  }
-
-  /**
-   * Exit chill mode. It performs the following actions:
-   * 1. Sets chill mode status to false.
-   * 2. Emits START_REPLICATION for ReplicationManager.
-   * 3. Cleans up resources.
-   * 4. Emits chill mode status.
-   * @param eventQueue the publisher used to emit the status events
-   */
-  @VisibleForTesting
-  public void exitChillMode(EventPublisher eventQueue) {
-    LOG.info("SCM exiting chill mode.");
-    setInChillMode(false);
-
-    // TODO: Remove handler registration as there is no need to listen to
-    // register events anymore.
-
-    for (ChillModeExitRule e : exitRules.values()) {
-      e.cleanup();
-    }
-    emitChillModeStatus();
-  }
-
-  @Override
-  public void onMessage(
-      NodeRegistrationContainerReport nodeRegistrationContainerReport,
-      EventPublisher publisher) {
-    if (getInChillMode()) {
-      exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport);
-      validateChillModeExitRules(publisher);
-    }
-  }
-
-  public boolean getInChillMode() {
-    return inChillMode.get();
-  }
-
-  /**
-   * Set chill mode status.
-   */
-  public void setInChillMode(boolean inChillMode) {
-    this.inChillMode.set(inChillMode);
-  }
-
-  /**
-   * Interface for defining chill mode exit rules.
-   *
-   * @param <T>
-   */
-  public interface ChillModeExitRule<T> {
-
-    boolean validate();
-
-    void process(T report);
-
-    void cleanup();
-  }
-
-  /**
-   * Class defining Chill mode exit criteria for Containers.
-   */
-  public class ContainerChillModeRule implements
-      ChillModeExitRule<NodeRegistrationContainerReport> {
-
-    // Required cutoff % for containers with at least 1 reported replica.
-    private double chillModeCutoff;
-    // Containers read from scm db.
-    private Map<Long, ContainerInfo> containerMap;
-    private double maxContainer;
-
-    public ContainerChillModeRule(Configuration conf,
-        List<ContainerInfo> containers) {
-      chillModeCutoff = conf
-          .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
-              HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
-      containerMap = new ConcurrentHashMap<>();
-      if(containers != null) {
-        containers.forEach(c -> {
-          if (c != null) {
-            containerMap.put(c.getContainerID(), c);
-          }
-        });
-        maxContainer = containers.size();
-      }
-    }
-
-    @Override
-    public boolean validate() {
-      if (maxContainer == 0) {
-        return true;
-      }
-      return getCurrentContainerThreshold() >= chillModeCutoff;
-    }
-
-    @VisibleForTesting
-    public double getCurrentContainerThreshold() {
-      return (containerWithMinReplicas.doubleValue() / maxContainer);
-    }
-
-    @Override
-    public void process(NodeRegistrationContainerReport reportsProto) {
-      if (maxContainer == 0) {
-        // No container to check.
-        return;
-      }
-
-      reportsProto.getReport().getReportsList().forEach(c -> {
-        if (containerMap.containsKey(c.getContainerID())) {
-          if(containerMap.remove(c.getContainerID()) != null) {
-            containerWithMinReplicas.getAndAdd(1);
-          }
-        }
-      });
-      if(inChillMode.get()) {
-        LOG.info("SCM in chill mode. {} % containers have at least one"
-                + " reported replica.",
-            (containerWithMinReplicas.get() / maxContainer) * 100);
-      }
-    }
-
-    @Override
-    public void cleanup() {
-      containerMap.clear();
-    }
-  }
-
-  @VisibleForTesting
-  public static Logger getLogger() {
-    return LOG;
-  }
-
-  @VisibleForTesting
-  public double getCurrentContainerThreshold() {
-    return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
-        .getCurrentContainerThreshold();
-  }
-
-  /**
-   * Operations restricted in SCM chill mode.
-   */
-  public static class ChillModeRestrictedOps {
-    private static EnumSet<ScmOps> restrictedOps = EnumSet.noneOf(ScmOps.class);
-
-    static {
-      restrictedOps.add(ScmOps.allocateBlock);
-      restrictedOps.add(ScmOps.allocateContainer);
-    }
-
-    public static boolean isRestrictedInChillMode(ScmOps opName) {
-      return restrictedOps.contains(opName);
-    }
-  }
-
-}

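The exit criterion above reduces to one fraction check: the share of
containers with at least one reported replica must reach the configured
cutoff. A self-contained sketch of just that arithmetic, with illustrative
numbers:

    /** Minimal sketch of the ContainerChillModeRule cutoff check. */
    public class ChillModeCutoffSketch {
      static boolean canExit(long containersWithReplica,
          long totalContainers, double cutoff) {
        if (totalContainers == 0) {
          return true; // mirrors validate(): nothing to wait for
        }
        return ((double) containersWithReplica / totalContainers) >= cutoff;
      }

      public static void main(String[] args) {
        // 95 of 100 containers reported against a 0.99 cutoff:
        // prints false, i.e. SCM stays in chill mode.
        System.out.println(canExit(95, 100, 0.99));
      }
    }
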
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
deleted file mode 100644
index 66136f1..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license
- * agreements. See the NOTICE file distributed with this work for additional
- * information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
- * License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a
- * copy of the License at
- *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- *
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.ScmUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos
-    .StorageContainerLocationProtocolService.newReflectiveBlockingService;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CLIENT_ADDRESS_KEY;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
-    .startRpcServer;
-
-/**
- * The RPC server that listens to requests from clients.
- */
-public class SCMClientProtocolServer implements
-    StorageContainerLocationProtocol, EventHandler<Boolean> {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMClientProtocolServer.class);
-  private final RPC.Server clientRpcServer;
-  private final InetSocketAddress clientRpcAddress;
-  private final StorageContainerManager scm;
-  private final OzoneConfiguration conf;
-  private ChillModePrecheck chillModePrecheck = new ChillModePrecheck();
-
-  public SCMClientProtocolServer(OzoneConfiguration conf,
-      StorageContainerManager scm) throws IOException {
-    this.scm = scm;
-    this.conf = conf;
-    final int handlerCount =
-        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
-            OZONE_SCM_HANDLER_COUNT_DEFAULT);
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    // SCM Container Service RPC
-    BlockingService storageProtoPbService =
-        newReflectiveBlockingService(
-            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
-
-    final InetSocketAddress scmAddress = HddsServerUtil
-        .getScmClientBindAddress(conf);
-    clientRpcServer =
-        startRpcServer(
-            conf,
-            scmAddress,
-            StorageContainerLocationProtocolPB.class,
-            storageProtoPbService,
-            handlerCount);
-    clientRpcAddress =
-        updateRPCListenAddress(conf, OZONE_SCM_CLIENT_ADDRESS_KEY,
-            scmAddress, clientRpcServer);
-
-  }
-
-  public RPC.Server getClientRpcServer() {
-    return clientRpcServer;
-  }
-
-  public InetSocketAddress getClientRpcAddress() {
-    return clientRpcAddress;
-  }
-
-  public void start() {
-    LOG.info(
-        StorageContainerManager.buildRpcServerStartMessage(
-            "RPC server for Client ", getClientRpcAddress()));
-    getClientRpcServer().start();
-  }
-
-  public void stop() {
-    try {
-      LOG.info("Stopping the RPC server for Client Protocol");
-      getClientRpcServer().stop();
-    } catch (Exception ex) {
-      LOG.error("Client Protocol RPC stop failed.", ex);
-    }
-    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
-  }
-
-  public void join() throws InterruptedException {
-    LOG.trace("Join RPC server for Client Protocol");
-    getClientRpcServer().join();
-  }
-
-  @VisibleForTesting
-  public String getRpcRemoteUsername() {
-    UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
-    return user == null ? null : user.getUserName();
-  }
-
-  @Override
-  public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
-      replicationType, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
-    ScmUtils.preCheck(ScmOps.allocateContainer, chillModePrecheck);
-    String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-
-    return scm.getScmContainerManager()
-        .allocateContainer(replicationType, factor, owner);
-  }
-
-  @Override
-  public ContainerInfo getContainer(long containerID) throws IOException {
-    String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-    return scm.getScmContainerManager()
-        .getContainer(containerID);
-  }
-
-  @Override
-  public ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException {
-    if (chillModePrecheck.isInChillMode()) {
-      ContainerInfo contInfo = scm.getScmContainerManager()
-          .getContainer(containerID);
-      if (contInfo.isContainerOpen()) {
-        if (!hasRequiredReplicas(contInfo)) {
-          throw new SCMException("Open container " + containerID + " doesn't"
-              + " have enough replicas to service this operation in "
-              + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
-        }
-      }
-    }
-    String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-    return scm.getScmContainerManager()
-        .getContainerWithPipeline(containerID);
-  }
-
-  /**
-   * Check if container reported replicas are equal or greater than required
-   * replication factor.
-   */
-  private boolean hasRequiredReplicas(ContainerInfo contInfo) {
-    try{
-      return getScm().getScmContainerManager().getStateManager()
-          .getContainerReplicas(contInfo.containerID())
-          .size() >= contInfo.getReplicationFactor().getNumber();
-    } catch (SCMException ex) {
-      // getContainerReplicas throws an exception if no replicas exist for
-      // the given container.
-      return false;
-    }
-  }
-
-  @Override
-  public List<ContainerInfo> listContainer(long startContainerID,
-      int count) throws IOException {
-    return scm.getScmContainerManager().
-        listContainer(startContainerID, count);
-  }
-
-  @Override
-  public void deleteContainer(long containerID) throws IOException {
-    String remoteUser = getRpcRemoteUsername();
-    getScm().checkAdminAccess(remoteUser);
-    scm.getScmContainerManager().deleteContainer(containerID);
-
-  }
-
-  @Override
-  public List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
-      HddsProtos.QueryScope queryScope, String poolName) throws
-      IOException {
-
-    if (queryScope == HddsProtos.QueryScope.POOL) {
-      throw new IllegalArgumentException("Not Supported yet");
-    }
-
-    List<HddsProtos.Node> result = new ArrayList<>();
-    queryNode(state).forEach(node -> result.add(HddsProtos.Node.newBuilder()
-        .setNodeID(node.getProtoBufMessage())
-        .addNodeStates(state)
-        .build()));
-
-    return result;
-
-  }
-
-  @Override
-  public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
-      .ObjectStageChangeRequestProto.Type type, long id,
-      StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op
-          op, StorageContainerLocationProtocolProtos
-      .ObjectStageChangeRequestProto.Stage stage) throws IOException {
-
-    LOG.info("Object type {} id {} op {} new stage {}", type, id, op,
-        stage);
-    if (type == StorageContainerLocationProtocolProtos
-        .ObjectStageChangeRequestProto.Type.container) {
-      if (op == StorageContainerLocationProtocolProtos
-          .ObjectStageChangeRequestProto.Op.create) {
-        if (stage == StorageContainerLocationProtocolProtos
-            .ObjectStageChangeRequestProto.Stage.begin) {
-          scm.getScmContainerManager().updateContainerState(id, HddsProtos
-              .LifeCycleEvent.CREATE);
-        } else {
-          scm.getScmContainerManager().updateContainerState(id, HddsProtos
-              .LifeCycleEvent.CREATED);
-        }
-      } else {
-        if (op == StorageContainerLocationProtocolProtos
-            .ObjectStageChangeRequestProto.Op.close) {
-          if (stage == StorageContainerLocationProtocolProtos
-              .ObjectStageChangeRequestProto.Stage.begin) {
-            scm.getScmContainerManager().updateContainerState(id, HddsProtos
-                .LifeCycleEvent.FINALIZE);
-          } else {
-            scm.getScmContainerManager().updateContainerState(id, HddsProtos
-                .LifeCycleEvent.CLOSE);
-          }
-        }
-      }
-    } // else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
-    // TODO: pipeline state update will be addressed in future patch.
-    // }
-
-  }
-
-  @Override
-  public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
-      throws IOException {
-    // TODO: will be addressed in future patch.
-    // This is needed only for debugging purposes to make sure cluster is
-    // working correctly.
-    return null;
-  }
-
-  @Override
-  public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder =
-        new ScmInfo.Builder()
-            .setClusterId(scm.getScmStorage().getClusterID())
-            .setScmId(scm.getScmStorage().getScmId());
-    return builder.build();
-  }
-
-  /**
-   * Check if SCM is in chill mode.
-   *
-   * @return Returns true if SCM is in chill mode else returns false.
-   * @throws IOException
-   */
-  @Override
-  public boolean inChillMode() throws IOException {
-    return scm.isInChillMode();
-  }
-
-  /**
-   * Force SCM out of Chill mode.
-   *
-   * @return returns true if operation is successful.
-   * @throws IOException
-   */
-  @Override
-  public boolean forceExitChillMode() throws IOException {
-    return scm.exitChillMode();
-  }
-
-  /**
-   * Queries a list of Nodes that match a set of statuses.
-   *
-   * <p>For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, then
-   * this call will return all
-   * healthy nodes which are members of a Raft pipeline.
-   *
-   * <p>Right now we don't support operators, so we assume an AND
-   * relationship between the
-   * states.
-   *
-   * @param state - NodeStates.
-   * @return List of Datanodes.
-   */
-  public List<DatanodeDetails> queryNode(HddsProtos.NodeState state) {
-    Preconditions.checkNotNull(state, "Node Query set cannot be null");
-    return new LinkedList<>(queryNodeState(state));
-  }
-
-  @VisibleForTesting
-  public StorageContainerManager getScm() {
-    return scm;
-  }
-
-  /**
-   * Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event.
-   */
-  @Override
-  public void onMessage(Boolean inChillMode, EventPublisher publisher) {
-    chillModePrecheck.setInChillMode(inChillMode);
-  }
-
-  /**
-   * Get chill mode status.
-   */
-  public boolean getChillModeStatus() {
-    return chillModePrecheck.isInChillMode();
-  }
-
-
-  /**
-   * Query the System for Nodes.
-   *
-   * @param nodeState - NodeState that we are interested in matching.
-   * @return Set of Datanodes that match the NodeState.
-   */
-  private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
-    Set<DatanodeDetails> returnSet = new TreeSet<>();
-    List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState);
-    if ((tmp != null) && (tmp.size() > 0)) {
-      returnSet.addAll(tmp);
-    }
-    return returnSet;
-  }
-}

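The chill-mode branch of getContainerWithPipeline() boils down to: while in
chill mode, refuse an open container unless enough replicas have been
reported. A reduced sketch with plain inputs standing in for ContainerInfo
state:

    import java.io.IOException;

    /** Sketch of the chill-mode replica gate; all inputs are assumptions. */
    public class ChillModeGateSketch {
      static void precheck(boolean inChillMode, boolean containerOpen,
          int reportedReplicas, int requiredFactor) throws IOException {
        if (inChillMode && containerOpen
            && reportedReplicas < requiredFactor) {
          throw new IOException("Open container doesn't have enough"
              + " replicas to service this operation in Chill mode.");
        }
      }

      public static void main(String[] args) throws IOException {
        precheck(true, true, 3, 3); // passes: replica count meets the factor
      }
    }
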
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
deleted file mode 100644
index d9a0875..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import com.google.protobuf.GeneratedMessage;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
-
-/**
- * This class is responsible for dispatching heartbeat from datanode to
- * appropriate EventHandler at SCM.
- */
-public final class SCMDatanodeHeartbeatDispatcher {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class);
-
-  private final NodeManager nodeManager;
-  private final EventPublisher eventPublisher;
-
-
-  public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager,
-                                        EventPublisher eventPublisher) {
-    Preconditions.checkNotNull(nodeManager);
-    Preconditions.checkNotNull(eventPublisher);
-    this.nodeManager = nodeManager;
-    this.eventPublisher = eventPublisher;
-  }
-
-
-  /**
-   * Dispatches heartbeat to registered event handlers.
-   *
-   * @param heartbeat heartbeat to be dispatched.
-   *
-   * @return list of SCMCommand
-   */
-  public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
-    DatanodeDetails datanodeDetails =
-        DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
-    // should we dispatch heartbeat through eventPublisher?
-    List<SCMCommand> commands = nodeManager.processHeartbeat(datanodeDetails);
-    if (heartbeat.hasNodeReport()) {
-      LOG.debug("Dispatching Node Report.");
-      eventPublisher.fireEvent(NODE_REPORT,
-          new NodeReportFromDatanode(datanodeDetails,
-              heartbeat.getNodeReport()));
-    }
-
-    if (heartbeat.hasContainerReport()) {
-      LOG.debug("Dispatching Container Report.");
-      eventPublisher.fireEvent(CONTAINER_REPORT,
-          new ContainerReportFromDatanode(datanodeDetails,
-              heartbeat.getContainerReport()));
-
-    }
-
-    if (heartbeat.hasContainerActions()) {
-      LOG.debug("Dispatching Container Actions.");
-      eventPublisher.fireEvent(CONTAINER_ACTIONS,
-          new ContainerActionsFromDatanode(datanodeDetails,
-              heartbeat.getContainerActions()));
-    }
-
-    if (heartbeat.hasPipelineReports()) {
-      LOG.debug("Dispatching Pipeline Report.");
-      eventPublisher.fireEvent(PIPELINE_REPORT,
-              new PipelineReportFromDatanode(datanodeDetails,
-                      heartbeat.getPipelineReports()));
-
-    }
-
-    if (heartbeat.hasPipelineActions()) {
-      LOG.debug("Dispatching Pipeline Actions.");
-      eventPublisher.fireEvent(PIPELINE_ACTIONS,
-          new PipelineActionsFromDatanode(datanodeDetails,
-              heartbeat.getPipelineActions()));
-    }
-
-    if (heartbeat.getCommandStatusReportsCount() != 0) {
-      for (CommandStatusReportsProto commandStatusReport : heartbeat
-          .getCommandStatusReportsList()) {
-        eventPublisher.fireEvent(CMD_STATUS_REPORT,
-            new CommandStatusReportFromDatanode(datanodeDetails,
-                commandStatusReport));
-      }
-    }
-
-    return commands;
-  }
-
-  /**
-   * Wrapper class for events with the datanode origin.
-   */
-  public static class ReportFromDatanode<T extends GeneratedMessage> {
-
-    private final DatanodeDetails datanodeDetails;
-
-    private final T report;
-
-    public ReportFromDatanode(DatanodeDetails datanodeDetails, T report) {
-      this.datanodeDetails = datanodeDetails;
-      this.report = report;
-    }
-
-    public DatanodeDetails getDatanodeDetails() {
-      return datanodeDetails;
-    }
-
-    public T getReport() {
-      return report;
-    }
-  }
-
-  /**
-   * Node report event payload with origin.
-   */
-  public static class NodeReportFromDatanode
-      extends ReportFromDatanode<NodeReportProto> {
-
-    public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
-        NodeReportProto report) {
-      super(datanodeDetails, report);
-    }
-  }
-
-  /**
-   * Container report event payload with origin.
-   */
-  public static class ContainerReportFromDatanode
-      extends ReportFromDatanode<ContainerReportsProto> {
-
-    public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
-        ContainerReportsProto report) {
-      super(datanodeDetails, report);
-    }
-  }
-
-  /**
-   * Container action event payload with origin.
-   */
-  public static class ContainerActionsFromDatanode
-      extends ReportFromDatanode<ContainerActionsProto> {
-
-    public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails,
-                                       ContainerActionsProto actions) {
-      super(datanodeDetails, actions);
-    }
-  }
-
-  /**
-   * Pipeline report event payload with origin.
-   */
-  public static class PipelineReportFromDatanode
-          extends ReportFromDatanode<PipelineReportsProto> {
-
-    public PipelineReportFromDatanode(DatanodeDetails datanodeDetails,
-                                      PipelineReportsProto report) {
-      super(datanodeDetails, report);
-    }
-  }
-
-  /**
-   * Pipeline action event payload with origin.
-   */
-  public static class PipelineActionsFromDatanode
-      extends ReportFromDatanode<PipelineActionsProto> {
-
-    public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails,
-        PipelineActionsProto actions) {
-      super(datanodeDetails, actions);
-    }
-  }
-
-  /**
-   * Command status report event payload with origin.
-   */
-  public static class CommandStatusReportFromDatanode
-      extends ReportFromDatanode<CommandStatusReportsProto> {
-
-    public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails,
-        CommandStatusReportsProto report) {
-      super(datanodeDetails, report);
-    }
-  }
-
-}

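The dispatcher above follows one pattern throughout: check each optional
section of the heartbeat and fire one typed event per section present. A
stand-alone sketch of that shape; the Heartbeat stub replaces the protobuf
hasXyz() accessors and is not the HDDS type:

    import java.util.ArrayList;
    import java.util.List;

    /** Stand-in for the dispatch pattern in SCMDatanodeHeartbeatDispatcher. */
    public class DispatchPatternSketch {
      static class Heartbeat {
        String nodeReport;      // null stands for "section absent"
        String containerReport;
      }

      static List<String> dispatch(Heartbeat hb) {
        List<String> fired = new ArrayList<>();
        // Mirrors "if (heartbeat.hasNodeReport()) fireEvent(NODE_REPORT, ...)"
        if (hb.nodeReport != null) {
          fired.add("NODE_REPORT");
        }
        if (hb.containerReport != null) {
          fired.add("CONTAINER_REPORT");
        }
        return fired;
      }
    }
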
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
deleted file mode 100644
index 9c6fa88..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license
- * agreements. See the NOTICE file distributed with this work for additional
- * information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
- * License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a
- * copy of the License at
- *
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- *
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.closeContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.deleteBlocksCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
-    .replicateContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.reregisterCommand;
-
-
-
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ReportFromDatanode;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-        .PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
-import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
-import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-
-/**
- * Protocol Handler for Datanode Protocol.
- */
-public class SCMDatanodeProtocolServer implements
-    StorageContainerDatanodeProtocol {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      SCMDatanodeProtocolServer.class);
-
-  /**
-   * The RPC server that listens to requests from DataNodes.
-   */
-  private final RPC.Server datanodeRpcServer;
-
-  private final StorageContainerManager scm;
-  private final InetSocketAddress datanodeRpcAddress;
-  private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
-  private final EventPublisher eventPublisher;
-
-  public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
-      StorageContainerManager scm, EventPublisher eventPublisher)
-      throws IOException {
-
-    Preconditions.checkNotNull(scm, "SCM cannot be null");
-    Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
-
-    this.scm = scm;
-    this.eventPublisher = eventPublisher;
-    final int handlerCount =
-        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
-            OZONE_SCM_HANDLER_COUNT_DEFAULT);
-
-    heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(
-        scm.getScmNodeManager(), eventPublisher);
-
-    RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    BlockingService dnProtoPbService =
-        StorageContainerDatanodeProtocolProtos
-            .StorageContainerDatanodeProtocolService
-            .newReflectiveBlockingService(
-                new StorageContainerDatanodeProtocolServerSideTranslatorPB(
-                    this));
-
-    InetSocketAddress datanodeRpcAddr =
-        HddsServerUtil.getScmDataNodeBindAddress(conf);
-
-    datanodeRpcServer =
-        startRpcServer(
-            conf,
-            datanodeRpcAddr,
-            StorageContainerDatanodeProtocolPB.class,
-            dnProtoPbService,
-            handlerCount);
-
-    datanodeRpcAddress =
-        updateRPCListenAddress(
-            conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
-            datanodeRpcServer);
-
-  }
-
-  public void start() {
-    LOG.info(
-        StorageContainerManager.buildRpcServerStartMessage(
-            "RPC server for DataNodes", datanodeRpcAddress));
-    datanodeRpcServer.start();
-  }
-
-  public InetSocketAddress getDatanodeRpcAddress() {
-    return datanodeRpcAddress;
-  }
-
-  @Override
-  public SCMVersionResponseProto getVersion(SCMVersionRequestProto
-      versionRequest)
-      throws IOException {
-    return scm.getScmNodeManager().getVersion(versionRequest)
-        .getProtobufMessage();
-  }
-
-  @Override
-  public SCMRegisteredResponseProto register(
-      HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
-      NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsProto,
-          PipelineReportsProto pipelineReportsProto)
-      throws IOException {
-    DatanodeDetails datanodeDetails = DatanodeDetails
-        .getFromProtoBuf(datanodeDetailsProto);
-    // TODO : Return the list of Nodes that forms the SCM HA.
-    RegisteredCommand registeredCommand = scm.getScmNodeManager()
-        .register(datanodeDetails, nodeReport, pipelineReportsProto);
-    if (registeredCommand.getError()
-        == SCMRegisteredResponseProto.ErrorCode.success) {
-      scm.getScmContainerManager().processContainerReports(datanodeDetails,
-          containerReportsProto, true);
-      eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
-          new NodeRegistrationContainerReport(datanodeDetails,
-              containerReportsProto));
-      eventPublisher.fireEvent(PIPELINE_REPORT,
-              new PipelineReportFromDatanode(datanodeDetails,
-                      pipelineReportsProto));
-    }
-    return getRegisteredResponse(registeredCommand);
-  }
-
-  @VisibleForTesting
-  public static SCMRegisteredResponseProto getRegisteredResponse(
-      RegisteredCommand cmd) {
-    return SCMRegisteredResponseProto.newBuilder()
-        // TODO : Fix this later when we have multiple SCM support.
-        // .setAddressList(addressList)
-        .setErrorCode(cmd.getError())
-        .setClusterID(cmd.getClusterID())
-        .setDatanodeUUID(cmd.getDatanodeUUID())
-        .build();
-  }
-
-  @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(
-      SCMHeartbeatRequestProto heartbeat) throws IOException {
-    List<SCMCommandProto> cmdResponses = new LinkedList<>();
-    for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
-      cmdResponses.add(getCommandResponse(cmd));
-    }
-    return SCMHeartbeatResponseProto.newBuilder()
-        .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
-        .addAllCommands(cmdResponses).build();
-  }
-
-  /**
-   * Returns an SCMCommandProto built from the given SCM command.
-   *
-   * @param cmd - Cmd
-   * @return SCMCommandProto
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public SCMCommandProto getCommandResponse(SCMCommand cmd)
-      throws IOException {
-    SCMCommandProto.Builder builder =
-        SCMCommandProto.newBuilder();
-    switch (cmd.getType()) {
-    case reregisterCommand:
-      return builder
-          .setCommandType(reregisterCommand)
-          .setReregisterCommandProto(ReregisterCommandProto
-              .getDefaultInstance())
-          .build();
-    case deleteBlocksCommand:
-      // Once SCM sends out the deletion message, increment the count.
-      // This is done here instead of when SCM receives the ACK, because
-      // the DN might not be able to respond with the ACK for some time. In
-      // case it times out, SCM needs to re-send the message a few more times.
-      List<Long> txs =
-          ((DeleteBlocksCommand) cmd)
-              .blocksTobeDeleted()
-              .stream()
-              .map(tx -> tx.getTxID())
-              .collect(Collectors.toList());
-      scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
-      return builder
-          .setCommandType(deleteBlocksCommand)
-          .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto())
-          .build();
-    case closeContainerCommand:
-      return builder
-          .setCommandType(closeContainerCommand)
-          .setCloseContainerCommandProto(
-              ((CloseContainerCommand) cmd).getProto())
-          .build();
-    case replicateContainerCommand:
-      return builder
-          .setCommandType(replicateContainerCommand)
-          .setReplicateContainerCommandProto(
-              ((ReplicateContainerCommand)cmd).getProto())
-          .build();
-    default:
-      throw new IllegalArgumentException("Not implemented");
-    }
-  }
-
-
-  public void join() throws InterruptedException {
-    LOG.trace("Join RPC server for DataNodes");
-    datanodeRpcServer.join();
-  }
-
-  public void stop() {
-    try {
-      LOG.info("Stopping the RPC server for DataNodes");
-      datanodeRpcServer.stop();
-    } catch (Exception ex) {
-      LOG.error(" datanodeRpcServer stop failed.", ex);
-    }
-    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
-  }
-
-  /**
-   * Wrapper class for events with the datanode origin.
-   */
-  public static class NodeRegistrationContainerReport extends
-      ReportFromDatanode<ContainerReportsProto> {
-
-    public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails,
-        ContainerReportsProto report) {
-      super(datanodeDetails, report);
-    }
-  }
-
-}

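sendHeartbeat() above is a simple assembly step: dispatch the heartbeat,
translate every returned SCMCommand, and attach the translations to a single
response. A toy sketch of that flow, with strings standing in for the
protobuf types:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    /** Toy version of the response assembly in sendHeartbeat(). */
    public class HeartbeatResponseSketch {
      static String toResponse(String datanodeUuid, List<String> commands) {
        String encoded = commands.stream()
            .map(c -> "SCMCommandProto(" + c + ")")
            .collect(Collectors.joining(", "));
        return "SCMHeartbeatResponseProto{uuid=" + datanodeUuid
            + ", commands=[" + encoded + "]}";
      }

      public static void main(String[] args) {
        System.out.println(toResponse("dn-1",
            Arrays.asList("reregisterCommand", "closeContainerCommand")));
      }
    }
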
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
deleted file mode 100644
index 22d4d56..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.server;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
-
-import java.util.Map;
-
-/**
- *
- * This is the JMX management interface for SCM information.
- */
-@InterfaceAudience.Private
-public interface SCMMXBean extends ServiceRuntimeInfo {
-
-  /**
-   * Get the SCM RPC server port that is used to listen to datanode requests.
-   * @return SCM datanode RPC server port
-   */
-  String getDatanodeRpcPort();
-
-  /**
-   * Get the SCM RPC server port that is used to listen to client requests.
-   * @return SCM client RPC server port
-   */
-  String getClientRpcPort();
-
-  /**
-   * Get container report info that includes container IO stats of nodes.
-   * @return The datanodeUuid to report JSON string mapping
-   */
-  Map<String, String> getContainerReport();
-}

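An SCMMXBean implementation is typically published through Hadoop's JMX
helper; a hedged sketch of that registration, where the service and bean
name strings are assumptions rather than values from this patch:

    import javax.management.ObjectName;
    import org.apache.hadoop.metrics2.util.MBeans;

    /** Sketch: publish and retire an SCMMXBean implementation over JMX. */
    public class ScmJmxSketch {
      private ObjectName jmxName;

      void start(Object scmMxBeanImpl) {
        // The bean then shows up under Hadoop:service=...,name=... in JMX.
        jmxName = MBeans.register("StorageContainerManager",
            "StorageContainerManagerInfo", scmMxBeanImpl);
      }

      void stop() {
        if (jmxName != null) {
          MBeans.unregister(jmxName);
          jmxName = null;
        }
      }
    }
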
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
deleted file mode 100644
index be6c1af..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.server;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-import org.apache.hadoop.ozone.common.Storage;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
-import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
-
-/**
- * SCMStorage is responsible for management of the StorageDirectories used by
- * the SCM.
- */
-public class SCMStorage extends Storage {
-
-  /**
-   * Construct SCMStorage.
-   * @throws IOException if any directories are inaccessible.
-   */
-  public SCMStorage(OzoneConfiguration conf) throws IOException {
-    super(NodeType.SCM, getOzoneMetaDirPath(conf), STORAGE_DIR);
-  }
-
-  public void setScmId(String scmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("SCM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(SCM_ID, scmId);
-    }
-  }
-
-  /**
-   * Retrieves the SCM ID from the version file.
-   * @return SCM_ID
-   */
-  public String getScmId() {
-    return getStorageInfo().getProperty(SCM_ID);
-  }
-
-  @Override
-  protected Properties getNodeProperties() {
-    String scmId = getScmId();
-    if (scmId == null) {
-      scmId = UUID.randomUUID().toString();
-    }
-    Properties scmProperties = new Properties();
-    scmProperties.setProperty(SCM_ID, scmId);
-    return scmProperties;
-  }
-
-}
\ No newline at end of file

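getNodeProperties() above implements a small bootstrap rule: reuse the
persisted SCM id if one exists, otherwise mint a fresh UUID. A stand-alone
sketch; the property key string below is illustrative, the real code uses
OzoneConsts.SCM_ID:

    import java.util.Properties;
    import java.util.UUID;

    /** Sketch of the SCM id bootstrap in SCMStorage#getNodeProperties. */
    public class ScmIdBootstrapSketch {
      static Properties nodeProperties(String existingScmId) {
        String scmId = (existingScmId != null)
            ? existingScmId
            : UUID.randomUUID().toString();
        Properties props = new Properties();
        props.setProperty("scmUuid", scmId); // key name is an assumption
        return props;
      }
    }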
