This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch HDDS-2665-ofs
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2665-ofs by this push:
     new 20bb5b5  HDDS-2840. Implement ofs://: mkdir (#415)
20bb5b5 is described below

commit 20bb5b56d6b15ecd47126559374c3391b3661d99
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Jan 30 12:45:06 2020 -0800

    HDDS-2840. Implement ofs://: mkdir (#415)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../dist/src/main/compose/ozone/docker-config      |   2 +
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 483 ++++++++++++
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   | 715 ++++++++++++++++++
 .../fs/ozone/BasicRootedOzoneFileSystem.java       | 809 +++++++++++++++++++++
 .../java/org/apache/hadoop/fs/ozone/OFSPath.java   | 145 ++++
 .../hadoop/fs/ozone/OzoneClientAdapterFactory.java |  27 +
 .../hadoop/fs/ozone/RootedOzoneClientAdapter.java  |  33 +
 .../fs/ozone/RootedOzoneClientAdapterImpl.java     |  59 ++
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     | 103 +++
 .../org/apache/hadoop/fs/ozone/TestOFSPath.java    | 115 +++
 .../ozone/TestRootedOzoneFileSystemWithMocks.java  | 116 +++
 12 files changed, 2608 insertions(+)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 4a83dff..ee2ecf8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -84,6 +84,7 @@ public final class OzoneConsts {
 
   // Ozone File System scheme
   public static final String OZONE_URI_SCHEME = "o3fs";
+  public static final String OZONE_OFS_URI_SCHEME = "ofs";
 
   public static final String OZONE_RPC_SCHEME = "o3";
   public static final String OZONE_HTTP_SCHEME = "http";
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 7249157..364a32b 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+CORE-SITE.XML_fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem
+CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
 OZONE-SITE.XML_ozone.om.address=om
 OZONE-SITE.XML_ozone.om.http-address=om:9874
 OZONE-SITE.XML_ozone.scm.names=scm
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
new file mode 100644
index 0000000..97d739b
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientException;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Ozone file system tests that are not covered by contract tests.
+ *
+ * TODO: Refactor this and TestOzoneFileSystem to eliminate most
+ *  code duplication.
+ */
+public class TestRootedOzoneFileSystem {
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300_000);
+
+  private static MiniOzoneCluster cluster = null;
+
+  private static FileSystem fs;
+  private static RootedOzoneFileSystem ofs;
+
+  private static ObjectStore objectStore;
+
+  private String volumeName;
+  private String bucketName;
+
+  private String rootPath;
+
+  // Store path commonly used by tests that test functionality within a bucket
+  private String testBucketStr;
+  private Path testBucketPath;
+
+  @Before
+  public void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    objectStore = cluster.getClient().getObjectStore();
+
+    // create a volume and a bucket to be used by RootedOzoneFileSystem (OFS)
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+    testBucketStr = "/" + volumeName + "/" + bucketName;
+    testBucketPath = new Path(testBucketStr);
+
+    rootPath = String.format("%s://%s/", OzoneConsts.OZONE_OFS_URI_SCHEME,
+        conf.get(OZONE_OM_ADDRESS_KEY));
+
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Note: FileSystem#loadFileSystems won't load OFS class due to META-INF
+    //  hence this workaround.
+    conf.set("fs.ofs.impl", 
"org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+    fs = FileSystem.get(conf);
+    ofs = (RootedOzoneFileSystem) fs;
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @Test
+  public void testOzoneFsServiceLoader() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    // Note: FileSystem#loadFileSystems won't load OFS class due to META-INF
+    //  hence this workaround.
+    conf.set("fs.ofs.impl", 
"org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+    assertEquals(
+        FileSystem.getFileSystemClass(OzoneConsts.OZONE_OFS_URI_SCHEME, conf),
+        RootedOzoneFileSystem.class);
+  }
+
+  @Test
+  public void testCreateDoesNotAddParentDirKeys() throws Exception {
+    Path grandparent = new Path(testBucketPath,
+        "testCreateDoesNotAddParentDirKeys");
+    Path parent = new Path(grandparent, "parent");
+    Path child = new Path(parent, "child");
+    ContractTestUtils.touch(fs, child);
+
+    OzoneKeyDetails key = getKey(child, false);
+    OFSPath childOFSPath = new OFSPath(child);
+    assertEquals(key.getName(), childOFSPath.getKeyName());
+
+    // Creating a child should not add parent keys to the bucket
+    try {
+      getKey(parent, true);
+    } catch (IOException ex) {
+      assertKeyNotFoundException(ex);
+    }
+
+    // List status on the parent should show the child file
+    assertEquals("List status of parent should include the 1 child file", 1L,
+        (long)fs.listStatus(parent).length);
+    assertTrue("Parent directory does not appear to be a directory",
+        fs.getFileStatus(parent).isDirectory());
+  }
+
+  @Test
+  public void testDeleteCreatesFakeParentDir() throws Exception {
+    Path grandparent = new Path(testBucketPath,
+        "testDeleteCreatesFakeParentDir");
+    Path parent = new Path(grandparent, "parent");
+    Path child = new Path(parent, "child");
+    ContractTestUtils.touch(fs, child);
+
+    // Verify that parent dir key does not exist
+    // Creating a child should not add parent keys to the bucket
+    try {
+      getKey(parent, true);
+    } catch (IOException ex) {
+      assertKeyNotFoundException(ex);
+    }
+
+    // Delete the child key
+    assertTrue(fs.delete(child, false));
+
+    // Deleting the only child should create the parent dir key if it does
+    // not exist
+    OFSPath parentOFSPath = new OFSPath(parent);
+    String parentKey = parentOFSPath.getKeyName() + "/";
+    OzoneKeyDetails parentKeyInfo = getKey(parent, true);
+    assertEquals(parentKey, parentKeyInfo.getName());
+
+    // Recursive delete with DeleteIterator
+    assertTrue(fs.delete(grandparent, true));
+  }
+
+  @Test
+  public void testListStatus() throws Exception {
+    Path parent = new Path(testBucketPath, "testListStatus");
+    Path file1 = new Path(parent, "key1");
+    Path file2 = new Path(parent, "key2");
+
+    FileStatus[] fileStatuses = ofs.listStatus(testBucketPath);
+    assertEquals("Should be empty", 0, fileStatuses.length);
+
+    ContractTestUtils.touch(fs, file1);
+    ContractTestUtils.touch(fs, file2);
+
+    fileStatuses = ofs.listStatus(testBucketPath);
+    assertEquals("Should have created parent",
+        1, fileStatuses.length);
+    assertEquals("Parent path doesn't match",
+        fileStatuses[0].getPath().toUri().getPath(), parent.toString());
+
+    // ListStatus on a directory should return all subdirs along with
+    // files, even if there exists a file and sub-dir with the same name.
+    fileStatuses = ofs.listStatus(parent);
+    assertEquals("FileStatus did not return all children of the directory",
+        2, fileStatuses.length);
+
+    // ListStatus should return only the immediate children of a directory.
+    Path file3 = new Path(parent, "dir1/key3");
+    Path file4 = new Path(parent, "dir1/key4");
+    ContractTestUtils.touch(fs, file3);
+    ContractTestUtils.touch(fs, file4);
+    fileStatuses = ofs.listStatus(parent);
+    assertEquals("FileStatus did not return all children of the directory",
+        3, fileStatuses.length);
+  }
+
+  /**
+   * OFS: Helper function for tests. Return a volume name that doesn't exist.
+   */
+  private String getRandomNonExistVolumeName() throws Exception {
+    final int numDigit = 5;
+    long retriesLeft = Math.round(Math.pow(10, 5));
+    String name = null;
+    while (name == null && retriesLeft-- > 0) {
+      name = "volume-" + RandomStringUtils.randomNumeric(numDigit);
+      // Check volume existence.
+      Iterator<? extends OzoneVolume> iter =
+          objectStore.listVolumesByUser(null, name, null);
+      if (iter.hasNext()) {
+        // If there is a match, try again.
+        // Note that volume name prefix match doesn't equal volume existence
+        //  but the check is sufficient for this test.
+        name = null;
+      }
+    }
+    if (retriesLeft <= 0) {
+      throw new Exception(
+          "Failed to generate random volume name that doesn't exist already.");
+    }
+    return name;
+  }
+
+  /**
+   * OFS: Test mkdir on volume, bucket and dir that doesn't exist.
+   */
+  @Test
+  public void testMkdirOnNonExistentVolumeBucketDir() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    String bucketNameLocal = "bucket-" + RandomStringUtils.randomNumeric(5);
+    Path root = new Path("/" + volumeNameLocal + "/" + bucketNameLocal);
+    Path dir1 = new Path(root, "dir1");
+    Path dir12 = new Path(dir1, "dir12");
+    Path dir2 = new Path(root, "dir2");
+    fs.mkdirs(dir12);
+    fs.mkdirs(dir2);
+
+    // Check volume and bucket existence, they should both be created.
+    OzoneVolume ozoneVolume = objectStore.getVolume(volumeNameLocal);
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketNameLocal);
+    OFSPath ofsPathDir1 = new OFSPath(dir12);
+    String key = ofsPathDir1.getKeyName() + "/";
+    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(key);
+    assertEquals(key, ozoneKeyDetails.getName());
+
+    // Verify that directories are created.
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    assertEquals(fileStatuses[0].getPath().toUri().getPath(), dir1.toString());
+    assertEquals(fileStatuses[1].getPath().toUri().getPath(), dir2.toString());
+
+    fileStatuses = ofs.listStatus(dir1);
+    assertEquals(fileStatuses[0].getPath().toUri().getPath(), 
dir12.toString());
+    fileStatuses = ofs.listStatus(dir12);
+    assertEquals(fileStatuses.length, 0);
+    fileStatuses = ofs.listStatus(dir2);
+    assertEquals(fileStatuses.length, 0);
+  }
+
+  /**
+   * OFS: Tests mkdir on a volume and bucket that doesn't exist.
+   */
+  @Test
+  public void testMkdirNonExistentVolumeBucket() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    String bucketNameLocal = "bucket-" + RandomStringUtils.randomNumeric(5);
+    Path newVolBucket = new Path(
+        "/" + volumeNameLocal + "/" + bucketNameLocal);
+    fs.mkdirs(newVolBucket);
+
+    // Verify with listVolumes and listBuckets
+    Iterator<? extends OzoneVolume> iterVol =
+        objectStore.listVolumesByUser(null, volumeNameLocal, null);
+    OzoneVolume ozoneVolume = iterVol.next();
+    assertNotNull(ozoneVolume);
+    assertEquals(volumeNameLocal, ozoneVolume.getName());
+
+    Iterator<? extends OzoneBucket> iterBuc =
+        ozoneVolume.listBuckets("bucket-");
+    OzoneBucket ozoneBucket = iterBuc.next();
+    assertNotNull(ozoneBucket);
+    assertEquals(bucketNameLocal, ozoneBucket.getName());
+
+    // TODO: Use listStatus to check volume and bucket creation in HDDS-2928.
+  }
+
+  /**
+   * OFS: Tests mkdir on a volume that doesn't exist.
+   */
+  @Test
+  public void testMkdirNonExistentVolume() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    Path newVolume = new Path("/" + volumeNameLocal);
+    fs.mkdirs(newVolume);
+
+    // Verify with listVolumes and listBuckets
+    Iterator<? extends OzoneVolume> iterVol =
+        objectStore.listVolumesByUser(null, volumeNameLocal, null);
+    OzoneVolume ozoneVolume = iterVol.next();
+    assertNotNull(ozoneVolume);
+    assertEquals(volumeNameLocal, ozoneVolume.getName());
+
+    // TODO: Use listStatus to check volume and bucket creation in HDDS-2928.
+  }
+
+  /**
+   * Tests listStatus operation in a bucket.
+   */
+  @Test
+  public void testListStatusOnRoot() throws Exception {
+    Path root = new Path("/" + volumeName + "/" + bucketName);
+    Path dir1 = new Path(root, "dir1");
+    Path dir12 = new Path(dir1, "dir12");
+    Path dir2 = new Path(root, "dir2");
+    fs.mkdirs(dir12);
+    fs.mkdirs(dir2);
+
+    // ListStatus on root should return dir1 (even though /dir1 key does not
+    // exist) and dir2 only. dir12 is not an immediate child of root and
+    // hence should not be listed.
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    assertEquals("FileStatus should return only the immediate children", 2,
+        fileStatuses.length);
+
+    // Verify that dir12 is not included in the result of the listStatus on 
root
+    String fileStatus1 = fileStatuses[0].getPath().toUri().getPath();
+    String fileStatus2 = fileStatuses[1].getPath().toUri().getPath();
+    assertFalse(fileStatus1.equals(dir12.toString()));
+    assertFalse(fileStatus2.equals(dir12.toString()));
+  }
+
+  /**
+   * Tests listStatus operation on root directory.
+   */
+  @Test
+  public void testListStatusOnLargeDirectory() throws Exception {
+    Path root = new Path("/" + volumeName + "/" + bucketName);
+    Set<String> paths = new TreeSet<>();
+    int numDirs = LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2;
+    for(int i = 0; i < numDirs; i++) {
+      Path p = new Path(root, String.valueOf(i));
+      fs.mkdirs(p);
+      paths.add(p.getName());
+    }
+
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    assertEquals(
+        "Total directories listed do not match the existing directories",
+        numDirs, fileStatuses.length);
+
+    for (int i=0; i < numDirs; i++) {
+      assertTrue(paths.contains(fileStatuses[i].getPath().getName()));
+    }
+  }
+
+  /**
+   * Tests listStatus on a path with subdirs.
+   */
+  @Test
+  public void testListStatusOnSubDirs() throws Exception {
+    // Create the following key structure
+    //      /dir1/dir11/dir111
+    //      /dir1/dir12
+    //      /dir1/dir12/file121
+    //      /dir2
+    // ListStatus on /dir1 should return all its immediated subdirs only
+    // which are /dir1/dir11 and /dir1/dir12. Super child files/dirs
+    // (/dir1/dir12/file121 and /dir1/dir11/dir111) should not be returned by
+    // listStatus.
+    Path dir1 = new Path(testBucketPath, "dir1");
+    Path dir11 = new Path(dir1, "dir11");
+    Path dir111 = new Path(dir11, "dir111");
+    Path dir12 = new Path(dir1, "dir12");
+    Path file121 = new Path(dir12, "file121");
+    Path dir2 = new Path(testBucketPath, "dir2");
+    fs.mkdirs(dir111);
+    fs.mkdirs(dir12);
+    ContractTestUtils.touch(fs, file121);
+    fs.mkdirs(dir2);
+
+    FileStatus[] fileStatuses = ofs.listStatus(dir1);
+    assertEquals("FileStatus should return only the immediate children", 2,
+        fileStatuses.length);
+
+    // Verify that the two children of /dir1 returned by listStatus operation
+    // are /dir1/dir11 and /dir1/dir12.
+    String fileStatus1 = fileStatuses[0].getPath().toUri().getPath();
+    String fileStatus2 = fileStatuses[1].getPath().toUri().getPath();
+    assertTrue(fileStatus1.equals(dir11.toString()) ||
+        fileStatus1.equals(dir12.toString()));
+    assertTrue(fileStatus2.equals(dir11.toString()) ||
+        fileStatus2.equals(dir12.toString()));
+  }
+
+  @Test
+  public void testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved()
+      throws Exception {
+    Path source = new Path(testBucketPath, "source");
+    Path interimPath = new Path(source, "interimPath");
+    Path leafInsideInterimPath = new Path(interimPath, "leaf");
+    Path target = new Path(testBucketPath, "target");
+    Path leafInTarget = new Path(target, "leaf");
+
+    fs.mkdirs(source);
+    fs.mkdirs(target);
+    fs.mkdirs(leafInsideInterimPath);
+
+    assertTrue(fs.rename(leafInsideInterimPath, leafInTarget));
+
+    // after rename listStatus for interimPath should succeed and
+    // interimPath should have no children
+    FileStatus[] statuses = fs.listStatus(interimPath);
+    assertNotNull("liststatus returns a null array", statuses);
+    assertEquals("Statuses array is not empty", 0, statuses.length);
+    FileStatus fileStatus = fs.getFileStatus(interimPath);
+    assertEquals("FileStatus does not point to interimPath",
+        interimPath.getName(), fileStatus.getPath().getName());
+  }
+
+  /**
+   * OFS: Try to rename a key to a different bucket. The attempt should fail.
+   */
+  @Test
+  public void testRenameToDifferentBucket() throws IOException {
+    Path source = new Path(testBucketPath, "source");
+    Path interimPath = new Path(source, "interimPath");
+    Path leafInsideInterimPath = new Path(interimPath, "leaf");
+    Path target = new Path(testBucketPath, "target");
+
+    fs.mkdirs(source);
+    fs.mkdirs(target);
+    fs.mkdirs(leafInsideInterimPath);
+
+    // Attempt to rename the key to a different bucket
+    Path bucket2 = new Path("/" + volumeName + "/" + bucketName + "test");
+    Path leafInTargetInAnotherBucket = new Path(bucket2, "leaf");
+    try {
+      fs.rename(leafInsideInterimPath, leafInTargetInAnotherBucket);
+      fail("Should have thrown exception when renaming to a different bucket");
+    } catch (IOException ignored) {
+      // Test passed. Exception thrown as expected.
+    }
+  }
+
+  private OzoneKeyDetails getKey(Path keyPath, boolean isDirectory)
+      throws IOException, OzoneClientException {
+    String key = ofs.pathToKey(keyPath);
+    if (isDirectory) {
+      key = key + "/";
+    }
+    OFSPath ofsPath = new OFSPath(key);
+    String keyInBucket = ofsPath.getKeyName();
+    return cluster.getClient().getObjectStore().getVolume(volumeName)
+        .getBucket(bucketName).getKey(keyInBucket);
+  }
+
+  private void assertKeyNotFoundException(IOException ex) {
+    GenericTestUtils.assertExceptionContains("KEY_NOT_FOUND", ex);
+  }
+
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
new file mode 100644
index 0000000..9a3f844
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -0,0 +1,715 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
+    .BUCKET_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
+    .VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
+    .VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
+    .BUCKET_NOT_FOUND;
+
+/**
+ * Basic Implementation of the OzoneFileSystem calls.
+ * <p>
+ * This is the minimal version which doesn't include any statistics.
+ * <p>
+ * For full featured version use OzoneClientAdapterImpl.
+ */
+public class BasicRootedOzoneClientAdapterImpl
+    implements RootedOzoneClientAdapter {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicRootedOzoneClientAdapterImpl.class);
+
+  private OzoneClient ozoneClient;
+  private ClientProtocol proxy;
+  private ObjectStore objectStore;
+  private ReplicationType replicationType;
+  private ReplicationFactor replicationFactor;
+  private boolean securityEnabled;
+  private int configuredDnPort;
+
+  /**
+   * Create new OzoneClientAdapter implementation.
+   *
+   * @throws IOException In case of a problem.
+   */
+  public BasicRootedOzoneClientAdapterImpl() throws IOException {
+    this(createConf());
+  }
+
+  private static OzoneConfiguration createConf() {
+    ClassLoader contextClassLoader =
+        Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(null);
+    try {
+      return new OzoneConfiguration();
+    } finally {
+      Thread.currentThread().setContextClassLoader(contextClassLoader);
+    }
+  }
+
+  public BasicRootedOzoneClientAdapterImpl(OzoneConfiguration conf)
+      throws IOException {
+    this(null, -1, conf);
+  }
+
+  public BasicRootedOzoneClientAdapterImpl(String omHost, int omPort,
+      Configuration hadoopConf) throws IOException {
+
+    ClassLoader contextClassLoader =
+        Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(null);
+
+    try {
+      OzoneConfiguration conf = OzoneConfiguration.of(hadoopConf);
+
+      if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
+        // When the host name or service id isn't given
+        // but ozone.om.service.ids is defined, declare failure.
+
+        // This is a safety precaution that prevents the client from
+        // accidentally failing over to an unintended OM.
+        throw new IllegalArgumentException("Service ID or host name must not"
+            + " be omitted when ozone.om.service.ids is defined.");
+      }
+
+      if (omPort != -1) {
+        // When the port number is specified, perform the following check
+        if (OmUtils.isOmHAServiceId(conf, omHost)) {
+          // If omHost is a service id, it shouldn't use a port
+          throw new IllegalArgumentException("Port " + omPort +
+              " specified in URI but host '" + omHost + "' is a "
+              + "logical (HA) OzoneManager and does not use port 
information.");
+        }
+      } else {
+        // When port number is not specified, read it from config
+        omPort = OmUtils.getOmRpcPort(conf);
+      }
+
+      SecurityConfig secConfig = new SecurityConfig(conf);
+
+      if (secConfig.isSecurityEnabled()) {
+        this.securityEnabled = true;
+      }
+
+      String replicationTypeConf =
+          conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+              OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
+
+      int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
+          OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
+
+      if (OmUtils.isOmHAServiceId(conf, omHost)) {
+        // omHost is listed as one of the service ids in the config,
+        // thus we should treat omHost as omServiceId
+        this.ozoneClient =
+            OzoneClientFactory.getRpcClient(omHost, conf);
+      } else if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
+        this.ozoneClient =
+            OzoneClientFactory.getRpcClient(omHost, omPort, conf);
+      } else {
+        this.ozoneClient =
+            OzoneClientFactory.getRpcClient(conf);
+      }
+      objectStore = ozoneClient.getObjectStore();
+      proxy = objectStore.getClientProxy();
+      this.replicationType = ReplicationType.valueOf(replicationTypeConf);
+      this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
+      this.configuredDnPort = conf.getInt(
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    } finally {
+      Thread.currentThread().setContextClassLoader(contextClassLoader);
+    }
+  }
+
+  OzoneBucket getBucket(OFSPath ofsPath, boolean createIfNotExist)
+      throws IOException {
+
+    return getBucket(ofsPath.getVolumeName(), ofsPath.getBucketName(),
+        createIfNotExist);
+  }
+
+  /**
+   * Get OzoneBucket object to operate in.
+   * Optionally create volume and bucket if not found.
+   *
+   * @param createIfNotExist Set this to true if the caller is a write 
operation
+   *                         in order to create the volume and bucket.
+   * @throws IOException Exceptions other than OMException with result code
+   *                     VOLUME_NOT_FOUND or BUCKET_NOT_FOUND.
+   */
+  private OzoneBucket getBucket(String volumeStr, String bucketStr,
+      boolean createIfNotExist) throws IOException {
+
+    OzoneBucket bucket;
+    try {
+      bucket = proxy.getBucketDetails(volumeStr, bucketStr);
+    } catch (OMException ex) {
+      if (createIfNotExist) {
+        // Note: getBucketDetails always throws BUCKET_NOT_FOUND, even if
+        // the volume doesn't exist.
+        if (ex.getResult().equals(BUCKET_NOT_FOUND)) {
+          OzoneVolume volume;
+          try {
+            volume = proxy.getVolumeDetails(volumeStr);
+          } catch (OMException getVolEx) {
+            if (getVolEx.getResult().equals(VOLUME_NOT_FOUND)) {
+              // Volume doesn't exist. Create it
+              try {
+                objectStore.createVolume(volumeStr);
+              } catch (OMException newVolEx) {
+                // Ignore the case where another client created the volume
+                if (!newVolEx.getResult().equals(VOLUME_ALREADY_EXISTS)) {
+                  throw newVolEx;
+                }
+              }
+            } else {
+              throw getVolEx;
+            }
+            // Try get volume again
+            volume = proxy.getVolumeDetails(volumeStr);
+          }
+          // Create the bucket
+          try {
+            volume.createBucket(bucketStr);
+          } catch (OMException newBucEx) {
+            // Ignore the case where another client created the bucket
+            if (!newBucEx.getResult().equals(BUCKET_ALREADY_EXISTS)) {
+              throw newBucEx;
+            }
+          }
+        }
+        // Try get bucket again
+        bucket = proxy.getBucketDetails(volumeStr, bucketStr);
+      } else {
+        throw ex;
+      }
+    }
+
+    return bucket;
+  }
+
+  @Override
+  public void close() throws IOException {
+    ozoneClient.close();
+  }
+
+  @Override
+  public InputStream readFile(String pathStr) throws IOException {
+    incrementCounter(Statistic.OBJECTS_READ);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    String key = ofsPath.getKeyName();
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, false);
+      return bucket.readFile(key).getInputStream();
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_NOT_FOUND
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileNotFoundException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
+  }
+
+  protected void incrementCounter(Statistic objectsRead) {
+    //noop: Use OzoneClientAdapterImpl which supports statistics.
+  }
+
+  @Override
+  public OzoneFSOutputStream createFile(String pathStr, boolean overWrite,
+      boolean recursive) throws IOException {
+    incrementCounter(Statistic.OBJECTS_CREATED);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    String key = ofsPath.getKeyName();
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, false);
+      OzoneOutputStream ozoneOutputStream = bucket.createFile(
+          key, 0, replicationType, replicationFactor, overWrite, recursive);
+      return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileAlreadyExistsException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
+  }
+
+  @Override
+  public void renameKey(String key, String newKeyName) throws IOException {
+    throw new IOException("OFS doesn't support renameKey, use rename 
instead.");
+  }
+
+  @Override
+  public void rename(String path, String newPath) throws IOException {
+    incrementCounter(Statistic.OBJECTS_RENAMED);
+    OFSPath ofsPath = new OFSPath(path);
+    OFSPath ofsNewPath = new OFSPath(newPath);
+
+    // Check path and newPathName are in the same volume and same bucket.
+    // This should have been checked in BasicRootedOzoneFileSystem#rename
+    // already via regular call path unless bypassed.
+    if (!ofsPath.isInSameBucketAs(ofsNewPath)) {
+      throw new IOException("Cannot rename a key to a different bucket");
+    }
+
+    OzoneBucket bucket = getBucket(ofsPath, false);
+    String key = ofsPath.getKeyName();
+    String newKey = ofsNewPath.getKeyName();
+    bucket.renameKey(key, newKey);
+  }
+
+  /**
+   * Package-private helper function to reduce calls to getBucket().
+   * @param bucket Bucket to operate in.
+   * @param path Existing key path.
+   * @param newPath New key path.
+   * @throws IOException IOException from bucket.renameKey().
+   */
+  void rename(OzoneBucket bucket, String path, String newPath)
+      throws IOException {
+    incrementCounter(Statistic.OBJECTS_RENAMED);
+    OFSPath ofsPath = new OFSPath(path);
+    OFSPath ofsNewPath = new OFSPath(newPath);
+    // No same-bucket policy check here since this call path is controlled
+    String key = ofsPath.getKeyName();
+    String newKey = ofsNewPath.getKeyName();
+    bucket.renameKey(key, newKey);
+  }
+
+  /**
+   * Helper method to create an directory specified by key name in bucket.
+   *
+   * @param pathStr path to be created as directory
+   * @return true if the key is created, false otherwise
+   */
+  @Override
+  public boolean createDirectory(String pathStr) throws IOException {
+    LOG.trace("creating dir for path: {}", pathStr);
+    incrementCounter(Statistic.OBJECTS_CREATED);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    if (ofsPath.getVolumeName().length() == 0) {
+      // Volume name unspecified, invalid param, return failure
+      return false;
+    }
+    if (ofsPath.getBucketName().length() == 0) {
+      // Create volume only
+      objectStore.createVolume(ofsPath.getVolumeName());
+      return true;
+    }
+    String keyStr = ofsPath.getKeyName();
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, true);
+      // Empty keyStr here indicates only volume and bucket is
+      // given in pathStr, so getBucket above should handle the creation
+      // of volume and bucket. We won't feed empty keyStr to
+      // bucket.createDirectory as that would be a NPE.
+      if (keyStr != null && keyStr.length() > 0) {
+        bucket.createDirectory(keyStr);
+      }
+    } catch (OMException e) {
+      if (e.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS) {
+        throw new FileAlreadyExistsException(e.getMessage());
+      }
+      throw e;
+    }
+    return true;
+  }
+
+  /**
+   * Helper method to delete an object specified by key name in bucket.
+   *
+   * @param path path to a key to be deleted
+   * @return true if the key is deleted, false otherwise
+   */
+  @Override
+  public boolean deleteObject(String path) {
+    LOG.trace("issuing delete for path to key: {}", path);
+    incrementCounter(Statistic.OBJECTS_DELETED);
+    OFSPath ofsPath = new OFSPath(path);
+    String keyName = ofsPath.getKeyName();
+    if (keyName.length() == 0) {
+      return false;
+    }
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, false);
+      bucket.deleteKey(keyName);
+      return true;
+    } catch (IOException ioe) {
+      LOG.error("delete key failed " + ioe.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Package-private helper function to reduce calls to getBucket().
+   * @param bucket Bucket to operate in.
+   * @param path Path to delete.
+   * @return true if operation succeeded, false upon IOException.
+   */
+  boolean deleteObject(OzoneBucket bucket, String path) {
+    LOG.trace("issuing delete for path to key: {}", path);
+    incrementCounter(Statistic.OBJECTS_DELETED);
+    OFSPath ofsPath = new OFSPath(path);
+    String keyName = ofsPath.getKeyName();
+    if (keyName.length() == 0) {
+      return false;
+    }
+    try {
+      bucket.deleteKey(keyName);
+      return true;
+    } catch (IOException ioe) {
+      LOG.error("delete key failed " + ioe.getMessage());
+      return false;
+    }
+  }
+
+  public FileStatusAdapter getFileStatus(String path, URI uri,
+      Path qualifiedPath, String userName)
+      throws IOException {
+    incrementCounter(Statistic.OBJECTS_QUERY);
+    OFSPath ofsPath = new OFSPath(path);
+    String key = ofsPath.getKeyName();
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, false);
+      OzoneFileStatus status = bucket.getFileStatus(key);
+      // Note: qualifiedPath passed in is good from
+      //  BasicRootedOzoneFileSystem#getFileStatus. No need to prepend here.
+      makeQualified(status, uri, qualifiedPath, userName);
+      return toFileStatusAdapter(status);
+
+    } catch (OMException e) {
+      if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+        throw new
+            FileNotFoundException(key + ": No such file or directory!");
+      }
+      throw e;
+    }
+  }
+
+  public void makeQualified(FileStatus status, URI uri, Path path,
+      String username) {
+    if (status instanceof OzoneFileStatus) {
+      ((OzoneFileStatus) status)
+          .makeQualified(uri, path,
+              username, username);
+    }
+  }
+
+  @Override
+  public Iterator<BasicKeyInfo> listKeys(String pathStr) {
+    incrementCounter(Statistic.OBJECTS_LIST);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    String key = ofsPath.getKeyName();
+    OzoneBucket bucket;
+    try {
+      bucket = getBucket(ofsPath, false);
+    } catch (IOException ex) {
+      // return an empty list on error
+      return new IteratorAdapter(Collections.emptyIterator());
+    }
+    return new IteratorAdapter(bucket.listKeys(key));
+  }
+
+  /**
+   * OFS listStatus implementation.
+   *
+   * @param pathStr Path for the listStatus to operate on.
+   *                This takes an absolute path from OFS root.
+   * @param recursive Set to true to get keys inside subdirectories.
+   * @param startPath Start path of next batch of result for continuation.
+   *                  This takes an absolute path from OFS root. e.g.
+   *                  /volumeA/bucketB/dirC/fileD
+   * @param numEntries Number of maximum entries in the batch.
+   * @param uri URI of OFS root.
+   *            Used in making the return path qualified.
+   * @param workingDir Working directory.
+   *                   Used in making the return path qualified.
+   * @param username User name.
+   *                 Used in making the return path qualified.
+   * @return A list of FileStatusAdapter.
+   * @throws IOException Bucket exception or FileNotFoundException.
+   */
+  public List<FileStatusAdapter> listStatus(String pathStr, boolean recursive,
+      String startPath, long numEntries, URI uri,
+      Path workingDir, String username) throws IOException {
+
+    incrementCounter(Statistic.OBJECTS_LIST);
+    OFSPath ofsPath = new OFSPath(pathStr);
+    // TODO: Subject to change in HDDS-2928.
+    String keyName = ofsPath.getKeyName();
+    OFSPath ofsStartPath = new OFSPath(startPath);
+    // Internally we need startKey to be passed into bucket.listStatus
+    String startKey = ofsStartPath.getKeyName();
+    try {
+      OzoneBucket bucket = getBucket(ofsPath, false);
+      List<OzoneFileStatus> statuses = bucket
+          .listStatus(keyName, recursive, startKey, numEntries);
+      // Note: result in statuses above doesn't have volume/bucket path since
+      //  they are from the server.
+      String ofsPathPrefix = ofsPath.getNonKeyPath();
+
+      List<FileStatusAdapter> result = new ArrayList<>();
+      for (OzoneFileStatus status : statuses) {
+        // Get raw path (without volume and bucket name) and remove leading '/'
+        String rawPath = status.getPath().toString().substring(1);
+        Path appendedPath = new Path(ofsPathPrefix, rawPath);
+        Path qualifiedPath = appendedPath.makeQualified(uri, workingDir);
+        makeQualified(status, uri, qualifiedPath, username);
+        result.add(toFileStatusAdapter(status));
+      }
+      return result;
+    } catch (OMException e) {
+      if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+        throw new FileNotFoundException(e.getMessage());
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
+      throws IOException {
+    if (!securityEnabled) {
+      return null;
+    }
+    Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
+        .getDelegationToken(renewer == null ? null : new Text(renewer));
+    token.setKind(OzoneTokenIdentifier.KIND_NAME);
+    return token;
+
+  }
+
+  @Override
+  public KeyProvider getKeyProvider() throws IOException {
+    return objectStore.getKeyProvider();
+  }
+
+  @Override
+  public URI getKeyProviderUri() throws IOException {
+    return objectStore.getKeyProviderUri();
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return objectStore.getCanonicalServiceName();
+  }
+
+  /**
+   * Ozone Delegation Token Renewer.
+   */
+  @InterfaceAudience.Private
+  public static class Renewer extends TokenRenewer {
+
+    //Ensure that OzoneConfiguration files are loaded before trying to use
+    // the renewer.
+    static {
+      OzoneConfiguration.activate();
+    }
+
+    public Text getKind() {
+      return OzoneTokenIdentifier.KIND_NAME;
+    }
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return getKind().equals(kind);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    @Override
+    public long renew(Token<?> token, Configuration conf)
+        throws IOException, InterruptedException {
+      Token<OzoneTokenIdentifier> ozoneDt =
+          (Token<OzoneTokenIdentifier>) token;
+      OzoneClient ozoneClient =
+          OzoneClientFactory.getRpcClient(conf);
+      return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
+    }
+
+    @Override
+    public void cancel(Token<?> token, Configuration conf)
+        throws IOException, InterruptedException {
+      Token<OzoneTokenIdentifier> ozoneDt =
+          (Token<OzoneTokenIdentifier>) token;
+      OzoneClient ozoneClient =
+          OzoneClientFactory.getRpcClient(conf);
+      ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
+    }
+  }
+
+  /**
+   * Adapter to convert OzoneKey to a safe and simple Key implementation.
+   */
+  public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
+
+    private Iterator<? extends OzoneKey> original;
+
+    public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
+      this.original = listKeys;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return original.hasNext();
+    }
+
+    @Override
+    public BasicKeyInfo next() {
+      OzoneKey next = original.next();
+      if (next == null) {
+        return null;
+      } else {
+        return new BasicKeyInfo(
+            next.getName(),
+            next.getModificationTime().toEpochMilli(),
+            next.getDataSize()
+        );
+      }
+    }
+  }
+
+  private FileStatusAdapter toFileStatusAdapter(OzoneFileStatus status) {
+    return new FileStatusAdapter(
+        status.getLen(),
+        status.getPath(),
+        status.isDirectory(),
+        status.getReplication(),
+        status.getBlockSize(),
+        status.getModificationTime(),
+        status.getAccessTime(),
+        status.getPermission().toShort(),
+        status.getOwner(),
+        status.getGroup(),
+        status.getPath(),
+        getBlockLocations(status)
+    );
+  }
+
+  /**
+   * Helper method to get List of BlockLocation from OM Key info.
+   * @param fileStatus Ozone key file status.
+   * @return list of block locations.
+   */
+  private BlockLocation[] getBlockLocations(OzoneFileStatus fileStatus) {
+
+    if (fileStatus == null) {
+      return new BlockLocation[0];
+    }
+
+    OmKeyInfo keyInfo = fileStatus.getKeyInfo();
+    if (keyInfo == null || CollectionUtils.isEmpty(
+        keyInfo.getKeyLocationVersions())) {
+      return new BlockLocation[0];
+    }
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups =
+        keyInfo.getKeyLocationVersions();
+    if (CollectionUtils.isEmpty(omKeyLocationInfoGroups)) {
+      return new BlockLocation[0];
+    }
+
+    OmKeyLocationInfoGroup omKeyLocationInfoGroup =
+        keyInfo.getLatestVersionLocations();
+    BlockLocation[] blockLocations = new BlockLocation[
+        omKeyLocationInfoGroup.getBlocksLatestVersionOnly().size()];
+
+    int i = 0;
+    for (OmKeyLocationInfo omKeyLocationInfo :
+        omKeyLocationInfoGroup.getBlocksLatestVersionOnly()) {
+      List<String> hostList = new ArrayList<>();
+      List<String> nameList = new ArrayList<>();
+      omKeyLocationInfo.getPipeline().getNodes()
+          .forEach(dn -> {
+            hostList.add(dn.getHostName());
+            int port = dn.getPort(
+                DatanodeDetails.Port.Name.STANDALONE).getValue();
+            if (port == 0) {
+              port = configuredDnPort;
+            }
+            nameList.add(dn.getHostName() + ":" + port);
+          });
+
+      String[] hosts = hostList.toArray(new String[hostList.size()]);
+      String[] names = nameList.toArray(new String[nameList.size()]);
+      BlockLocation blockLocation = new BlockLocation(
+          names, hosts, omKeyLocationInfo.getOffset(),
+          omKeyLocationInfo.getLength());
+      blockLocations[i++] = blockLocation;
+    }
+    return blockLocations;
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
new file mode 100644
index 0000000..869bd63
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -0,0 +1,809 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+
+/**
+ * The minimal Ozone Filesystem implementation.
+ * <p>
+ * This is a basic version which doesn't extend
+ * KeyProviderTokenIssuer and doesn't include statistics. It can be used
+ * from older hadoop version. For newer hadoop version use the full featured
+ * BasicRootedOzoneFileSystem.
+ */
[email protected]
[email protected]
+public class BasicRootedOzoneFileSystem extends FileSystem {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicRootedOzoneFileSystem.class);
+
+  /**
+   * The Ozone client for connecting to Ozone server.
+   */
+
+  private URI uri;
+  private String userName;
+  private Path workingDir;
+  private OzoneClientAdapter adapter;
+
+  private static final String URI_EXCEPTION_TEXT =
+      "URL should be one of the following formats: " +
+      "ofs://om-service-id/path/to/key  OR " +
+      "ofs://om-host.example.com/path/to/key  OR " +
+      "ofs://om-host.example.com:5678/path/to/key";
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+    setConf(conf);
+    Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
+    Preconditions.checkArgument(getScheme().equals(name.getScheme()),
+        "Invalid scheme provided in " + name);
+
+    String authority = name.getAuthority();
+    if (authority == null) {
+      // authority is null when fs.defaultFS is not a qualified ofs URI and
+      // ofs:/// is passed to the client. matcher will NPE if authority is null
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+
+    String omHostOrServiceId;
+    int omPort = -1;
+    // Parse hostname and port
+    String[] parts = authority.split(":");
+    if (parts.length > 2) {
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+    omHostOrServiceId = parts[0];
+    if (parts.length == 2) {
+      try {
+        omPort = Integer.parseInt(parts[1]);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+      }
+    }
+
+    try {
+      uri = new URIBuilder().setScheme(OZONE_OFS_URI_SCHEME)
+          .setHost(authority)
+          .build();
+      LOG.trace("Ozone URI for OFS initialization is " + uri);
+
+      //isolated is the default for ozonefs-lib-legacy which includes the
+      // /ozonefs.txt, otherwise the default is false. It could be overridden.
+      boolean defaultValue =
+          BasicRootedOzoneFileSystem.class.getClassLoader()
+              .getResource("ozonefs.txt") != null;
+
+      //Use string here instead of the constant as constant may not be 
available
+      //on the classpath of a hadoop 2.7
+      boolean isolatedClassloader =
+          conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
+
+      // adapter should be initialized in operations.
+      this.adapter = createAdapter(
+          conf, omHostOrServiceId, omPort, isolatedClassloader);
+
+      try {
+        this.userName =
+            UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        this.userName = OZONE_DEFAULT_USER;
+      }
+      this.workingDir = new Path(OZONE_USER_DIR, this.userName)
+          .makeQualified(this.uri, this.workingDir);
+    } catch (URISyntaxException ue) {
+      final String msg = "Invalid Ozone endpoint " + name;
+      LOG.error(msg, ue);
+      throw new IOException(msg, ue);
+    }
+  }
+
+  protected OzoneClientAdapter createAdapter(Configuration conf,
+      String omHost, int omPort, boolean isolatedClassloader)
+      throws IOException {
+
+    if (isolatedClassloader) {
+      return OzoneClientAdapterFactory.createAdapter();
+    } else {
+      return new BasicRootedOzoneClientAdapterImpl(omHost, omPort, conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      adapter.close();
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public String getScheme() {
+    return OZONE_OFS_URI_SCHEME;
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    incrementCounter(Statistic.INVOCATION_OPEN);
+    statistics.incrementReadOps(1);
+    LOG.trace("open() path: {}", path);
+    final String key = pathToKey(path);
+    return new FSDataInputStream(
+        new OzoneFSInputStream(adapter.readFile(key), statistics));
+  }
+
+  protected void incrementCounter(Statistic statistic) {
+    //don't do anything in this default implementation.
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize,
+      short replication, long blockSize,
+      Progressable progress) throws IOException {
+    LOG.trace("create() path:{}", f);
+    incrementCounter(Statistic.INVOCATION_CREATE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(f);
+    return createOutputStream(key, overwrite, true);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(Path path,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(path);
+    return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), 
false);
+  }
+
+  private FSDataOutputStream createOutputStream(String key, boolean overwrite,
+      boolean recursive) throws IOException {
+    return new FSDataOutputStream(adapter.createFile(key, overwrite, 
recursive),
+        statistics);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("append() Not implemented by the "
+        + getClass().getSimpleName() + " FileSystem implementation");
+  }
+
+  private class RenameIterator extends OzoneListingIterator {
+    private final String srcPath;
+    private final String dstPath;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    RenameIterator(Path srcPath, Path dstPath)
+        throws IOException {
+      super(srcPath);
+      this.srcPath = pathToKey(srcPath);
+      this.dstPath = pathToKey(dstPath);
+      LOG.trace("rename from:{} to:{}", this.srcPath, this.dstPath);
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(srcPath);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) throws IOException {
+      String newPath = dstPath.concat(keyPath.substring(srcPath.length()));
+      adapterImpl.rename(this.bucket, keyPath, newPath);
+      return true;
+    }
+  }
+
+  /**
+   * Check whether the source and destination path are valid and then perform
+   * rename from source path to destination path.
+   * <p>
+   * The rename operation is performed by renaming the keys with src as prefix.
+   * For such keys the prefix is changed from src to dst.
+   *
+   * @param src source path for rename
+   * @param dst destination path for rename
+   * @return true if rename operation succeeded or
+   * if the src and dst have the same path and are of the same type
+   * @throws IOException on I/O errors or if the src/dst paths are invalid.
+   */
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RENAME);
+    statistics.incrementWriteOps(1);
+    if (src.equals(dst)) {
+      return true;
+    }
+
+    LOG.trace("rename() from: {} to: {}", src, dst);
+    if (src.isRoot()) {
+      // Cannot rename root of file system
+      LOG.trace("Cannot rename the root of a filesystem");
+      return false;
+    }
+
+    // src and dst should be in the same bucket
+    OFSPath ofsSrc = new OFSPath(src);
+    OFSPath ofsDst = new OFSPath(dst);
+    if (!ofsSrc.isInSameBucketAs(ofsDst)) {
+      throw new IOException("Cannot rename a key to a different bucket");
+    }
+
+    // Cannot rename a directory to its own subdirectory
+    Path dstParent = dst.getParent();
+    while (dstParent != null && !src.equals(dstParent)) {
+      dstParent = dstParent.getParent();
+    }
+    Preconditions.checkArgument(dstParent == null,
+        "Cannot rename a directory to its own subdirectory");
+    // Check if the source exists
+    FileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException fnfe) {
+      // source doesn't exist, return
+      return false;
+    }
+
+    // Check if the destination exists
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dst);
+    } catch (FileNotFoundException fnde) {
+      dstStatus = null;
+    }
+
+    if (dstStatus == null) {
+      // If dst doesn't exist, check whether dst parent dir exists or not
+      // if the parent exists, the source can still be renamed to dst path
+      dstStatus = getFileStatus(dst.getParent());
+      if (!dstStatus.isDirectory()) {
+        throw new IOException(String.format(
+            "Failed to rename %s to %s, %s is a file", src, dst,
+            dst.getParent()));
+      }
+    } else {
+      // if dst exists and source and destination are same,
+      // check both the src and dst are of same type
+      if (srcStatus.getPath().equals(dstStatus.getPath())) {
+        return !srcStatus.isDirectory();
+      } else if (dstStatus.isDirectory()) {
+        // If dst is a directory, rename source as subpath of it.
+        // for example rename /source to /dst will lead to /dst/source
+        dst = new Path(dst, src.getName());
+        FileStatus[] statuses;
+        try {
+          statuses = listStatus(dst);
+        } catch (FileNotFoundException fnde) {
+          statuses = null;
+        }
+
+        if (statuses != null && statuses.length > 0) {
+          // If dst exists and not a directory not empty
+          throw new FileAlreadyExistsException(String.format(
+              "Failed to rename %s to %s, file already exists or not empty!",
+              src, dst));
+        }
+      } else {
+        // If dst is not a directory
+        throw new FileAlreadyExistsException(String.format(
+            "Failed to rename %s to %s, file already exists!", src, dst));
+      }
+    }
+
+    if (srcStatus.isDirectory()) {
+      if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
+        LOG.trace("Cannot rename a directory to a subdirectory of self");
+        return false;
+      }
+    }
+    RenameIterator iterator = new RenameIterator(src, dst);
+    boolean result = iterator.iterate();
+    if (result) {
+      createFakeParentDirectory(src);
+    }
+    return result;
+  }
+
+  private class DeleteIterator extends OzoneListingIterator {
+    private boolean recursive;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    DeleteIterator(Path f, boolean recursive)
+        throws IOException {
+      super(f);
+      this.recursive = recursive;
+      if (getStatus().isDirectory()
+          && !this.recursive
+          && listStatus(f).length != 0) {
+        throw new PathIsNotEmptyDirectoryException(f.toString());
+      }
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(f);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) throws IOException {
+      if (keyPath.equals("")) {
+        LOG.trace("Skipping deleting root directory");
+        return true;
+      } else {
+        LOG.trace("deleting key path:" + keyPath);
+        boolean succeed = adapterImpl.deleteObject(this.bucket, keyPath);
+        // if recursive delete is requested ignore the return value of
+        // deleteObject and issue deletes for other keys.
+        return recursive || succeed;
+      }
+    }
+  }
+
+  /**
+   * Deletes the children of the input dir path by iterating though the
+   * DeleteIterator.
+   *
+   * @param f directory path to be deleted
+   * @return true if successfully deletes all required keys, false otherwise
+   * @throws IOException
+   */
+  private boolean innerDelete(Path f, boolean recursive) throws IOException {
+    LOG.trace("delete() path:{} recursive:{}", f, recursive);
+    try {
+      DeleteIterator iterator = new DeleteIterator(f, recursive);
+      return iterator.iterate();
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete {} - does not exist", f);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    incrementCounter(Statistic.INVOCATION_DELETE);
+    statistics.incrementWriteOps(1);
+    LOG.debug("Delete path {} - recursive {}", f, recursive);
+    FileStatus status;
+    try {
+      status = getFileStatus(f);
+    } catch (FileNotFoundException ex) {
+      LOG.warn("delete: Path does not exist: {}", f);
+      return false;
+    }
+
+    String key = pathToKey(f);
+    boolean result;
+
+    if (status.isDirectory()) {
+      LOG.debug("delete: Path is a directory: {}", f);
+      key = addTrailingSlashIfNeeded(key);
+
+      if (key.equals("/")) {
+        LOG.warn("Cannot delete root directory.");
+        return false;
+      }
+
+      result = innerDelete(f, recursive);
+    } else {
+      LOG.debug("delete: Path is a file: {}", f);
+      result = adapter.deleteObject(key);
+    }
+
+    if (result) {
+      // If this delete operation removes all files/directories from the
+      // parent direcotry, then an empty parent directory must be created.
+      createFakeParentDirectory(f);
+    }
+
+    return result;
+  }
+
+  /**
+   * Create a fake parent directory key if it does not already exist and no
+   * other child of this parent directory exists.
+   *
+   * @param f path to the fake parent directory
+   * @throws IOException
+   */
+  private void createFakeParentDirectory(Path f) throws IOException {
+    Path parent = f.getParent();
+    if (parent != null && !parent.isRoot()) {
+      createFakeDirectoryIfNecessary(parent);
+    }
+  }
+
+  /**
+   * Create a fake directory key if it does not already exist.
+   *
+   * @param f path to the fake directory
+   * @throws IOException
+   */
+  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+    String key = pathToKey(f);
+    if (!key.isEmpty() && !o3Exists(f)) {
+      LOG.debug("Creating new fake directory at {}", f);
+      String dirKey = addTrailingSlashIfNeeded(key);
+      adapter.createDirectory(dirKey);
+    }
+  }
+
+  /**
+   * Check if a file or directory exists corresponding to given path.
+   *
+   * @param f path to file/directory.
+   * @return true if it exists, false otherwise.
+   * @throws IOException
+   */
+  private boolean o3Exists(final Path f) throws IOException {
+    Path path = makeQualified(f);
+    try {
+      getFileStatus(path);
+      return true;
+    } catch (FileNotFoundException ex) {
+      return false;
+    }
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    incrementCounter(Statistic.INVOCATION_LIST_STATUS);
+    statistics.incrementReadOps(1);
+    LOG.trace("listStatus() path:{}", f);
+    int numEntries = LISTING_PAGE_SIZE;
+    LinkedList<FileStatus> statuses = new LinkedList<>();
+    List<FileStatus> tmpStatusList;
+    String startPath = "";
+
+    do {
+      tmpStatusList =
+          adapter.listStatus(pathToKey(f), false, startPath,
+              numEntries, uri, workingDir, getUsername())
+              .stream()
+              .map(this::convertFileStatus)
+              .collect(Collectors.toList());
+
+      if (!tmpStatusList.isEmpty()) {
+        if (startPath.isEmpty()) {
+          statuses.addAll(tmpStatusList);
+        } else {
+          statuses.addAll(tmpStatusList.subList(1, tmpStatusList.size()));
+        }
+        startPath = pathToKey(statuses.getLast().getPath());
+      }
+      // listStatus returns entries numEntries in size if available.
+      // Any lesser number of entries indicate that the required entries have
+      // exhausted.
+    } while (tmpStatusList.size() == numEntries);
+
+    return statuses.toArray(new FileStatus[0]);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    workingDir = newDir;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public Token<?> getDelegationToken(String renewer) throws IOException {
+    return adapter.getDelegationToken(renewer);
+  }
+
+  /**
+   * Get a canonical service name for this file system. If the URI is logical,
+   * the hostname part of the URI will be returned.
+   *
+   * @return a service string that uniquely identifies this file system.
+   */
+  @Override
+  public String getCanonicalServiceName() {
+    return adapter.getCanonicalServiceName();
+  }
+
+  /**
+   * Get the username of the FS.
+   *
+   * @return the short name of the user who instantiated the FS
+   */
+  public String getUsername() {
+    return userName;
+  }
+
+  /**
+   * Creates a directory. Directory is represented using a key with no value.
+   *
+   * @param path directory path to be created
+   * @return true if directory exists or created successfully.
+   * @throws IOException
+   */
+  private boolean mkdir(Path path) throws IOException {
+    return adapter.createDirectory(pathToKey(path));
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    LOG.trace("mkdir() path:{} ", f);
+    String key = pathToKey(f);
+    if (isEmpty(key)) {
+      return false;
+    }
+    return mkdir(f);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS);
+    statistics.incrementReadOps(1);
+    LOG.trace("getFileStatus() path:{}", f);
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    String key = pathToKey(qualifiedPath);
+    FileStatus fileStatus = null;
+    try {
+      fileStatus = convertFileStatus(
+          adapter.getFileStatus(key, uri, qualifiedPath, getUsername()));
+    } catch (OMException ex) {
+      if (ex.getResult().equals(OMException.ResultCodes.KEY_NOT_FOUND)) {
+        throw new FileNotFoundException("File not found. path:" + f);
+      }
+    }
+    return fileStatus;
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus fileStatus,
+      long start, long len)
+      throws IOException {
+    if (fileStatus instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) fileStatus).getBlockLocations();
+    } else {
+      return super.getFileBlockLocations(fileStatus, start, len);
+    }
+  }
+
+  /**
+   * Turn a path (relative or otherwise) into an Ozone key.
+   *
+   * @param path the path of the file.
+   * @return the key of the object that represents the file.
+   */
+  public String pathToKey(Path path) {
+    Objects.requireNonNull(path, "Path can't be null!");
+    if (!path.isAbsolute()) {
+      path = new Path(workingDir, path);
+    }
+    // removing leading '/' char
+    String key = path.toUri().getPath().substring(1);
+    LOG.trace("path for key: {} is: {}", key, path);
+    return key;
+  }
+
+  /**
+   * Add trailing delimiter to path if it is already not present.
+   *
+   * @param key the ozone Key which needs to be appended
+   * @return delimiter appended key
+   */
+  private String addTrailingSlashIfNeeded(String key) {
+    if (!isEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
+      return key + OZONE_URI_DELIMITER;
+    } else {
+      return key;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RootedOzoneFileSystem{URI=" + uri + ", "
+        + "workingDir=" + workingDir + ", "
+        + "userName=" + userName + ", "
+        + "statistics=" + statistics
+        + "}";
+  }
+
+  /**
+   * This class provides an interface to iterate through all the keys in the
+   * bucket prefixed with the input path key and process them.
+   * <p>
+   * Each implementing class should define how the keys should be processed
+   * through the processKeyPath() function.
+   */
+  private abstract class OzoneListingIterator {
+    private final Path path;
+    private final FileStatus status;
+    private String pathKey;
+    private Iterator<BasicKeyInfo> keyIterator;
+
+    OzoneListingIterator(Path path)
+        throws IOException {
+      this.path = path;
+      this.status = getFileStatus(path);
+      this.pathKey = pathToKey(path);
+      if (status.isDirectory()) {
+        this.pathKey = addTrailingSlashIfNeeded(pathKey);
+      }
+      keyIterator = adapter.listKeys(pathKey);
+    }
+
+    /**
+     * The output of processKey determines if further iteration through the
+     * keys should be done or not.
+     *
+     * @return true if we should continue iteration of keys, false otherwise.
+     * @throws IOException
+     */
+    abstract boolean processKeyPath(String keyPath) throws IOException;
+
+    /**
+     * Iterates through all the keys prefixed with the input path's key and
+     * processes the key though processKey().
+     * If for any key, the processKey() returns false, then the iteration is
+     * stopped and returned with false indicating that all the keys could not
+     * be processed successfully.
+     *
+     * @return true if all keys are processed successfully, false otherwise.
+     * @throws IOException
+     */
+    boolean iterate() throws IOException {
+      LOG.trace("Iterating path: {}", path);
+      if (status.isDirectory()) {
+        LOG.trace("Iterating directory: {}", pathKey);
+        OFSPath ofsPath = new OFSPath(pathKey);
+        String ofsPathPrefix =
+            ofsPath.getNonKeyPathNoPrefixDelim() + OZONE_URI_DELIMITER;
+        while (keyIterator.hasNext()) {
+          BasicKeyInfo key = keyIterator.next();
+          // Convert key to full path before passing it to processKeyPath
+          // TODO: This conversion is redundant. But want to use only full path
+          //  outside AdapterImpl. - Maybe a refactor later.
+          String keyPath = ofsPathPrefix + key.getName();
+          LOG.trace("iterating key path: {}", keyPath);
+          if (!processKeyPath(keyPath)) {
+            return false;
+          }
+        }
+        return true;
+      } else {
+        LOG.trace("iterating file: {}", path);
+        return processKeyPath(pathKey);
+      }
+    }
+
+    String getPathKey() {
+      return pathKey;
+    }
+
+    boolean pathIsDirectory() {
+      return status.isDirectory();
+    }
+
+    FileStatus getStatus() {
+      return status;
+    }
+  }
+
+  public OzoneClientAdapter getAdapter() {
+    return adapter;
+  }
+
+  public boolean isEmpty(CharSequence cs) {
+    return cs == null || cs.length() == 0;
+  }
+
+  public boolean isNumber(String number) {
+    try {
+      Integer.parseInt(number);
+    } catch (NumberFormatException ex) {
+      return false;
+    }
+    return true;
+  }
+
+  private FileStatus convertFileStatus(
+      FileStatusAdapter fileStatusAdapter) {
+
+    Path symLink = null;
+    try {
+      fileStatusAdapter.getSymlink();
+    } catch (Exception ex) {
+      //NOOP: If not symlink symlink remains null.
+    }
+
+    FileStatus fileStatus = new FileStatus(
+        fileStatusAdapter.getLength(),
+        fileStatusAdapter.isDir(),
+        fileStatusAdapter.getBlockReplication(),
+        fileStatusAdapter.getBlocksize(),
+        fileStatusAdapter.getModificationTime(),
+        fileStatusAdapter.getAccessTime(),
+        new FsPermission(fileStatusAdapter.getPermission()),
+        fileStatusAdapter.getOwner(),
+        fileStatusAdapter.getGroup(),
+        symLink,
+        fileStatusAdapter.getPath()
+    );
+
+    BlockLocation[] blockLocations = fileStatusAdapter.getBlockLocations();
+    if (blockLocations == null || blockLocations.length == 0) {
+      return fileStatus;
+    }
+    return new LocatedFileStatus(fileStatus, blockLocations);
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
new file mode 100644
index 0000000..748d0b4
--- /dev/null
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import java.util.StringTokenizer;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+
+/**
+ * Utility class for Rooted Ozone Filesystem (OFS) path processing.
+ */
[email protected]
[email protected]
+class OFSPath {
+  /**
+   * Here is a table illustrating what each name variable is given an input 
path
+   * Assuming /tmp is mounted to /tempVol/tempBucket
+   * (empty) = empty string "".
+   *
+   * Path                  volumeName     bucketName     mountName    keyName
+   * --------------------------------------------------------------------------
+   * /vol1/buc2/dir3/key4  vol1           buc2           (empty)      dir3/key4
+   * /vol1/buc2            vol1           buc2           (empty)      (empty)
+   * /vol1                 vol1           (empty)        (empty)      (empty)
+   * /tmp/dir3/key4        tempVolume     tempBucket     tmp          dir3/key4
+   *
+   * Note the leading '/' doesn't matter.
+   */
+  private String volumeName = "";
+  private String bucketName = "";
+  private String mountName = "";
+  private String keyName = "";
+  private static final String OFS_MOUNT_NAME_TMP = "tmp";
+
+  OFSPath(Path path) {
+    String pathStr = path.toUri().getPath();
+    initOFSPath(pathStr);
+  }
+
+  OFSPath(String pathStr) {
+    initOFSPath(pathStr);
+  }
+
+  private void initOFSPath(String pathStr) {
+    StringTokenizer token = new StringTokenizer(pathStr, OZONE_URI_DELIMITER);
+    int numToken = token.countTokens();
+    if (numToken > 0) {
+      String firstToken = token.nextToken();
+      // TODO: Compare a keyword list instead for future expansion.
+      if (firstToken.equals(OFS_MOUNT_NAME_TMP)) {
+        mountName = firstToken;
+        // TODO: Retrieve volume and bucket of the mount from user protobuf.
+        //  Leave them hard-coded just for now. Will be addressed in HDDS-2929
+        volumeName = "tempVolume";
+        bucketName = "tempBucket";
+      } else if (numToken >= 2) {
+        // Regular volume and bucket path
+        volumeName = firstToken;
+        bucketName = token.nextToken();
+      } else {
+        // Volume only
+        volumeName = firstToken;
+      }
+//    } else {  // TODO: Implement '/' case for ls.
+    }
+
+    // Compose key name
+    if (token.hasMoreTokens()) {
+      keyName = token.nextToken("").substring(1);
+    }
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  public String getMountName() {
+    return mountName;
+  }
+
+  // Shouldn't have a delimiter at beginning e.g. dir1/dir12
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Get the volume & bucket or mount name (non-key path).
+   * @return String of path excluding key in bucket.
+   */
+  // Prepend a delimiter at beginning. e.g. /vol1/buc1
+  public String getNonKeyPath() {
+    return OZONE_URI_DELIMITER + getNonKeyPathNoPrefixDelim();
+  }
+
+  // Don't prepend the delimiter. e.g. vol1/buc1
+  public String getNonKeyPathNoPrefixDelim() {
+    if (isMount()) {
+      return mountName;
+    } else {
+      return volumeName + OZONE_URI_DELIMITER + bucketName;
+    }
+  }
+
+  public boolean isMount() {
+    return mountName.length() > 0;
+  }
+
+  private static boolean isInSameBucketAsInternal(
+      OFSPath p1, OFSPath p2) {
+
+    return p1.getVolumeName().equals(p2.getVolumeName()) &&
+        p1.getBucketName().equals(p2.getBucketName());
+  }
+
+  /**
+   * Check if this OFSPath is in the same bucket as another given OFSPath.
+   * Note that mount name is resolved into volume and bucket names.
+   * @return true if in the same bucket, false otherwise.
+   */
+  public boolean isInSameBucketAs(OFSPath p2) {
+    return isInSameBucketAsInternal(this, p2);
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
index 4442c63..2ec6e5d 100644
--- 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
@@ -125,6 +125,33 @@ public final class OzoneClientAdapterFactory {
     }
   }
 
+  /**
+   * createAdapter() for OFS.
+   */
+  @SuppressFBWarnings("DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED")
+  public static OzoneClientAdapter createAdapter()
+      throws IOException {
+    return createAdapter(true,
+        (aClass) -> (RootedOzoneClientAdapter) aClass
+            .getConstructor()
+            .newInstance());
+  }
+
+  @SuppressFBWarnings("DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED")
+  public static OzoneClientAdapter createAdapter(
+      StorageStatistics storageStatistics) throws IOException {
+    return createAdapter(false,
+        (aClass) -> (RootedOzoneClientAdapter) aClass
+            .getConstructor(OzoneFSStorageStatistics.class)
+            .newInstance(storageStatistics));
+  }
+
+  @SuppressFBWarnings("DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED")
+  public static OzoneClientAdapter createAdapter(
+      boolean basic, OzoneClientAdapterCreator creator) throws IOException {
+    return createAdapter(null, null, basic, creator);
+  }
+
   private static void findConfigDirUrl(List<URL> urls,
       ClassLoader currentClassLoader) throws IOException {
     Enumeration<URL> conf =
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapter.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapter.java
new file mode 100644
index 0000000..c0c763d
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import java.io.IOException;
+
+/**
+ * Lightweight adapter to separate hadoop/ozone classes.
+ * <p>
+ * This class contains only the bare minimum Ozone classes in the signature.
+ * It could be loaded by a different classloader because only the objects in
+ * the method signatures should be shared between the classloader.
+ */
+public interface RootedOzoneClientAdapter extends OzoneClientAdapter {
+
+  // renameKey is not supported in OFS, user should use rename instead.
+  void rename(String pathStr, String newPath) throws IOException;
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapterImpl.java
new file mode 100644
index 0000000..e8ac316
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneClientAdapterImpl.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+/**
+ * Implementation of the RootedOzoneFileSystem calls.
+ */
+public class RootedOzoneClientAdapterImpl
+    extends BasicRootedOzoneClientAdapterImpl {
+
+  private OzoneFSStorageStatistics storageStatistics;
+
+  public RootedOzoneClientAdapterImpl(
+      OzoneFSStorageStatistics storageStatistics)
+      throws IOException {
+    super();
+    this.storageStatistics = storageStatistics;
+  }
+
+  public RootedOzoneClientAdapterImpl(
+      OzoneConfiguration conf, OzoneFSStorageStatistics storageStatistics)
+      throws IOException {
+    super(conf);
+    this.storageStatistics = storageStatistics;
+  }
+
+  public RootedOzoneClientAdapterImpl(String omHost, int omPort,
+      Configuration hadoopConf, OzoneFSStorageStatistics storageStatistics)
+      throws IOException {
+    super(omHost, omPort, hadoopConf);
+    this.storageStatistics = storageStatistics;
+  }
+
+  @Override
+  protected void incrementCounter(Statistic objectsRead) {
+    if (storageStatistics != null) {
+      storageStatistics.incrementCounter(objectsRead, 1);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
new file mode 100644
index 0000000..a8bd099
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.security.token.DelegationTokenIssuer;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * The Ozone Filesystem implementation.
+ * <p>
+ * This subclass is marked as private as code should not be creating it
+ * directly; use {@link FileSystem#get(Configuration)} and variants to create
+ * one. If cast to {@link RootedOzoneFileSystem}, extra methods and features
+ * may be accessed. Consider those private and unstable.
+ */
[email protected]
[email protected]
+public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
+    implements KeyProviderTokenIssuer {
+
+  private OzoneFSStorageStatistics storageStatistics;
+
+  @Override
+  public KeyProvider getKeyProvider() throws IOException {
+    return getAdapter().getKeyProvider();
+  }
+
+  @Override
+  public URI getKeyProviderUri() throws IOException {
+    return getAdapter().getKeyProviderUri();
+  }
+
+  @Override
+  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+      throws IOException {
+    KeyProvider keyProvider;
+    try {
+      keyProvider = getKeyProvider();
+    } catch (IOException ioe) {
+      LOG.debug("Error retrieving KeyProvider.", ioe);
+      return null;
+    }
+    if (keyProvider instanceof DelegationTokenIssuer) {
+      return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
+    }
+    return null;
+  }
+
+  StorageStatistics getOzoneFSOpsCountStatistics() {
+    return storageStatistics;
+  }
+
+  @Override
+  protected void incrementCounter(Statistic statistic) {
+    if (storageStatistics != null) {
+      storageStatistics.incrementCounter(statistic, 1);
+    }
+  }
+
+  @Override
+  protected OzoneClientAdapter createAdapter(Configuration conf,
+      String omHost, int omPort, boolean isolatedClassloader)
+      throws IOException {
+
+    this.storageStatistics =
+        (OzoneFSStorageStatistics) GlobalStorageStatistics.INSTANCE
+            .put(OzoneFSStorageStatistics.NAME,
+                OzoneFSStorageStatistics::new);
+
+    if (isolatedClassloader) {
+      return OzoneClientAdapterFactory.createAdapter(storageStatistics);
+    } else {
+      return new RootedOzoneClientAdapterImpl(omHost, omPort, conf,
+          storageStatistics);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
 
b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
new file mode 100644
index 0000000..ed6a069
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testing basic functions of utility class OFSPath.
+ */
+public class TestOFSPath {
+
+  @Test
+  public void testParsingVolumeBucketWithKey() {
+    // Two most common cases: file key and dir key inside a bucket
+    OFSPath ofsPath = new OFSPath("/volume1/bucket2/dir3/key4");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("bucket2", ofsPath.getBucketName());
+    Assert.assertEquals("dir3/key4", ofsPath.getKeyName());
+    Assert.assertEquals("/volume1/bucket2", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+
+    // The ending '/' matters for key inside a bucket, indicating directory
+    ofsPath = new OFSPath("/volume1/bucket2/dir3/dir5/");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("bucket2", ofsPath.getBucketName());
+    // Check the key must end with '/' (dir5 is a directory)
+    Assert.assertEquals("dir3/dir5/", ofsPath.getKeyName());
+    Assert.assertEquals("/volume1/bucket2", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+  }
+
+  @Test
+  public void testParsingVolumeBucketOnly() {
+    // Volume and bucket only
+    OFSPath ofsPath = new OFSPath("/volume1/bucket2/");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("bucket2", ofsPath.getBucketName());
+    Assert.assertEquals("", ofsPath.getMountName());
+    Assert.assertEquals("", ofsPath.getKeyName());
+    Assert.assertEquals("/volume1/bucket2", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+
+    // The ending '/' shouldn't for buckets
+    ofsPath = new OFSPath("/volume1/bucket2");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("bucket2", ofsPath.getBucketName());
+    Assert.assertEquals("", ofsPath.getMountName());
+    Assert.assertEquals("", ofsPath.getKeyName());
+    Assert.assertEquals("/volume1/bucket2", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+  }
+
+  @Test
+  public void testParsingVolumeOnly() {
+    // Volume only
+    OFSPath ofsPath = new OFSPath("/volume1/");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("", ofsPath.getBucketName());
+    Assert.assertEquals("", ofsPath.getMountName());
+    Assert.assertEquals("", ofsPath.getKeyName());
+    Assert.assertEquals("/volume1/", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+
+    // Ending '/' shouldn't matter
+    ofsPath = new OFSPath("/volume1");
+    Assert.assertEquals("volume1", ofsPath.getVolumeName());
+    Assert.assertEquals("", ofsPath.getBucketName());
+    Assert.assertEquals("", ofsPath.getMountName());
+    Assert.assertEquals("", ofsPath.getKeyName());
+    // Note: currently getNonKeyPath() returns with '/' if input is volume 
only.
+    //  There is no use case for this for now.
+    //  The behavior might change in the future.
+    Assert.assertEquals("/volume1/", ofsPath.getNonKeyPath());
+    Assert.assertFalse(ofsPath.isMount());
+  }
+
+  @Test
+  public void testParsingMount() {
+    // Mount only
+    OFSPath ofsPath = new OFSPath("/tmp/");
+    // TODO: Subject to change in HDDS-2929.
+    Assert.assertEquals("tempVolume", ofsPath.getVolumeName());
+    Assert.assertEquals("tempBucket", ofsPath.getBucketName());
+    Assert.assertEquals("tmp", ofsPath.getMountName());
+    Assert.assertEquals("", ofsPath.getKeyName());
+    Assert.assertEquals("/tmp", ofsPath.getNonKeyPath());
+    Assert.assertTrue(ofsPath.isMount());
+
+    // Mount with key
+    ofsPath = new OFSPath("/tmp/key1");
+    // TODO: Subject to change in HDDS-2929.
+    Assert.assertEquals("tempVolume", ofsPath.getVolumeName());
+    Assert.assertEquals("tempBucket", ofsPath.getBucketName());
+    Assert.assertEquals("tmp", ofsPath.getMountName());
+    Assert.assertEquals("key1", ofsPath.getKeyName());
+    Assert.assertEquals("/tmp", ofsPath.getNonKeyPath());
+    Assert.assertTrue(ofsPath.isMount());
+  }
+}
diff --git 
a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithMocks.java
 
b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithMocks.java
new file mode 100644
index 0000000..e278d0c
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithMocks.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Ozone File system tests that are light weight and use mocks.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ OzoneClientFactory.class, UserGroupInformation.class })
+@PowerMockIgnore("javax.management.*")
+public class TestRootedOzoneFileSystemWithMocks {
+
+  @Test
+  public void testFSUriWithHostPortOverrides() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    OzoneClient ozoneClient = mock(OzoneClient.class);
+    ObjectStore objectStore = mock(ObjectStore.class);
+
+    when(ozoneClient.getObjectStore()).thenReturn(objectStore);
+
+    PowerMockito.mockStatic(OzoneClientFactory.class);
+    PowerMockito.when(OzoneClientFactory.getRpcClient(eq("local.host"),
+        eq(5899), eq(conf))).thenReturn(ozoneClient);
+
+    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    PowerMockito.mockStatic(UserGroupInformation.class);
+    PowerMockito.when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+    when(ugi.getShortUserName()).thenReturn("user1");
+
+    // Note: FileSystem#loadFileSystems doesn't load OFS class because
+    //  META-INF still points to org.apache.hadoop.fs.ozone.OzoneFileSystem
+    conf.set("fs.ofs.impl", 
"org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+
+    URI uri = new URI("ofs://local.host:5899");
+
+    FileSystem fileSystem = FileSystem.get(uri, conf);
+    RootedOzoneFileSystem ofs = (RootedOzoneFileSystem) fileSystem;
+
+    assertEquals(ofs.getUri().getAuthority(), "local.host:5899");
+    PowerMockito.verifyStatic();
+    OzoneClientFactory.getRpcClient("local.host", 5899, conf);
+  }
+
+  @Test
+  public void testFSUriWithHostPortUnspecified() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    final int omPort = OmUtils.getOmRpcPort(conf);
+
+    OzoneClient ozoneClient = mock(OzoneClient.class);
+    ObjectStore objectStore = mock(ObjectStore.class);
+
+    when(ozoneClient.getObjectStore()).thenReturn(objectStore);
+
+    PowerMockito.mockStatic(OzoneClientFactory.class);
+    PowerMockito.when(OzoneClientFactory.getRpcClient(eq("local.host"),
+        eq(omPort), eq(conf))).thenReturn(ozoneClient);
+
+    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    PowerMockito.mockStatic(UserGroupInformation.class);
+    PowerMockito.when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+    when(ugi.getShortUserName()).thenReturn("user1");
+
+    // Note: FileSystem#loadFileSystems doesn't load OFS class because
+    //  META-INF still points to org.apache.hadoop.fs.ozone.OzoneFileSystem
+    conf.set("fs.ofs.impl", 
"org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+
+    URI uri = new URI("ofs://local.host");
+
+    FileSystem fileSystem = FileSystem.get(uri, conf);
+    RootedOzoneFileSystem ofs = (RootedOzoneFileSystem) fileSystem;
+
+    assertEquals(ofs.getUri().getHost(), "local.host");
+    // The URI doesn't contain a port number, expect -1 from getPort()
+    assertEquals(ofs.getUri().getPort(), -1);
+    PowerMockito.verifyStatic();
+    // Check the actual port number in use
+    OzoneClientFactory.getRpcClient("local.host", omPort, conf);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to