smengcl commented on a change in pull request #1021: URL: https://github.com/apache/hadoop-ozone/pull/1021#discussion_r437760431
########## File path: hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java ##########
@@ -0,0 +1,904 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+
+/**
+ * The minimal Ozone Filesystem implementation.
+ * <p>
+ * This is a basic version which doesn't extend
+ * KeyProviderTokenIssuer and doesn't include statistics. It can be used
+ * from older hadoop versions. For newer hadoop versions use the full featured
+ * RootedOzoneFileSystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BasicRootedOzoneFileSystem extends FileSystem {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicRootedOzoneFileSystem.class);
+
+  /**
+   * The Ozone client for connecting to Ozone server.
+   */
+
+  private URI uri;
+  private String userName;
+  private Path workingDir;
+  private OzoneClientAdapter adapter;
+  private BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+  private static final String URI_EXCEPTION_TEXT =
+      "URL should be one of the following formats: " +
+      "ofs://om-service-id/path/to/key OR " +
+      "ofs://om-host.example.com/path/to/key OR " +
+      "ofs://om-host.example.com:5678/path/to/key";
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+    setConf(conf);
+    Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
+    Preconditions.checkArgument(getScheme().equals(name.getScheme()),
+        "Invalid scheme provided in " + name);
+
+    String authority = name.getAuthority();
+    if (authority == null) {
+      // authority is null when fs.defaultFS is not a qualified ofs URI and
+      // ofs:/// is passed to the client. matcher will NPE if authority is null
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+
+    String omHostOrServiceId;
+    int omPort = -1;
+    // Parse hostname and port
+    String[] parts = authority.split(":");
+    if (parts.length > 2) {
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+    omHostOrServiceId = parts[0];
+    if (parts.length == 2) {
+      try {
+        omPort = Integer.parseInt(parts[1]);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+      }
+    }
+
+    try {
+      uri = new URIBuilder().setScheme(OZONE_OFS_URI_SCHEME)
+          .setHost(authority)
+          .build();
+      LOG.trace("Ozone URI for OFS initialization is " + uri);
+
+      //isolated is the default for ozonefs-lib-legacy which includes the
+      // /ozonefs.txt, otherwise the default is false. It could be overridden.
+      boolean defaultValue =
+          BasicRootedOzoneFileSystem.class.getClassLoader()
+              .getResource("ozonefs.txt") != null;
+
+      //Use string here instead of the constant as constant may not be available
+      //on the classpath of a hadoop 2.7
+      boolean isolatedClassloader =
+          conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
+
+      ConfigurationSource source;
+      if (conf instanceof OzoneConfiguration) {
+        source = (ConfigurationSource) conf;
+      } else {
+        source = new LegacyHadoopConfigurationSource(conf);
+      }
+      this.adapter =
+          createAdapter(source,
+              omHostOrServiceId, omPort,
+              isolatedClassloader);
+      this.adapterImpl = (BasicRootedOzoneClientAdapterImpl) this.adapter;
+
+      try {
+        this.userName =
+            UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        this.userName = OZONE_DEFAULT_USER;
+      }
+      this.workingDir = new Path(OZONE_USER_DIR, this.userName)
+          .makeQualified(this.uri, this.workingDir);
+    } catch (URISyntaxException ue) {
+      final String msg = "Invalid Ozone endpoint " + name;
+      LOG.error(msg, ue);
+      throw new IOException(msg, ue);
+    }
+  }
+
+  protected OzoneClientAdapter createAdapter(ConfigurationSource conf,
+      String omHost, int omPort, boolean isolatedClassloader)
+      throws IOException {
+
+    if (isolatedClassloader) {
+      return OzoneClientAdapterFactory.createAdapter();
+    } else {
+      return new BasicRootedOzoneClientAdapterImpl(omHost, omPort, conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      adapter.close();
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public String getScheme() {
+    return OZONE_OFS_URI_SCHEME;
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    incrementCounter(Statistic.INVOCATION_OPEN);
+    statistics.incrementReadOps(1);
+    LOG.trace("open() path: {}", path);
+    final String key = pathToKey(path);
+    return new FSDataInputStream(
+        new OzoneFSInputStream(adapter.readFile(key), statistics));
+  }
+
+  protected void incrementCounter(Statistic statistic) {
+    //don't do anything in this default implementation.
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize,
+      short replication, long blockSize,
+      Progressable progress) throws IOException {
+    LOG.trace("create() path:{}", f);
+    incrementCounter(Statistic.INVOCATION_CREATE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(f);
+    return createOutputStream(key, replication, overwrite, true);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(Path path,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(path);
+    return createOutputStream(key,
+        replication, flags.contains(CreateFlag.OVERWRITE), false);
+  }
+
+  private FSDataOutputStream createOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive) throws IOException {
+    return new FSDataOutputStream(adapter.createFile(key,
+        replication, overwrite, recursive), statistics);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("append() Not implemented by the "
+        + getClass().getSimpleName() + " FileSystem implementation");
+  }
+
+  private class RenameIterator extends OzoneListingIterator {
+    private final String srcPath;
+    private final String dstPath;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    RenameIterator(Path srcPath, Path dstPath)
+        throws IOException {
+      super(srcPath);
+      this.srcPath = pathToKey(srcPath);
+      this.dstPath = pathToKey(dstPath);
+      LOG.trace("rename from:{} to:{}", this.srcPath, this.dstPath);
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(srcPath);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) throws IOException {
+      String newPath = dstPath.concat(keyPath.substring(srcPath.length()));
+      adapterImpl.rename(this.bucket, keyPath, newPath);
+      return true;
+    }
+  }
+
+  /**
+   * Check whether the source and destination path are valid and then perform
+   * rename from source path to destination path.
+   * <p>
+   * The rename operation is performed by renaming the keys with src as prefix.
+   * For such keys the prefix is changed from src to dst.
+   *
+   * @param src source path for rename
+   * @param dst destination path for rename
+   * @return true if rename operation succeeded or
+   * if the src and dst have the same path and are of the same type
+   * @throws IOException on I/O errors or if the src/dst paths are invalid.
+   */
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RENAME);
+    statistics.incrementWriteOps(1);
+    if (src.equals(dst)) {
+      return true;
+    }
+
+    LOG.trace("rename() from: {} to: {}", src, dst);
+    if (src.isRoot()) {
+      // Cannot rename root of file system
+      LOG.trace("Cannot rename the root of a filesystem");
+      return false;
+    }
+
+    // src and dst should be in the same bucket
+    OFSPath ofsSrc = new OFSPath(src);
+    OFSPath ofsDst = new OFSPath(dst);
+    if (!ofsSrc.isInSameBucketAs(ofsDst)) {
+      throw new IOException("Cannot rename a key to a different bucket");
+    }
+
+    // Cannot rename a directory to its own subdirectory
+    Path dstParent = dst.getParent();
+    while (dstParent != null && !src.equals(dstParent)) {
+      dstParent = dstParent.getParent();
+    }
+    Preconditions.checkArgument(dstParent == null,
+        "Cannot rename a directory to its own subdirectory");
+    // Check if the source exists
+    FileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException fnfe) {
+      // source doesn't exist, return
+      return false;
+    }
+
+    // Check if the destination exists
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dst);
+    } catch (FileNotFoundException fnde) {
+      dstStatus = null;
+    }
+
+    if (dstStatus == null) {
+      // If dst doesn't exist, check whether dst parent dir exists or not
+      // if the parent exists, the source can still be renamed to dst path
+      dstStatus = getFileStatus(dst.getParent());
+      if (!dstStatus.isDirectory()) {
+        throw new IOException(String.format(
+            "Failed to rename %s to %s, %s is a file", src, dst,
+            dst.getParent()));
+      }
+    } else {
+      // if dst exists and source and destination are same,
+      // check both the src and dst are of same type
+      if (srcStatus.getPath().equals(dstStatus.getPath())) {
+        return !srcStatus.isDirectory();
+      } else if (dstStatus.isDirectory()) {
+        // If dst is a directory, rename source as subpath of it.
+        // for example rename /source to /dst will lead to /dst/source
+        dst = new Path(dst, src.getName());
+        FileStatus[] statuses;
+        try {
+          statuses = listStatus(dst);
+        } catch (FileNotFoundException fnde) {
+          statuses = null;
+        }
+
+        if (statuses != null && statuses.length > 0) {
+          // If dst exists and not a directory not empty
+          throw new FileAlreadyExistsException(String.format(
+              "Failed to rename %s to %s, file already exists or not empty!",
+              src, dst));
+        }
+      } else {
+        // If dst is not a directory
+        throw new FileAlreadyExistsException(String.format(
+            "Failed to rename %s to %s, file already exists!", src, dst));
+      }
+    }
+
+    if (srcStatus.isDirectory()) {
+      if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
+        LOG.trace("Cannot rename a directory to a subdirectory of self");
+        return false;
+      }
+    }
+    RenameIterator iterator = new RenameIterator(src, dst);
+    boolean result = iterator.iterate();
+    if (result) {
+      createFakeParentDirectory(src);
+    }
+    return result;
+  }
+
+  private class DeleteIterator extends OzoneListingIterator {
+    final private boolean recursive;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    DeleteIterator(Path f, boolean recursive)
+        throws IOException {
+      super(f);
+      this.recursive = recursive;
+      if (getStatus().isDirectory()
+          && !this.recursive
+          && listStatus(f).length != 0) {
+        throw new PathIsNotEmptyDirectoryException(f.toString());
+      }
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(f);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) {
+      if (keyPath.equals("")) {
+        LOG.trace("Skipping deleting root directory");
+        return true;
+      } else {
+        LOG.trace("Deleting: {}", keyPath);
+        boolean succeed = adapterImpl.deleteObject(this.bucket, keyPath);
+        // if recursive delete is requested ignore the return value of
+        // deleteObject and issue deletes for other keys.
+        return recursive || succeed;
+      }
+    }
+  }
+
+  /**
+   * Deletes the children of the input dir path by iterating through the
+   * DeleteIterator.
+   *
+   * @param f directory path to be deleted
+   * @return true if successfully deletes all required keys, false otherwise
+   * @throws IOException
+   */
+  private boolean innerDelete(Path f, boolean recursive) throws IOException {
+    LOG.trace("delete() path:{} recursive:{}", f, recursive);
+    try {
+      DeleteIterator iterator = new DeleteIterator(f, recursive);
+      return iterator.iterate();
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete {} - does not exist", f);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    incrementCounter(Statistic.INVOCATION_DELETE);
+    statistics.incrementWriteOps(1);
+    LOG.debug("Delete path {} - recursive {}", f, recursive);
+    FileStatus status;
+    try {
+      status = getFileStatus(f);
+    } catch (FileNotFoundException ex) {
+      LOG.warn("delete: Path does not exist: {}", f);
+      return false;
+    }
+
+    if (status == null) {
+      return false;
+    }
+
+    String key = pathToKey(f);
+    boolean result;
+
+    if (status.isDirectory()) {
+      LOG.debug("delete: Path is a directory: {}", f);
+      OFSPath ofsPath = new OFSPath(key);
+
+      // Handle rm root
+      if (ofsPath.isRoot()) {
+        // Intentionally drop support for rm root
+        // because it is too dangerous and doesn't provide much value
+        LOG.warn("delete: OFS does not support rm root. " +
+            "To wipe the cluster, please re-init OM instead.");
+        return false;
+      }
+
+      // Handle delete volume
+      if (ofsPath.isVolume()) {
+        String volumeName = ofsPath.getVolumeName();
+        if (recursive) {
+          // Delete all buckets first
+          OzoneVolume volume =
+              adapterImpl.getObjectStore().getVolume(volumeName);

Review comment:
   Yes, I agree. When implementing OFS volume and bucket deletion I realized I can't put the recursion logic entirely in the adapter. Hence the hacky approach here.
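   To illustrate the point above: a minimal, hypothetical sketch of how the recursive volume delete could be driven from the FileSystem layer, since the adapter only exposes per-bucket operations. The helper name `deleteVolumeRecursively` is illustrative (not part of this patch); the sketch assumes the `adapterImpl` field and `innerDelete()` helper shown in the diff, plus the stock Ozone client calls (`ObjectStore#getVolume`, `OzoneVolume#listBuckets`/`#deleteBucket`, `ObjectStore#deleteVolume`):

   ```java
   // Hypothetical helper, not part of this patch: drives the recursion at
   // the FileSystem layer because the adapter works one bucket at a time.
   private boolean deleteVolumeRecursively(String volumeName)
       throws IOException {
     OzoneVolume volume = adapterImpl.getObjectStore().getVolume(volumeName);
     // A volume can only be removed once it is empty, so clear out every
     // bucket first (null prefix lists all buckets).
     Iterator<? extends OzoneBucket> bucketIterator = volume.listBuckets(null);
     while (bucketIterator.hasNext()) {
       OzoneBucket bucket = bucketIterator.next();
       // Delete all keys inside the bucket via the existing DeleteIterator
       // machinery, then drop the now-empty bucket itself.
       Path bucketPath =
           new Path(OZONE_URI_DELIMITER + volumeName, bucket.getName());
       if (!innerDelete(bucketPath, true)) {
         return false;
       }
       volume.deleteBucket(bucket.getName());
     }
     // Every bucket is gone; the volume itself can now be deleted.
     adapterImpl.getObjectStore().deleteVolume(volumeName);
     return true;
   }
   ```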
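   Separately, for context on the rooted `ofs` scheme that `initialize()` above parses: a hedged usage sketch through the standard Hadoop FileSystem API. Host, port, volume, and bucket names are illustrative, and it assumes the ozone filesystem jar is on the classpath so the `ofs` scheme resolves to this class:

   ```java
   import java.net.URI;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   public class OfsUsageExample {
     public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       // The authority is an OM service id or host[:port], matching the
       // formats in URI_EXCEPTION_TEXT; 9862 is the usual OM RPC port.
       URI uri = URI.create("ofs://om-host.example.com:9862/");
       FileSystem fs = FileSystem.get(uri, conf);
       // OFS paths are rooted: /<volume>/<bucket>/<key>
       fs.mkdirs(new Path("/volume1/bucket1/dir1"));
       System.out.println(fs.getFileStatus(new Path("/volume1/bucket1/dir1")));
       fs.close();
     }
   }
   ```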
