Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 30473ecfb -> 55d4ee71e


HDFS-8210. Ozone: Implement storage container manager. Contributed by Chris Nauroth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55d4ee71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55d4ee71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55d4ee71

Branch: refs/heads/HDFS-7240
Commit: 55d4ee71e26867b5b8ea187505fc4c95a44dcfb1
Parents: 30473ec
Author: Anu Engineer <aengin...@apache.org>
Authored: Wed Mar 30 14:49:55 2016 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Wed Mar 30 14:49:55 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   3 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   1 +
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  10 +
 .../hadoop/ozone/protocol/LocatedContainer.java | 127 +++++
 .../StorageContainerLocationProtocol.java       |  44 ++
 .../hadoop/ozone/protocol/package-info.java     |  23 +
 ...rLocationProtocolClientSideTranslatorPB.java | 106 ++++
 .../StorageContainerLocationProtocolPB.java     |  34 ++
 ...rLocationProtocolServerSideTranslatorPB.java |  89 ++++
 .../hadoop/ozone/protocolPB/package-info.java   |  24 +
 .../ozone/storage/StorageContainerManager.java  | 509 +++++++++++++++++++
 .../storage/StorageContainerNameService.java    | 131 +++++
 .../hadoop/ozone/storage/package-info.java      |  23 +
 .../StorageContainerLocationProtocol.proto      |  71 +++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |   3 -
 .../apache/hadoop/ozone/MiniOzoneCluster.java   | 168 ++++++
 .../storage/TestStorageContainerManager.java    | 135 +++++
 17 files changed, 1498 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index df48e9d..fd6f8e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -18,6 +18,9 @@
        <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
      </Match>
      <Match>
+       <Package name="org.apache.hadoop.ozone.protocol.proto" />
+     </Match>
+     <Match>
        <Bug pattern="EI_EXPOSE_REP" />
      </Match>
      <Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index c6620ed..eb6db3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -353,6 +353,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                   <include>editlog.proto</include>
                   <include>fsimage.proto</include>
                   <include>DatanodeContainerProtocol.proto</include>
+                  <include>StorageContainerLocationProtocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 642b473..27c79c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -37,6 +37,16 @@ public final class OzoneConfigKeys {
   public static final String DFS_STORAGE_HANDLER_TYPE_KEY =
       "dfs.storage.handler.type";
   public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed";
+  public static final String DFS_STORAGE_RPC_ADDRESS_KEY =
+      "dfs.storage.rpc-address";
+  public static final int DFS_STORAGE_RPC_DEFAULT_PORT = 50200;
+  public static final String DFS_STORAGE_RPC_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_STORAGE_RPC_DEFAULT_PORT;
+  public static final String DFS_STORAGE_RPC_BIND_HOST_KEY =
+      "dfs.storage.rpc-bind-host";
+  public static final String DFS_STORAGE_HANDLER_COUNT_KEY =
+      "dfs.storage.handler.count";
+  public static final int DFS_STORAGE_HANDLER_COUNT_DEFAULT = 10;
   public static final String DFS_OBJECTSTORE_TRACE_ENABLED_KEY =
       "dfs.objectstore.trace.enabled";
   public static final boolean DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT = false;
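
For illustration, a minimal sketch (values here are arbitrary examples, not
defaults from this patch) of overriding the new keys before starting the
service:

    Configuration conf = new Configuration();
    // Ephemeral port for the container location RPC server, test-style.
    conf.set(OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY, "127.0.0.1:0");
    // Raise the handler count above the default of 10 for heavier load.
    conf.setInt(OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_KEY, 20);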

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java
new file mode 100644
index 0000000..1915caa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/LocatedContainer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * Holds the nodes that currently host the container for an object key hash.
+ */
+@InterfaceAudience.Private
+public final class LocatedContainer {
+  private final String key;
+  private final String matchedKeyPrefix;
+  private final String containerName;
+  private final Set<DatanodeInfo> locations;
+  private final DatanodeInfo leader;
+
+  /**
+   * Creates a LocatedContainer.
+   *
+   * @param key object key
+   * @param matchedKeyPrefix prefix of key that was used to find the location
+   * @param containerName container name
+   * @param locations nodes that currently host the container
+   * @param leader node that currently acts as pipeline leader
+   */
+  public LocatedContainer(String key, String matchedKeyPrefix,
+      String containerName, Set<DatanodeInfo> locations, DatanodeInfo leader) {
+    this.key = key;
+    this.matchedKeyPrefix = matchedKeyPrefix;
+    this.containerName = containerName;
+    this.locations = locations;
+    this.leader = leader;
+  }
+
+  /**
+   * Returns the container name.
+   *
+   * @return container name
+   */
+  public String getContainerName() {
+    return this.containerName;
+  }
+
+  /**
+   * Returns the object key.
+   *
+   * @return object key
+   */
+  public String getKey() {
+    return this.key;
+  }
+
+  /**
+   * Returns the node that currently acts as pipeline leader.
+   *
+   * @return node that currently acts as pipeline leader
+   */
+  public DatanodeInfo getLeader() {
+    return this.leader;
+  }
+
+  /**
+   * Returns the nodes that currently host the container.
+   *
+   * @return Set<DatanodeInfo> nodes that currently host the container
+   */
+  public Set<DatanodeInfo> getLocations() {
+    return this.locations;
+  }
+
+  /**
+   * Returns the prefix of the key that was used to find the location.
+   *
+   * @return prefix of the key that was used to find the location
+   */
+  public String getMatchedKeyPrefix() {
+    return this.matchedKeyPrefix;
+  }
+
+  @Override
+  public boolean equals(Object otherObj) {
+    if (otherObj == null) {
+      return false;
+    }
+    if (!(otherObj instanceof LocatedContainer)) {
+      return false;
+    }
+    LocatedContainer other = (LocatedContainer)otherObj;
+    return this.key == null ? other.key == null : this.key.equals(other.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return key == null ? 0 : key.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + "{key=" + key
+        + "; matchedKeyPrefix=" + matchedKeyPrefix
+        + "; containerName=" + containerName
+        + "; locations=" + locations
+        + "; leader=" + leader
+        + "}";
+  }
+}
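
A usage sketch: equality and hashing consider only the object key, so two
LocatedContainer instances for the same key compare equal even when their
container metadata differs (empty locations and a null leader are used here
only to keep the example self-contained):

    LocatedContainer a = new LocatedContainer("key1", "key1", "container-1",
        Collections.<DatanodeInfo>emptySet(), null);
    LocatedContainer b = new LocatedContainer("key1", "key1", "container-2",
        Collections.<DatanodeInfo>emptySet(), null);
    System.out.println(a.equals(b));  // true: same key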

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java
new file mode 100644
index 0000000..b3605ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * StorageContainerLocationProtocol is used by an HDFS node to find the set of
+ * nodes that currently host a container.
+ */
+@InterfaceAudience.Private
+public interface StorageContainerLocationProtocol {
+
+  /**
+   * Find the set of nodes that currently host the container of an object, as
+   * identified by the object key hash.  This method supports batch lookup by
+   * passing multiple key hashes.
+   *
+   * @param keys batch of object keys to find
+   * @return located containers for each object key
+   * @throws IOException if there is any failure
+   */
+  Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
+      throws IOException;
+}
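
A minimal caller-side sketch, assuming client is any implementation of this
interface (for example, the client-side translator below):

    Set<String> keys = Sets.newHashSet("key1", "key2");
    Set<LocatedContainer> containers =
        client.getStorageContainerLocations(keys);
    for (LocatedContainer c : containers) {
      // Each result names the container and the nodes currently hosting it.
      System.out.println(c.getContainerName() + " -> " + c.getLocations());
    }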

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java
new file mode 100644
index 0000000..8bc29aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol;
+
+/**
+ * This package contains classes for Ozone protocol definitions.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..6aa8190
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
+
+/**
+ * This class is the client-side translator to translate the requests made on
+ * the {@link StorageContainerLocationProtocol} interface to the RPC server
+ * implementing {@link StorageContainerLocationProtocolPB}.
+ */
+@InterfaceAudience.Private
+public final class StorageContainerLocationProtocolClientSideTranslatorPB
+    implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable {
+
+  /** RpcController is not used and hence is set to null. */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+
+  private final StorageContainerLocationProtocolPB rpcProxy;
+
+  /**
+   * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
+   *
+   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
+   */
+  public StorageContainerLocationProtocolClientSideTranslatorPB(
+      StorageContainerLocationProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
+      throws IOException {
+    GetStorageContainerLocationsRequestProto.Builder req =
+        GetStorageContainerLocationsRequestProto.newBuilder();
+    for (String key: keys) {
+      req.addKeys(key);
+    }
+    final GetStorageContainerLocationsResponseProto resp;
+    try {
+      resp = rpcProxy.getStorageContainerLocations(NULL_RPC_CONTROLLER,
+          req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    Set<LocatedContainer> locatedContainers =
+        Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedContainersCount());
+    for (LocatedContainerProto locatedContainer:
+        resp.getLocatedContainersList()) {
+      Set<DatanodeInfo> locations = Sets.newLinkedHashSetWithExpectedSize(
+          locatedContainer.getLocationsCount());
+      for (DatanodeInfoProto location: locatedContainer.getLocationsList()) {
+        locations.add(PBHelperClient.convert(location));
+      }
+      locatedContainers.add(new LocatedContainer(locatedContainer.getKey(),
+          locatedContainer.getMatchedKeyPrefix(),
+          locatedContainer.getContainerName(), locations,
+          PBHelperClient.convert(locatedContainer.getLeader())));
+    }
+    return locatedContainers;
+  }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+}
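
A hedged sketch of wiring this translator to a protobuf RPC proxy; scmAddress
and conf are assumed to be supplied by the caller, and the protocol engine
registration mirrors what StorageContainerManager does on the server side:

    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
        ProtobufRpcEngine.class);
    StorageContainerLocationProtocolPB proxy = RPC.getProxy(
        StorageContainerLocationProtocolPB.class,
        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class),
        scmAddress, conf);
    StorageContainerLocationProtocol client =
        new StorageContainerLocationProtocolClientSideTranslatorPB(proxy);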

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java
new file mode 100644
index 0000000..0587655
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolPB.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService;
+
+/**
+ * Protocol used from an HDFS node to StorageContainerManager.  This extends the
+ * Protocol Buffers service interface to add Hadoop-specific annotations.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface StorageContainerLocationProtocolPB
+    extends StorageContainerLocationProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..9d9707f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link StorageContainerLocationProtocolPB} to the
+ * {@link StorageContainerLocationProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public final class StorageContainerLocationProtocolServerSideTranslatorPB
+    implements StorageContainerLocationProtocolPB {
+
+  private final StorageContainerLocationProtocol impl;
+
+  /**
+   * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
+   *
+   * @param impl {@link StorageContainerLocationProtocol} server implementation
+   */
+  public StorageContainerLocationProtocolServerSideTranslatorPB(
+      StorageContainerLocationProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public GetStorageContainerLocationsResponseProto getStorageContainerLocations(
+      RpcController unused, GetStorageContainerLocationsRequestProto req)
+      throws ServiceException {
+    Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
+        req.getKeysCount());
+    for (String key: req.getKeysList()) {
+      keys.add(key);
+    }
+    final Set<LocatedContainer> locatedContainers;
+    try {
+      locatedContainers = impl.getStorageContainerLocations(keys);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    GetStorageContainerLocationsResponseProto.Builder resp =
+        GetStorageContainerLocationsResponseProto.newBuilder();
+    for (LocatedContainer locatedContainer: locatedContainers) {
+      LocatedContainerProto.Builder locatedContainerProto =
+          LocatedContainerProto.newBuilder()
+          .setKey(locatedContainer.getKey())
+          .setMatchedKeyPrefix(locatedContainer.getMatchedKeyPrefix())
+          .setContainerName(locatedContainer.getContainerName());
+      for (DatanodeInfo location: locatedContainer.getLocations()) {
+        locatedContainerProto.addLocations(PBHelperClient.convert(location));
+      }
+      locatedContainerProto.setLeader(
+          PBHelperClient.convert(locatedContainer.getLeader()));
+      resp.addLocatedContainers(locatedContainerProto.build());
+    }
+    return resp.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
new file mode 100644
index 0000000..860386d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+/**
+ * This package contains classes for the Protocol Buffers binding of Ozone
+ * protocols.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
new file mode 100644
index 0000000..90e200a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.storage;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.protobuf.BlockingService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * StorageContainerManager is the main entry point for the service that provides
+ * information about which HDFS nodes host containers.
+ *
+ * The current implementation is a stub suitable to begin end-to-end testing of
+ * Ozone service interactions.  DataNodes report to StorageContainerManager
+ * using the existing heartbeat messages.  StorageContainerManager tells clients
+ * container locations by reporting that all registered nodes are a viable
+ * location.  This will evolve from a stub to a full-fledged implementation
+ * capable of partitioning the keyspace across multiple containers, with
+ * appropriate distribution across nodes.
+ */
+@InterfaceAudience.Private
+public class StorageContainerManager
+    implements DatanodeProtocol, StorageContainerLocationProtocol {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StorageContainerManager.class);
+
+  private final StorageContainerNameService ns;
+  private final BlockManager blockManager;
+
+  /** The RPC server that listens to requests from DataNodes. */
+  private final RPC.Server serviceRpcServer;
+  private final InetSocketAddress serviceRpcAddress;
+
+  /** The RPC server that listens to requests from clients. */
+  private final RPC.Server clientRpcServer;
+  private final InetSocketAddress clientRpcAddress;
+
+  /** The RPC server that listens to requests from nodes to find containers. */
+  private final RPC.Server storageRpcServer;
+  private final InetSocketAddress storageRpcAddress;
+
+  /**
+   * Creates a new StorageContainerManager.  Configuration will be updated with
+   * information on the actual listening addresses used for RPC servers.
+   *
+   * @param conf configuration
+   */
+  public StorageContainerManager(Configuration conf)
+      throws IOException {
+    ns = new StorageContainerNameService();
+    boolean haEnabled = false;
+    blockManager = new BlockManager(ns, haEnabled, conf);
+
+    RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    BlockingService dnProtoPbService =
+        DatanodeProtocolProtos
+        .DatanodeProtocolService
+        .newReflectiveBlockingService(
+            new DatanodeProtocolServerSideTranslatorPB(this));
+
+    InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false);
+    serviceRpcServer = startRpcServer(conf, serviceRpcAddr,
+        DatanodeProtocolPB.class, dnProtoPbService,
+        DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
+        DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
+        DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
+    serviceRpcAddress = updateListenAddress(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, serviceRpcAddr, serviceRpcServer);
+    LOG.info(buildRpcServerStartMessage("Service RPC server",
+        serviceRpcAddress));
+
+    InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf);
+    clientRpcServer = startRpcServer(conf, rpcAddr,
+        DatanodeProtocolPB.class, dnProtoPbService,
+        DFS_NAMENODE_RPC_BIND_HOST_KEY,
+        DFS_NAMENODE_HANDLER_COUNT_KEY,
+        DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+    clientRpcAddress = updateListenAddress(conf,
+        DFS_NAMENODE_RPC_ADDRESS_KEY, rpcAddr, clientRpcServer);
+    conf.set(FS_DEFAULT_NAME_KEY, DFSUtilClient.getNNUri(clientRpcAddress)
+        .toString());
+    LOG.info(buildRpcServerStartMessage("RPC server", clientRpcAddress));
+
+    BlockingService storageProtoPbService =
+        StorageContainerLocationProtocolProtos
+        .StorageContainerLocationProtocolService
+        .newReflectiveBlockingService(
+            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
+
+    InetSocketAddress storageRpcAddr = NetUtils.createSocketAddr(
+        conf.getTrimmed(DFS_STORAGE_RPC_ADDRESS_KEY,
+            DFS_STORAGE_RPC_ADDRESS_DEFAULT), -1, DFS_STORAGE_RPC_ADDRESS_KEY);
+
+    storageRpcServer = startRpcServer(conf, storageRpcAddr,
+        StorageContainerLocationProtocolPB.class, storageProtoPbService,
+        DFS_STORAGE_RPC_BIND_HOST_KEY,
+        DFS_STORAGE_HANDLER_COUNT_KEY,
+        DFS_STORAGE_HANDLER_COUNT_DEFAULT);
+    storageRpcAddress = updateListenAddress(conf,
+        DFS_STORAGE_RPC_ADDRESS_KEY, storageRpcAddr, storageRpcServer);
+    LOG.info(buildRpcServerStartMessage(
+        "StorageContainerLocationProtocol RPC server", storageRpcAddress));
+  }
+
+  @Override
+  public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
+      throws IOException {
+    LOG.trace("getStorageContainerLocations keys = {}", keys);
+    List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
+    blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
+    if (liveNodes.isEmpty()) {
+      throw new IOException("Storage container locations not found.");
+    }
+    String containerName = UUID.randomUUID().toString();
+    Set<DatanodeInfo> locations =
+        Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
+    DatanodeInfo leader = liveNodes.get(0);
+    Set<LocatedContainer> locatedContainers =
+        Sets.newLinkedHashSetWithExpectedSize(keys.size());
+    for (String key: keys) {
+      locatedContainers.add(new LocatedContainer(key, key, containerName,
+          locations, leader));
+    }
+    LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
+        keys, locatedContainers);
+    return locatedContainers;
+  }
+
+  @Override
+  public DatanodeRegistration registerDatanode(
+      DatanodeRegistration registration) throws IOException {
+    ns.writeLock();
+    try {
+      blockManager.getDatanodeManager().registerDatanode(registration);
+    } finally {
+      ns.writeUnlock();
+    }
+    return registration;
+  }
+
+  @Override
+  public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
+      StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease) throws IOException {
+    ns.readLock();
+    try {
+      long cacheCapacity = 0;
+      long cacheUsed = 0;
+      int maxTransfer = blockManager.getMaxReplicationStreams()
+          - xmitsInProgress;
+      DatanodeCommand[] cmds = blockManager.getDatanodeManager()
+          .handleHeartbeat(registration, reports, blockManager.getBlockPoolId(),
+              cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
+              failedVolumes, volumeFailureSummary);
+      long txnId = 234;
+      NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
+          HAServiceProtocol.HAServiceState.ACTIVE, txnId);
+      RollingUpgradeInfo rollingUpgradeInfo = null;
+      long blockReportLeaseId = requestFullBlockReportLease ?
+          blockManager.requestBlockReportLeaseId(registration) : 0;
+      return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
+          blockReportLeaseId);
+    } finally {
+      ns.readUnlock();
+    }
+  }
+
+  @Override
+  public DatanodeCommand blockReport(DatanodeRegistration registration,
+      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+      throws IOException {
+    for (int r = 0; r < reports.length; r++) {
+      final BlockListAsLongs storageContainerList = reports[r].getBlocks();
+      blockManager.processReport(registration, reports[r].getStorage(),
+          storageContainerList, context, r == (reports.length - 1));
+    }
+    return null;
+  }
+
+  @Override
+  public DatanodeCommand cacheReport(DatanodeRegistration registration,
+      String poolId, List<Long> blockIds) throws IOException {
+    // Centralized Cache Management is not supported
+    return null;
+  }
+
+  @Override
+  public void blockReceivedAndDeleted(DatanodeRegistration registration,
+      String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
+      throws IOException {
+    for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) {
+      ns.writeLock();
+      try {
+        blockManager.processIncrementalBlockReport(registration, r);
+      } finally {
+        ns.writeUnlock();
+      }
+    }
+  }
+
+  @Override
+  public void errorReport(DatanodeRegistration registration,
+      int errorCode, String msg) throws IOException {
+    String dnName =
+        (registration == null) ? "Unknown DataNode" : registration.toString();
+    if (errorCode == DatanodeProtocol.NOTIFY) {
+      LOG.info("Error report from " + dnName + ": " + msg);
+      return;
+    }
+    if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Disk error on " + dnName + ": " + msg);
+    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
+      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
+      blockManager.getDatanodeManager().removeDatanode(registration);
+    } else {
+      LOG.info("Error report from " + dnName + ": " + msg);
+    }
+  }
+
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    ns.readLock();
+    try {
+      return new NamespaceInfo(1, "random", "random", 2,
+          NodeType.STORAGE_CONTAINER_SERVICE);
+    } finally {
+      ns.readUnlock();
+    }
+  }
+
+  @Override
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    ns.writeLock();
+    try {
+      for (int i = 0; i < blocks.length; i++) {
+        ExtendedBlock blk = blocks[i].getBlock();
+        DatanodeInfo[] nodes = blocks[i].getLocations();
+        String[] storageIDs = blocks[i].getStorageIDs();
+        for (int j = 0; j < nodes.length; j++) {
+          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
+              storageIDs == null ? null: storageIDs[j],
+              "client machine reported it");
+        }
+      }
+    } finally {
+      ns.writeUnlock();
+    }
+  }
+
+  @Override
+  public void commitBlockSynchronization(ExtendedBlock block,
+      long newgenerationstamp, long newlength, boolean closeFile,
+      boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages)
+      throws IOException {
+    // Not needed for the purpose of object store
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns information on registered DataNodes.
+   *
+   * @param type DataNode type to report
+   * @return registered DataNodes matching requested type
+   */
+  public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) {
+    ns.readLock();
+    try {
+      List<DatanodeDescriptor> results =
+          blockManager.getDatanodeManager().getDatanodeListForReport(type);
+      return results.toArray(new DatanodeInfo[results.size()]);
+    } finally {
+      ns.readUnlock();
+    }
+  }
+
+  /**
+   * Returns listen address of StorageContainerLocation RPC server.
+   *
+   * @return listen address of StorageContainerLocation RPC server
+   */
+  @VisibleForTesting
+  public InetSocketAddress getStorageContainerLocationRpcAddress() {
+    return storageRpcAddress;
+  }
+
+  /**
+   * Start service.
+   */
+  public void start() {
+    clientRpcServer.start();
+    if (serviceRpcServer != null) {
+      serviceRpcServer.start();
+    }
+    storageRpcServer.start();
+  }
+
+  /**
+   * Stop service.
+   */
+  public void stop() {
+    if (clientRpcServer != null) {
+      clientRpcServer.stop();
+    }
+    if (serviceRpcServer != null) {
+      serviceRpcServer.stop();
+    }
+    if (storageRpcServer != null) {
+      storageRpcServer.stop();
+    }
+    IOUtils.closeStream(ns);
+  }
+
+  /**
+   * Wait until service has completed shutdown.
+   */
+  public void join() {
+    try {
+      clientRpcServer.join();
+      if (serviceRpcServer != null) {
+        serviceRpcServer.join();
+      }
+      storageRpcServer.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted during StorageContainerManager join.");
+    }
+  }
+
+  /**
+   * Builds a message for logging startup information about an RPC server.
+   *
+   * @param description RPC server description
+   * @param addr RPC server listening address
+   * @return server startup message
+   */
+  private static String buildRpcServerStartMessage(String description,
+      InetSocketAddress addr) {
+    return addr != null ? String.format("%s is listening at %s",
+        description, NetUtils.getHostPortString(addr)) :
+        String.format("%s not started", description);
+  }
+
+  /**
+   * Starts an RPC server, if configured.
+   *
+   * @param conf configuration
+   * @param addr configured address of RPC server
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param bindHostKey configuration key for setting explicit bind host.  If
+   *     the property is not configured, then the bind host is taken from addr.
+   * @param handlerCountKey configuration key for RPC server handler count
+   * @param handlerCountDefault default RPC server handler count if unconfigured
+   * @return RPC server, or null if addr is null
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private static RPC.Server startRpcServer(Configuration conf,
+      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
+      String bindHostKey, String handlerCountKey, int handlerCountDefault)
+      throws IOException {
+    if (addr == null) {
+      return null;
+    }
+    String bindHost = conf.getTrimmed(bindHostKey);
+    if (bindHost == null || bindHost.isEmpty()) {
+      bindHost = addr.getHostName();
+    }
+    int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault);
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(protocol)
+        .setInstance(instance)
+        .setBindAddress(bindHost)
+        .setPort(addr.getPort())
+        .setNumHandlers(numHandlers)
+        .setVerbose(false)
+        .setSecretManager(null)
+        .build();
+    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+    return rpcServer;
+  }
+
+  /**
+   * After starting an RPC server, updates configuration with the actual
+   * listening address of that server.  The listening address may be different
+   * from the configured address if, for example, the configured address uses
+   * port 0 to request use of an ephemeral port.
+   *
+   * @param conf configuration to update
+   * @param rpcAddressKey configuration key for RPC server address
+   * @param addr configured address
+   * @param rpcServer started RPC server.  If null, then the server was not
+   *     started, and this method is a no-op.
+   */
+  private static InetSocketAddress updateListenAddress(Configuration conf,
+      String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
+    if (rpcServer == null) {
+      return null;
+    }
+    InetSocketAddress listenAddr = rpcServer.getListenerAddress();
+    InetSocketAddress updatedAddr = new InetSocketAddress(
+        addr.getHostName(), listenAddr.getPort());
+    conf.set(rpcAddressKey, NetUtils.getHostPortString(updatedAddr));
+    return updatedAddr;
+  }
+
+  /**
+   * Main entry point for starting StorageContainerManager.
+   *
+   * @param argv arguments
+   * @throws IOException if startup fails due to I/O error
+   */
+  public static void main(String[] argv) throws IOException {
+    StringUtils.startupShutdownMessage(
+        StorageContainerManager.class, argv, LOG);
+    StorageContainerManager scm = new StorageContainerManager(
+        new Configuration());
+    scm.start();
+    scm.join();
+  }
+}
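
A lifecycle sketch under assumed test-style settings; ephemeral ports are
chosen so that the constructor rewrites the configuration with the actual
bound addresses, as updateListenAddress describes:

    Configuration conf = new Configuration();
    conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "127.0.0.1:0");
    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "127.0.0.1:0");
    conf.set(DFS_STORAGE_RPC_ADDRESS_KEY, "127.0.0.1:0");
    StorageContainerManager scm = new StorageContainerManager(conf);
    scm.start();
    try {
      InetSocketAddress addr = scm.getStorageContainerLocationRpcAddress();
      // ... point StorageContainerLocationProtocol clients at addr ...
    } finally {
      scm.stop();
      scm.join();
    }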

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java
new file mode 100644
index 0000000..ca9d0eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.storage;
+
+import java.io.Closeable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+
+/**
+ * Namesystem implementation intended for use by StorageContainerManager.
+ */
+@InterfaceAudience.Private
+public class StorageContainerNameService implements Namesystem, Closeable {
+
+  private final ReentrantReadWriteLock coarseLock =
+      new ReentrantReadWriteLock();
+  private volatile boolean serviceRunning = true;
+
+  @Override
+  public boolean isRunning() {
+    return serviceRunning;
+  }
+
+  @Override
+  public BlockCollection getBlockCollection(long id) {
+    // TBD
+    return null;
+  }
+
+  @Override
+  public void startSecretManagerIfNecessary() {
+    // Secret manager is not supported
+  }
+
+  @Override
+  public CacheManager getCacheManager() {
+    // Centralized Cache Management is not supported
+    return null;
+  }
+
+  @Override
+  public HAContext getHAContext() {
+    // HA mode is not supported
+    return null;
+  }
+
+  @Override
+  public boolean inTransitionToActive() {
+    // HA mode is not supported
+    return false;
+  }
+
+  @Override
+  public boolean isInSnapshot(long blockCollectionID) {
+    // Snapshots not supported
+    return false;
+  }
+
+  @Override
+  public void readLock() {
+    coarseLock.readLock().lock();
+  }
+
+  @Override
+  public void readUnlock() {
+    coarseLock.readLock().unlock();
+  }
+
+  @Override
+  public boolean hasReadLock() {
+    return coarseLock.getReadHoldCount() > 0 || hasWriteLock();
+  }
+
+  @Override
+  public void writeLock() {
+    coarseLock.writeLock().lock();
+  }
+
+  @Override
+  public void writeLockInterruptibly() throws InterruptedException {
+    coarseLock.writeLock().lockInterruptibly();
+  }
+
+  @Override
+  public void writeUnlock() {
+    coarseLock.writeLock().unlock();
+  }
+
+  @Override
+  public boolean hasWriteLock() {
+    return coarseLock.isWriteLockedByCurrentThread();
+  }
+
+  @Override
+  public boolean isInSafeMode() {
+    // Safe mode is not supported
+    return false;
+  }
+
+  @Override
+  public boolean isInStartupSafeMode() {
+    // Safe mode is not supported
+    return false;
+  }
+
+  @Override
+  public void close() {
+    serviceRunning = false;
+  }
+}
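
Callers take the coarse lock in the usual try/finally pattern, as
StorageContainerManager does around its DatanodeManager calls:

    StorageContainerNameService ns = new StorageContainerNameService();
    ns.writeLock();
    try {
      // mutate shared datanode state under the coarse write lock
    } finally {
      ns.writeUnlock();
    }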

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java
new file mode 100644
index 0000000..75e337f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.storage;
+
+/**
+ * This package contains StorageContainerManager classes.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto
new file mode 100644
index 0000000..6ed326a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.ozone.protocol.proto";
+option java_outer_classname = "StorageContainerLocationProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "hdfs.proto";
+
+/**
+ * keys - batch of object keys to find
+ */
+message GetStorageContainerLocationsRequestProto {
+  repeated string keys = 1;
+}
+
+/**
+ * locatedContainers - for each requested hash, nodes that currently host the
+ *     container for that object key hash
+ */
+message GetStorageContainerLocationsResponseProto {
+  repeated LocatedContainerProto locatedContainers = 1;
+}
+
+/**
+ * Holds the nodes that currently host the container for an object key.
+ */
+message LocatedContainerProto {
+  required string key = 1;
+  required string matchedKeyPrefix = 2;
+  required string containerName = 3;
+  repeated DatanodeInfoProto locations = 4;
+  required DatanodeInfoProto leader = 5;
+}
+
+/**
+ * Protocol used from an HDFS node to StorageContainerManager.  See the request
+ * and response messages for details of the RPC calls.
+ */
+service StorageContainerLocationProtocolService {
+  /**
+   * Find the set of nodes that currently host the container of an object, as
+   * identified by the object key.  This method supports batch lookup by
+   * passing multiple keys.
+   */
+  rpc getStorageContainerLocations(GetStorageContainerLocationsRequestProto)
+      returns(GetStorageContainerLocationsResponseProto);
+}

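For orientation, a minimal hedged sketch of how a caller might invoke this
service through the client-side translator added earlier in this patch. The
helper method name is hypothetical, and the "client" parameter is assumed to
wrap an RPC proxy as built in MiniOzoneCluster#createStorageContainerLocationClient
below; the keys are illustrative:

    // Hedged sketch (not part of the patch): batch lookup of container
    // locations for a set of object keys, then print each result.
    static void printLocations(
        StorageContainerLocationProtocolClientSideTranslatorPB client)
        throws IOException {
      Set<LocatedContainer> containers =
          client.getStorageContainerLocations(
              new LinkedHashSet<>(Arrays.asList("/key1", "/key2")));
      for (LocatedContainer container : containers) {
        System.out.println(container.getKey() + " -> "
            + container.getContainerName() + " @ " + container.getLeader());
      }
    }
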
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 2eff12e..e6c5176 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1034,9 +1034,6 @@ public class MiniDFSCluster {
    */
   public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
       Configuration conf) throws IOException {
-    Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
-        "empty NN topology: no namenodes specified!");
-
     if (!federation && nnTopology.countNameNodes() == 1) {
       NNConf onlyNN = nnTopology.getOnlyNameNode();
       // we only had one NN, set DEFAULT_NAME for it. If not explicitly

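Removing this precondition allows a topology with zero NameNodes, which the
new MiniOzoneCluster below relies on. A hedged, illustrative sketch of the
now-valid call (HdfsConfiguration is used here only for the example):

    // Previously threw "empty NN topology: no namenodes specified!";
    // now accepted, since Ozone clusters run without a NameNode.
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster.configureNameNodes(new MiniDFSNNTopology(), false, conf);
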
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
new file mode 100644
index 0000000..218058c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.storage.StorageContainerManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+
+/**
+ * MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
+ * running tests.  The cluster consists of a StorageContainerManager and
+ * multiple DataNodes.  This class subclasses {@link MiniDFSCluster} for
+ * convenient reuse of logic for starting DataNodes.  Unlike MiniDFSCluster, it
+ * does not start a NameNode, because Ozone does not require a NameNode.
+ */
+@InterfaceAudience.Private
+public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneCluster.class);
+
+  private final OzoneConfiguration conf;
+  private final StorageContainerManager scm;
+
+  /**
+   * Creates a new MiniOzoneCluster.
+   *
+   * @param builder cluster builder
+   * @param scm StorageContainerManager, already running
+   * @throws IOException if there is an I/O error
+   */
+  private MiniOzoneCluster(Builder builder, StorageContainerManager scm)
+        throws IOException {
+    super(builder);
+    this.conf = builder.conf;
+    this.scm = scm;
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneCluster to run.
+   */
+  public static class Builder
+      extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder {
+
+    private final OzoneConfiguration conf;
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+      this.conf = conf;
+      this.nnTopology(new MiniDFSNNTopology()); // No NameNode required
+    }
+
+    @Override
+    public Builder numDataNodes(int val) {
+      super.numDataNodes(val);
+      return this;
+    }
+
+    @Override
+    public MiniOzoneCluster build() throws IOException {
+      // Even though this won't start a NameNode, some of the logic in
+      // MiniDFSCluster expects to find the default file system configured with
+      // an HDFS URI.
+      conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:0");
+      conf.set(DFS_STORAGE_RPC_ADDRESS_KEY, "127.0.0.1:0");
+      StorageContainerManager scm = new StorageContainerManager(conf);
+      scm.start();
+      return new MiniOzoneCluster(this, scm);
+    }
+  }
+
+  @Override
+  public void close() {
+    shutdown();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    LOG.info("Shutting down the Mini Ozone Cluster");
+    if (scm == null) {
+      return;
+    }
+    LOG.info("Shutting down the StorageContainerManager");
+    scm.stop();
+    scm.join();
+  }
+
+  /**
+   * Waits for the Ozone cluster to be ready for processing requests.
+   */
+  public void waitOzoneReady() {
+    long begin = Time.monotonicNow();
+    while (scm.getDatanodeReport(DatanodeReportType.LIVE).length <
+        numDataNodes) {
+      if (Time.monotonicNow() - begin > 20000) {
+        throw new IllegalStateException(
+            "Timed out waiting for Ozone cluster to become ready.");
+      }
+      LOG.info("Waiting for Ozone cluster to become ready");
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for Ozone cluster to become ready.");
+      }
+    }
+  }
+
+  /**
+   * Creates an RPC proxy connected to this cluster's StorageContainerManager
+   * for accessing container location information.  Callers take ownership of
+   * the proxy and must close it when done.
+   *
+   * @return RPC proxy for accessing container location information
+   * @throws IOException if there is an I/O error
+   */
+  protected StorageContainerLocationProtocolClientSideTranslatorPB
+      createStorageContainerLocationClient() throws IOException {
+    long version = RPC.getProtocolVersion(
+        StorageContainerLocationProtocolPB.class);
+    InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
+    return new StorageContainerLocationProtocolClientSideTranslatorPB(
+        RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
+        address, UserGroupInformation.getCurrentUser(), conf,
+        NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
+  }
+}

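A hedged usage sketch of the new harness, mirroring the tests that follow.
Try-with-resources applies because MiniOzoneCluster implements Closeable; the
node count is illustrative:

    // Start an in-process Ozone cluster with an SCM and three DataNodes,
    // wait until all DataNodes register live, then shut everything down.
    OzoneConfiguration conf = new OzoneConfiguration();
    try (MiniOzoneCluster cluster =
        new MiniOzoneCluster.Builder(conf).numDataNodes(3).build()) {
      cluster.waitOzoneReady();  // polls the SCM's datanode report
      // ... issue container location lookups against the SCM here ...
    }
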
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55d4ee71/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java
new file mode 100644
index 0000000..4ea1d6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/storage/TestStorageContainerManager.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+
+public class TestStorageContainerManager {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
+    conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed");
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
+  }
+
+  @After
+  public void shutdown() throws InterruptedException {
+    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+  }
+
+  @Test
+  public void testLocationsForSingleKey() throws IOException {
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitOzoneReady();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    Set<LocatedContainer> containers =
+        storageContainerLocationClient.getStorageContainerLocations(
+            new LinkedHashSet<>(Arrays.asList("/key1")));
+    assertNotNull(containers);
+    assertEquals(1, containers.size());
+    assertLocatedContainer(containers, "/key1", 1);
+  }
+
+  @Test
+  public void testLocationsForMultipleKeys() throws IOException {
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitOzoneReady();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    Set<LocatedContainer> containers =
+        storageContainerLocationClient.getStorageContainerLocations(
+            new LinkedHashSet<>(Arrays.asList("/key1", "/key2", "/key3")));
+    assertNotNull(containers);
+    assertEquals(3, containers.size());
+    assertLocatedContainer(containers, "/key1", 1);
+    assertLocatedContainer(containers, "/key2", 1);
+    assertLocatedContainer(containers, "/key3", 1);
+  }
+
+  @Test
+  public void testNoDataNodes() throws IOException {
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(0).build();
+    cluster.waitOzoneReady();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    exception.expect(IOException.class);
+    exception.expectMessage("locations not found");
+    storageContainerLocationClient.getStorageContainerLocations(
+        new LinkedHashSet<>(Arrays.asList("/key1")));
+  }
+
+  @Test
+  public void testMultipleDataNodes() throws IOException {
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitOzoneReady();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    Set<LocatedContainer> containers =
+        storageContainerLocationClient.getStorageContainerLocations(
+            new LinkedHashSet<>(Arrays.asList("/key1")));
+    assertNotNull(containers);
+    assertEquals(1, containers.size());
+    assertLocatedContainer(containers, "/key1", 3);
+  }
+
+  private static void assertLocatedContainer(Set<LocatedContainer> containers,
+      String key, int expectedNumLocations) {
+    LocatedContainer container = null;
+    for (LocatedContainer curContainer: containers) {
+      if (key.equals(curContainer.getKey())) {
+        container = curContainer;
+        break;
+      }
+    }
+    assertNotNull("Container for key " + key + " not found.", container);
+    assertEquals(key, container.getKey());
+    assertNotNull(container.getMatchedKeyPrefix());
+    assertFalse(container.getMatchedKeyPrefix().isEmpty());
+    assertNotNull(container.getContainerName());
+    assertFalse(container.getContainerName().isEmpty());
+    assertNotNull(container.getLocations());
+    assertEquals(expectedNumLocations, container.getLocations().size());
+    assertNotNull(container.getLeader());
+  }
+}
