This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-2665-ofs
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2665-ofs by this push:
     new 3eaec50  HDDS-3279. Rebase OFS branch (#731)
3eaec50 is described below

commit 3eaec50a7f1839bf6b3ef88f6904ffb272d60868
Author: Siyao Meng <[email protected]>
AuthorDate: Fri Mar 27 11:32:54 2020 -0700

    HDDS-3279. Rebase OFS branch (#731)
---
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   | 97 ++++++++++++++++++++--
 .../fs/ozone/BasicRootedOzoneFileSystem.java       | 40 +++++++--
 2 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 64f8581..4409891 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -28,9 +28,11 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +40,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OmUtils;
@@ -51,6 +54,9 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -89,6 +95,7 @@ public class BasicRootedOzoneClientAdapterImpl
   private ReplicationType replicationType;
   private ReplicationFactor replicationFactor;
   private boolean securityEnabled;
+  private int configuredDnPort;
 
   /**
    * Create new OzoneClientAdapter implementation.
@@ -177,6 +184,9 @@ public class BasicRootedOzoneClientAdapterImpl
       proxy = objectStore.getClientProxy();
       this.replicationType = ReplicationType.valueOf(replicationTypeConf);
       this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
+      this.configuredDnPort = conf.getInt(
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     } finally {
       Thread.currentThread().setContextClassLoader(contextClassLoader);
     }
@@ -259,6 +269,11 @@ public class BasicRootedOzoneClientAdapterImpl
   }
 
   @Override
+  public short getDefaultReplication() {
+    return (short) replicationFactor.getValue();
+  }
+
+  @Override
   public void close() throws IOException {
     ozoneClient.close();
   }
@@ -287,16 +302,25 @@ public class BasicRootedOzoneClientAdapterImpl
   }
 
   @Override
-  public OzoneFSOutputStream createFile(String pathStr, boolean overWrite,
-      boolean recursive) throws IOException {
+  public OzoneFSOutputStream createFile(String pathStr, short replication,
+      boolean overWrite, boolean recursive) throws IOException {
     incrementCounter(Statistic.OBJECTS_CREATED);
     OFSPath ofsPath = new OFSPath(pathStr);
     String key = ofsPath.getKeyName();
     try {
       // Hadoop CopyCommands class always sets recursive to true
       OzoneBucket bucket = getBucket(ofsPath, recursive);
-      OzoneOutputStream ozoneOutputStream = bucket.createFile(
-          key, 0, replicationType, replicationFactor, overWrite, recursive);
+      OzoneOutputStream ozoneOutputStream = null;
+      if (replication == ReplicationFactor.ONE.getValue()
+          || replication == ReplicationFactor.THREE.getValue()) {
+        ReplicationFactor clientReplication = ReplicationFactor
+            .valueOf(replication);
+        ozoneOutputStream = bucket.createFile(key, 0, replicationType,
+            clientReplication, overWrite, recursive);
+      } else {
+        ozoneOutputStream = bucket.createFile(key, 0, replicationType,
+            replicationFactor, overWrite, recursive);
+      }
       return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
@@ -740,11 +764,67 @@ public class BasicRootedOzoneClientAdapterImpl
         status.getPermission().toShort(),
         status.getOwner(),
         status.getGroup(),
-        status.getPath()
+        status.getPath(),
+        getBlockLocations(status)
     );
   }
 
   /**
+   * Helper method to get List of BlockLocation from OM Key info.
+   * @param fileStatus Ozone key file status.
+   * @return list of block locations.
+   */
+  private BlockLocation[] getBlockLocations(OzoneFileStatus fileStatus) {
+
+    if (fileStatus == null) {
+      return new BlockLocation[0];
+    }
+
+    OmKeyInfo keyInfo = fileStatus.getKeyInfo();
+    if (keyInfo == null || CollectionUtils.isEmpty(
+        keyInfo.getKeyLocationVersions())) {
+      return new BlockLocation[0];
+    }
+    List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups =
+        keyInfo.getKeyLocationVersions();
+    if (CollectionUtils.isEmpty(omKeyLocationInfoGroups)) {
+      return new BlockLocation[0];
+    }
+
+    OmKeyLocationInfoGroup omKeyLocationInfoGroup =
+        keyInfo.getLatestVersionLocations();
+    BlockLocation[] blockLocations = new BlockLocation[
+        omKeyLocationInfoGroup.getBlocksLatestVersionOnly().size()];
+
+    int i = 0;
+    long offsetOfBlockInFile = 0L;
+    for (OmKeyLocationInfo omKeyLocationInfo :
+        omKeyLocationInfoGroup.getBlocksLatestVersionOnly()) {
+      List<String> hostList = new ArrayList<>();
+      List<String> nameList = new ArrayList<>();
+      omKeyLocationInfo.getPipeline().getNodes()
+          .forEach(dn -> {
+            hostList.add(dn.getHostName());
+            int port = dn.getPort(
+                DatanodeDetails.Port.Name.STANDALONE).getValue();
+            if (port == 0) {
+              port = configuredDnPort;
+            }
+            nameList.add(dn.getHostName() + ":" + port);
+          });
+
+      String[] hosts = hostList.toArray(new String[hostList.size()]);
+      String[] names = nameList.toArray(new String[nameList.size()]);
+      BlockLocation blockLocation = new BlockLocation(
+          names, hosts, offsetOfBlockInFile,
+          omKeyLocationInfo.getLength());
+      offsetOfBlockInFile += omKeyLocationInfo.getLength();
+      blockLocations[i++] = blockLocation;
+    }
+    return blockLocations;
+  }
+
+  /**
    * Generate a FileStatusAdapter for a volume.
    * @param ozoneVolume OzoneVolume object
    * @param uri Full URI to OFS root.
@@ -763,7 +843,8 @@ public class BasicRootedOzoneClientAdapterImpl
         ozoneVolume.getCreationTime().getEpochSecond() * 1000, 0L,
         FsPermission.getDirDefault().toShort(),
         // TODO: Revisit owner and admin
-        ozoneVolume.getOwner(), ozoneVolume.getAdmin(), path
+        ozoneVolume.getOwner(), ozoneVolume.getAdmin(), path,
+        new BlockLocation[0]
     );
   }
 
@@ -788,7 +869,7 @@ public class BasicRootedOzoneClientAdapterImpl
         ozoneBucket.getCreationTime().getEpochSecond() * 1000, 0L,
         FsPermission.getDirDefault().toShort(),  // TODO: derive from ACLs later
         // TODO: revisit owner and group
-        username, username, path);
+        username, username, path, new BlockLocation[0]);
   }
 
   /**
@@ -803,7 +884,7 @@ public class BasicRootedOzoneClientAdapterImpl
     return new FileStatusAdapter(0L, path, true, (short)0, 0L,
         System.currentTimeMillis(), 0L,
         FsPermission.getDirDefault().toShort(),
-        null, null, null
+        null, null, null, new BlockLocation[0]
     );
   }
 }
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 9274ffc..ff84175 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -22,12 +22,14 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -206,7 +208,7 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     incrementCounter(Statistic.INVOCATION_CREATE);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(f);
-    return createOutputStream(key, overwrite, true);
+    return createOutputStream(key, replication, overwrite, true);
   }
 
   @Override
@@ -220,13 +222,14 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(path);
-    return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), false);
+    return createOutputStream(key,
+        replication, flags.contains(CreateFlag.OVERWRITE), false);
   }
 
-  private FSDataOutputStream createOutputStream(String key, boolean overwrite,
-      boolean recursive) throws IOException {
-    return new FSDataOutputStream(adapter.createFile(key, overwrite, recursive),
-        statistics);
+  private FSDataOutputStream createOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive) throws IOException {
+    return new FSDataOutputStream(adapter.createFile(key,
+        replication, overwrite, recursive), statistics);
   }
 
   @Override
@@ -626,6 +629,22 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
     return fileStatus;
   }
 
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus fileStatus,
+      long start, long len)
+      throws IOException {
+    if (fileStatus instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) fileStatus).getBlockLocations();
+    } else {
+      return super.getFileBlockLocations(fileStatus, start, len);
+    }
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return adapter.getDefaultReplication();
+  }
+
   /**
    * Turn a path (relative or otherwise) into an Ozone key.
    *
@@ -773,7 +792,7 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
       //NOOP: If not symlink symlink remains null.
     }
 
-    return new FileStatus(
+    FileStatus fileStatus = new FileStatus(
         fileStatusAdapter.getLength(),
         fileStatusAdapter.isDir(),
         fileStatusAdapter.getBlockReplication(),
@@ -786,5 +805,12 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
         symLink,
         fileStatusAdapter.getPath()
     );
+
+    BlockLocation[] blockLocations = fileStatusAdapter.getBlockLocations();
+    if (blockLocations == null || blockLocations.length == 0) {
+      return fileStatus;
+    }
+    return new LocatedFileStatus(fileStatus, blockLocations);
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to