This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-2665-ofs
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2665-ofs by this push:
new 3eaec50 HDDS-3279. Rebase OFS branch (#731)
3eaec50 is described below
commit 3eaec50a7f1839bf6b3ef88f6904ffb272d60868
Author: Siyao Meng <[email protected]>
AuthorDate: Fri Mar 27 11:32:54 2020 -0700
HDDS-3279. Rebase OFS branch (#731)
---
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 97 ++++++++++++++++++++--
.../fs/ozone/BasicRootedOzoneFileSystem.java | 40 +++++++--
2 files changed, 122 insertions(+), 15 deletions(-)
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 64f8581..4409891 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -28,9 +28,11 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -38,6 +40,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OmUtils;
@@ -51,6 +54,9 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -89,6 +95,7 @@ public class BasicRootedOzoneClientAdapterImpl
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
private boolean securityEnabled;
+ private int configuredDnPort;
/**
* Create new OzoneClientAdapter implementation.
@@ -177,6 +184,9 @@ public class BasicRootedOzoneClientAdapterImpl
proxy = objectStore.getClientProxy();
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
+ this.configuredDnPort = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
@@ -259,6 +269,11 @@ public class BasicRootedOzoneClientAdapterImpl
}
@Override
+ public short getDefaultReplication() {
+ return (short) replicationFactor.getValue();
+ }
+
+ @Override
public void close() throws IOException {
ozoneClient.close();
}
@@ -287,16 +302,25 @@ public class BasicRootedOzoneClientAdapterImpl
}
@Override
- public OzoneFSOutputStream createFile(String pathStr, boolean overWrite,
- boolean recursive) throws IOException {
+ public OzoneFSOutputStream createFile(String pathStr, short replication,
+ boolean overWrite, boolean recursive) throws IOException {
incrementCounter(Statistic.OBJECTS_CREATED);
OFSPath ofsPath = new OFSPath(pathStr);
String key = ofsPath.getKeyName();
try {
// Hadoop CopyCommands class always sets recursive to true
OzoneBucket bucket = getBucket(ofsPath, recursive);
- OzoneOutputStream ozoneOutputStream = bucket.createFile(
- key, 0, replicationType, replicationFactor, overWrite, recursive);
+ OzoneOutputStream ozoneOutputStream = null;
+ if (replication == ReplicationFactor.ONE.getValue()
+ || replication == ReplicationFactor.THREE.getValue()) {
+ ReplicationFactor clientReplication = ReplicationFactor
+ .valueOf(replication);
+ ozoneOutputStream = bucket.createFile(key, 0, replicationType,
+ clientReplication, overWrite, recursive);
+ } else {
+ ozoneOutputStream = bucket.createFile(key, 0, replicationType,
+ replicationFactor, overWrite, recursive);
+ }
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
@@ -740,11 +764,67 @@ public class BasicRootedOzoneClientAdapterImpl
status.getPermission().toShort(),
status.getOwner(),
status.getGroup(),
- status.getPath()
+ status.getPath(),
+ getBlockLocations(status)
);
}
/**
+ * Helper method to get List of BlockLocation from OM Key info.
+ * @param fileStatus Ozone key file status.
+ * @return list of block locations.
+ */
+ private BlockLocation[] getBlockLocations(OzoneFileStatus fileStatus) {
+
+ if (fileStatus == null) {
+ return new BlockLocation[0];
+ }
+
+ OmKeyInfo keyInfo = fileStatus.getKeyInfo();
+ if (keyInfo == null || CollectionUtils.isEmpty(
+ keyInfo.getKeyLocationVersions())) {
+ return new BlockLocation[0];
+ }
+ List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups =
+ keyInfo.getKeyLocationVersions();
+ if (CollectionUtils.isEmpty(omKeyLocationInfoGroups)) {
+ return new BlockLocation[0];
+ }
+
+ OmKeyLocationInfoGroup omKeyLocationInfoGroup =
+ keyInfo.getLatestVersionLocations();
+ BlockLocation[] blockLocations = new BlockLocation[
+ omKeyLocationInfoGroup.getBlocksLatestVersionOnly().size()];
+
+ int i = 0;
+ long offsetOfBlockInFile = 0L;
+ for (OmKeyLocationInfo omKeyLocationInfo :
+ omKeyLocationInfoGroup.getBlocksLatestVersionOnly()) {
+ List<String> hostList = new ArrayList<>();
+ List<String> nameList = new ArrayList<>();
+ omKeyLocationInfo.getPipeline().getNodes()
+ .forEach(dn -> {
+ hostList.add(dn.getHostName());
+ int port = dn.getPort(
+ DatanodeDetails.Port.Name.STANDALONE).getValue();
+ if (port == 0) {
+ port = configuredDnPort;
+ }
+ nameList.add(dn.getHostName() + ":" + port);
+ });
+
+ String[] hosts = hostList.toArray(new String[hostList.size()]);
+ String[] names = nameList.toArray(new String[nameList.size()]);
+ BlockLocation blockLocation = new BlockLocation(
+ names, hosts, offsetOfBlockInFile,
+ omKeyLocationInfo.getLength());
+ offsetOfBlockInFile += omKeyLocationInfo.getLength();
+ blockLocations[i++] = blockLocation;
+ }
+ return blockLocations;
+ }
+
+ /**
* Generate a FileStatusAdapter for a volume.
* @param ozoneVolume OzoneVolume object
* @param uri Full URI to OFS root.
@@ -763,7 +843,8 @@ public class BasicRootedOzoneClientAdapterImpl
ozoneVolume.getCreationTime().getEpochSecond() * 1000, 0L,
FsPermission.getDirDefault().toShort(),
// TODO: Revisit owner and admin
- ozoneVolume.getOwner(), ozoneVolume.getAdmin(), path
+ ozoneVolume.getOwner(), ozoneVolume.getAdmin(), path,
+ new BlockLocation[0]
);
}
@@ -788,7 +869,7 @@ public class BasicRootedOzoneClientAdapterImpl
ozoneBucket.getCreationTime().getEpochSecond() * 1000, 0L,
FsPermission.getDirDefault().toShort(), // TODO: derive from ACLs later
// TODO: revisit owner and group
- username, username, path);
+ username, username, path, new BlockLocation[0]);
}
/**
@@ -803,7 +884,7 @@ public class BasicRootedOzoneClientAdapterImpl
return new FileStatusAdapter(0L, path, true, (short)0, 0L,
System.currentTimeMillis(), 0L,
FsPermission.getDirDefault().toShort(),
- null, null, null
+ null, null, null, new BlockLocation[0]
);
}
}
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 9274ffc..ff84175 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -22,12 +22,14 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -206,7 +208,7 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
incrementCounter(Statistic.INVOCATION_CREATE);
statistics.incrementWriteOps(1);
final String key = pathToKey(f);
- return createOutputStream(key, overwrite, true);
+ return createOutputStream(key, replication, overwrite, true);
}
@Override
@@ -220,13 +222,14 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
statistics.incrementWriteOps(1);
final String key = pathToKey(path);
- return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), false);
+ return createOutputStream(key,
+ replication, flags.contains(CreateFlag.OVERWRITE), false);
}
- private FSDataOutputStream createOutputStream(String key, boolean overwrite,
- boolean recursive) throws IOException {
- return new FSDataOutputStream(adapter.createFile(key, overwrite, recursive),
- statistics);
+ private FSDataOutputStream createOutputStream(String key, short replication,
+ boolean overwrite, boolean recursive) throws IOException {
+ return new FSDataOutputStream(adapter.createFile(key,
+ replication, overwrite, recursive), statistics);
}
@Override
@@ -626,6 +629,22 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
return fileStatus;
}
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus fileStatus,
+ long start, long len)
+ throws IOException {
+ if (fileStatus instanceof LocatedFileStatus) {
+ return ((LocatedFileStatus) fileStatus).getBlockLocations();
+ } else {
+ return super.getFileBlockLocations(fileStatus, start, len);
+ }
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return adapter.getDefaultReplication();
+ }
+
/**
* Turn a path (relative or otherwise) into an Ozone key.
*
@@ -773,7 +792,7 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
//NOOP: If not symlink symlink remains null.
}
- return new FileStatus(
+ FileStatus fileStatus = new FileStatus(
fileStatusAdapter.getLength(),
fileStatusAdapter.isDir(),
fileStatusAdapter.getBlockReplication(),
@@ -786,5 +805,12 @@ public class BasicRootedOzoneFileSystem extends FileSystem {
symLink,
fileStatusAdapter.getPath()
);
+
+ BlockLocation[] blockLocations = fileStatusAdapter.getBlockLocations();
+ if (blockLocations == null || blockLocations.length == 0) {
+ return fileStatus;
+ }
+ return new LocatedFileStatus(fileStatus, blockLocations);
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]