/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.hadoop.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
 * <p>
 * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}.
 */
public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware,
    HadoopPayloadAware {
    /** The default user name. It is used if no user context is set. */
    private String dfltUsrName;

    /** Factory. */
    private HadoopFileSystemFactory fsFactory;

    /**
     * Default constructor for Spring.
     */
    public IgniteHadoopIgfsSecondaryFileSystem() {
        // No-op.
    }

    /**
     * Simple constructor that is to be used by default.
     *
     * @param uri URI of file system.
     * @throws IgniteCheckedException In case of error.
     * @deprecated Use {@link #getFileSystemFactory()} instead.
     */
    @Deprecated
    public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException {
        this(uri, null, null);
    }

    /**
     * Constructor.
     *
     * @param uri URI of file system.
     * @param cfgPath Additional path to Hadoop configuration.
     * @throws IgniteCheckedException In case of error.
     * @deprecated Use {@link #getFileSystemFactory()} instead.
     */
    @Deprecated
    public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath)
        throws IgniteCheckedException {
        this(uri, cfgPath, null);
    }

    /**
     * Constructor.
     *
     * @param uri URI of file system.
     * @param cfgPath Additional path to Hadoop configuration.
     * @param userName User name.
     * @throws IgniteCheckedException In case of error.
     * @deprecated Use {@link #getFileSystemFactory()} instead.
     */
    @Deprecated
    public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
        @Nullable String userName) throws IgniteCheckedException {
        setDefaultUserName(userName);

        CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();

        fac.setUri(uri);

        if (cfgPath != null)
            fac.setConfigPaths(cfgPath);

        setFileSystemFactory(fac);
    }

    /**
     * Gets default user name.
     * <p>
     * Defines user name which will be used during file system invocation in case no user name is defined explicitly
     * through {@link FileSystem#get(URI, Configuration, String)}.
     * <p>
     * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name
     * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
     * {@link IgfsUserContext#doAs(String, Callable)} methods.
     * <p>
     * If not set value of system property {@code "user.name"} will be used. If this property is not set either,
     * {@code "anonymous"} will be used.
     *
     * @return Default user name.
     */
    @Nullable public String getDefaultUserName() {
        return dfltUsrName;
    }

    /**
     * Sets default user name. See {@link #getDefaultUserName()} for details.
     *
     * @param dfltUsrName Default user name.
     */
    public void setDefaultUserName(@Nullable String dfltUsrName) {
        this.dfltUsrName = dfltUsrName;
    }

    /**
     * Gets secondary file system factory.
     * <p>
     * This factory will be used whenever a call to a target {@link FileSystem} is required.
     * <p>
     * If not set, {@link CachingHadoopFileSystemFactory} will be used.
     *
     * @return Secondary file system factory.
     */
    public HadoopFileSystemFactory getFileSystemFactory() {
        return fsFactory;
    }

    /**
     * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details.
     *
     * @param factory Secondary file system factory.
     */
    public void setFileSystemFactory(HadoopFileSystemFactory factory) {
        this.fsFactory = factory;
    }

    /**
     * Convert IGFS path into Hadoop path, using the scheme and authority of the current user's file system URI.
     *
     * @param path IGFS path.
     * @return Hadoop path.
     */
    private Path convert(IgfsPath path) {
        URI uri = fileSystemForUser().getUri();

        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
    }

    /**
     * Converts an I/O error raised by the secondary file system into the matching IGFS exception.
     * Currently this simply delegates to {@link #cast(String, IOException)}.
     *
     * @param e Exception to convert.
     * @param detailMsg Detailed error message.
     * @return Appropriate exception.
     */
    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
        return cast(detailMsg, e);
    }

    /**
     * Cast IO exception to IGFS exception.
     * <p>
     * Note that {@code msg} is not propagated for {@link FileNotFoundException} and
     * {@link PathIsNotEmptyDirectoryException}: those are wrapped with the cause only.
     *
     * @param msg Error message.
     * @param e IO exception.
     * @return IGFS exception.
     */
    public static IgfsException cast(String msg, IOException e) {
        if (e instanceof FileNotFoundException)
            return new IgfsPathNotFoundException(e);
        else if (e instanceof ParentNotDirectoryException)
            return new IgfsParentNotDirectoryException(msg, e);
        else if (e instanceof PathIsNotEmptyDirectoryException)
            return new IgfsDirectoryNotEmptyException(e);
        else if (e instanceof PathExistsException)
            return new IgfsPathAlreadyExistsException(msg, e);
        else
            return new IgfsException(msg, e);
    }

    /**
     * Convert Hadoop FileStatus properties to map.
     *
     * @param status File status.
     * @return IGFS attributes: permission (4-digit octal), owner and group.
     */
    private static Map<String, String> properties(FileStatus status) {
        FsPermission perm = status.getPermission();

        if (perm == null)
            perm = FsPermission.getDefault();

        HashMap<String, String> res = new HashMap<>(3);

        res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
        res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
        res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());

        return res;
    }

    /** {@inheritDoc} */
    @Override public boolean exists(IgfsPath path) {
        try {
            return fileSystemForUser().exists(convert(path));
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
        HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);

        final FileSystem fileSys = fileSystemForUser();

        try {
            if (props0.userName() != null || props0.groupName() != null)
                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());

            if (props0.permission() != null)
                fileSys.setPermission(convert(path), props0.permission());
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
        }

        // Result is not used in case of secondary FS.
        return null;
    }

    /** {@inheritDoc} */
    @Override public void rename(IgfsPath src, IgfsPath dest) {
        // Delegate to the secondary file system.
        try {
            if (!fileSystemForUser().rename(convert(src), convert(dest)))
                throw new IgfsException("Failed to rename (secondary file system returned false) " +
                    "[src=" + src + ", dest=" + dest + ']');
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
        }
    }

    /** {@inheritDoc} */
    @Override public boolean delete(IgfsPath path, boolean recursive) {
        try {
            return fileSystemForUser().delete(convert(path), recursive);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path) {
        try {
            if (!fileSystemForUser().mkdirs(convert(path)))
                throw new IgniteException("Failed to make directories [path=" + path + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
        // 'props' is declared @Nullable; substitute an empty map exactly as create(...) does, so that
        // a null argument is never handed to HadoopIgfsProperties.
        HadoopIgfsProperties props0 =
            new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());

        try {
            if (!fileSystemForUser().mkdirs(convert(path), props0.permission()))
                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
        try {
            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));

            if (statuses == null)
                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);

            Collection<IgfsPath> res = new ArrayList<>(statuses.length);

            for (FileStatus status : statuses)
                res.add(new IgfsPath(path, status.getPath().getName()));

            return res;
        }
        catch (FileNotFoundException ignored) {
            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
        try {
            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));

            if (statuses == null)
                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);

            Collection<IgfsFile> res = new ArrayList<>(statuses.length);

            for (FileStatus s : statuses) {
                IgfsEntryInfo fsInfo = s.isDirectory() ?
                    IgfsUtils.createDirectory(
                        IgniteUuid.randomUuid(),
                        null,
                        properties(s),
                        s.getAccessTime(),
                        s.getModificationTime()
                    ) :
                    IgfsUtils.createFile(
                        IgniteUuid.randomUuid(),
                        (int)s.getBlockSize(),
                        s.getLen(),
                        null,
                        null,
                        false,
                        properties(s),
                        s.getAccessTime(),
                        s.getModificationTime()
                    );

                res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
            }

            return res;
        }
        catch (FileNotFoundException ignored) {
            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
        }
    }

    /** {@inheritDoc} */
    @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
        try {
            return fileSystemForUser().create(convert(path), overwrite);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
        long blockSize, @Nullable Map<String, String> props) {
        HadoopIgfsProperties props0 =
            new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());

        try {
            return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
                (short) replication, blockSize, null);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
                ", blockSize=" + blockSize + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
        @Nullable Map<String, String> props) {
        // Note: 'create' and 'props' are not forwarded to the Hadoop file system.
        try {
            return fileSystemForUser().append(convert(path), bufSize);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public IgfsFile info(final IgfsPath path) {
        try {
            final FileStatus status = fileSystemForUser().getFileStatus(convert(path));

            if (status == null)
                return null;

            final Map<String, String> props = properties(status);

            return new IgfsFile() {
                @Override public IgfsPath path() {
                    return path;
                }

                @Override public boolean isFile() {
                    return status.isFile();
                }

                @Override public boolean isDirectory() {
                    return status.isDirectory();
                }

                @Override public int blockSize() {
                    // By convention directory has blockSize == 0, while file has blockSize > 0:
                    return isDirectory() ? 0 : (int)status.getBlockSize();
                }

                @Override public long groupBlockSize() {
                    return status.getBlockSize();
                }

                @Override public long accessTime() {
                    return status.getAccessTime();
                }

                @Override public long modificationTime() {
                    return status.getModificationTime();
                }

                @Override public String property(String name) throws IllegalArgumentException {
                    String val = props.get(name);

                    if (val == null)
                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');

                    return val;
                }

                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
                    String val = props.get(name);

                    return val == null ? dfltVal : val;
                }

                @Override public long length() {
                    return status.getLen();
                }

                /** {@inheritDoc} */
                @Override public Map<String, String> properties() {
                    return props;
                }
            };
        }
        catch (FileNotFoundException ignore) {
            // A missing path is reported as "no info" rather than as an error.
            return null;
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public long usedSpaceSize() {
        try {
            // We don't use FileSystem#getUsed() since it counts only the files
            // in the filesystem root, not all the files recursively.
            return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
        }
    }

    /** {@inheritDoc} */
    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
        try {
            // Hadoop's FileSystem#setTimes expects (path, mtime, atime), i.e. the reverse of this
            // method's (accessTime, modificationTime) parameter order.
            fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed set times for path: " + path);
        }
    }

    /**
     * Gets the underlying {@link FileSystem}.
     * This method is used solely for testing.
     * @return the underlying Hadoop {@link FileSystem}.
     */
    public FileSystem fileSystem() {
        return fileSystemForUser();
    }

    /**
     * Gets the FileSystem for the current context user. If no user context is set,
     * the (fixed) default user name is used.
     *
     * @return the FileSystem instance, never null.
     */
    private FileSystem fileSystemForUser() {
        String user = IgfsUserContext.currentUser();

        if (F.isEmpty(user))
            user = IgfsUtils.fixUserName(dfltUsrName);

        assert !F.isEmpty(user);

        try {
            return fsFactory.get(user);
        }
        catch (IOException ioe) {
            throw new IgniteException(ioe);
        }
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteException {
        dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);

        if (fsFactory == null)
            fsFactory = new CachingHadoopFileSystemFactory();

        if (fsFactory instanceof LifecycleAware)
            ((LifecycleAware) fsFactory).start();
    }

    /** {@inheritDoc} */
    @Override public void stop() throws IgniteException {
        if (fsFactory instanceof LifecycleAware)
            ((LifecycleAware)fsFactory).stop();
    }

    /** {@inheritDoc} */
    @Override public HadoopFileSystemFactory getPayload() {
        return fsFactory;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.hadoop.fs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.security.PrivilegedExceptionAction;

/**
 * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
 * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
 * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
 * The principal and the key tab name to be used for Kerberos authentication are set explicitly
 * in the factory configuration.
 *
 * <p>This factory does not cache any file system instances itself. Unless
 * {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop.
 */
public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
    /** */
    private static final long serialVersionUID = 0L;

    /** The default interval used to re-login from the key tab, in milliseconds. */
    public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;

    /** Keytab full file name. */
    private String keyTab;

    /** Keytab principal. */
    private String keyTabPrincipal;

    /** The re-login interval. See {@link #getReloginInterval()} for more information. */
    private long reloginInterval = DFLT_RELOGIN_INTERVAL;

    /** Time of last re-login attempt, in system milliseconds. */
    private transient volatile long lastReloginTime;

    /**
     * Constructor.
     */
    public KerberosHadoopFileSystemFactory() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public FileSystem getWithMappedName(String name) throws IOException {
        reloginIfNeeded();

        return super.getWithMappedName(name);
    }

    /** {@inheritDoc} */
    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
        // Impersonate the requested user on top of the keytab-authenticated login user.
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
            UserGroupInformation.getLoginUser());

        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            @Override public FileSystem run() throws Exception {
                return FileSystem.get(fullUri, cfg);
            }
        });
    }

    /**
     * Gets the key tab principal short name (e.g. "hdfs").
     *
     * @return The key tab principal.
     */
    @Nullable public String getKeyTabPrincipal() {
        return keyTabPrincipal;
    }

    /**
     * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
     *
     * @param keyTabPrincipal The key tab principal name.
     */
    public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
        this.keyTabPrincipal = keyTabPrincipal;
    }

    /**
     * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
     * <p>
     * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
     * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
     *
     * @return The key tab file name.
     */
    @Nullable public String getKeyTab() {
        return keyTab;
    }

    /**
     * Sets the key tab file name. See {@link #getKeyTab()} for more information.
     *
     * @param keyTab The key tab file name.
     */
    public void setKeyTab(@Nullable String keyTab) {
        this.keyTab = keyTab;
    }

    /**
     * The interval used to re-login from the key tab, in milliseconds.
     * It is important that the value is not larger than the Kerberos ticket life time multiplied by 0.2. This is
     * because the ticket renew window starts from {@code 0.8 * ticket life time}.
     * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
     * obeys this rule well.
     *
     * <p>Zero value means that re-login should be attempted on each file system operation.
     * Negative values are not allowed.
     *
     * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
     * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
     * have passed since the time of the previous login.
     * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
     * more detail.
     *
     * @return The re-login interval, in milliseconds.
     */
    public long getReloginInterval() {
        return reloginInterval;
    }

    /**
     * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
     *
     * @param reloginInterval The re-login interval, in milliseconds.
     */
    public void setReloginInterval(long reloginInterval) {
        this.reloginInterval = reloginInterval;
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteException {
        A.ensure(!F.isEmpty(keyTab), "keyTab cannot be empty.");
        A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot be empty.");
        A.ensure(reloginInterval >= 0, "reloginInterval cannot be negative.");

        super.start();

        try {
            UserGroupInformation.setConfiguration(cfg);
            UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
        }
        catch (IOException ioe) {
            throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
                ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
        }
    }

    /**
     * Re-logins the user if needed.
     * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
     * frequent than one attempt per {@code reloginInterval}.
     * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method is invoked that gets existing
     * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
     *
     * <p>This method is invoked from {@link #getWithMappedName(String)}, so every file system lookup made
     * through the factory triggers the check and there is no need to invoke it separately.
     *
     * @throws IOException If login fails.
     */
    private void reloginIfNeeded() throws IOException {
        long now = System.currentTimeMillis();

        // Read the volatile field once so the check uses a single consistent value.
        long last = lastReloginTime;

        // Compare the elapsed time rather than 'last + reloginInterval': the sum overflows
        // for very large intervals, which would force a re-login on every call.
        if (now - last >= reloginInterval) {
            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();

            lastReloginTime = now;
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);

        U.writeString(out, keyTab);
        U.writeString(out, keyTabPrincipal);
        out.writeLong(reloginInterval);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);

        // Must mirror the field order used in writeExternal.
        keyTab = U.readString(in);
        keyTabPrincipal = U.readString(in);
        reloginInterval = in.readLong();
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Ignite Hadoop Accelerator file system API. + */ +package org.apache.ignite.hadoop.fs; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java new file mode 100644 index 0000000..a06129e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -0,0 +1,1364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs.v1; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; +import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; +import org.apache.ignite.internal.processors.igfs.IgfsPaths; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; +import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; +import static org.apache.ignite.igfs.IgfsMode.PROXY; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; +import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME; + +/** + * {@code IGFS} Hadoop 1.x file system driver over file system API. To use + * {@code IGFS} as Hadoop file system, you should configure this class + * in Hadoop's {@code core-site.xml} as follows: + * <pre name="code" class="xml"> + * <property> + * <name>fs.default.name</name> + * <value>igfs:///</value> + * </property> + * + * <property> + * <name>fs.igfs.impl</name> + * <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value> + * </property> + * </pre> + * You should also add Ignite JAR and all libraries to Hadoop classpath. To + * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop + * distribution: + * <pre name="code" class="bash"> + * export IGNITE_HOME=/path/to/Ignite/distribution + * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar + * + * for f in $IGNITE_HOME/libs/*.jar; do + * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f; + * done + * </pre> + * <h1 class="header">Data vs Clients Nodes</h1> + * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on + * data nodes. Client nodes are responsible for basic file system operations as well as + * accessing data nodes remotely. Usually, client nodes are started together + * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually + * started together with Hadoop {@code task-tracker} processes. 
+ * <p> + * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} + * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. + */ +public class IgniteHadoopFileSystem extends FileSystem { + /** Internal property to indicate management connection. */ + public static final String IGFS_MANAGEMENT = "fs.igfs.management.connection"; + + /** Empty array of file block locations. */ + private static final BlockLocation[] EMPTY_BLOCK_LOCATIONS = new BlockLocation[0]; + + /** Empty array of file statuses. */ + public static final FileStatus[] EMPTY_FILE_STATUS = new FileStatus[0]; + + /** Ensures that close routine is invoked at most once. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Grid remote client. */ + private HadoopIgfsWrapper rmtClient; + + /** working directory. */ + private Path workingDir; + + /** Default replication factor. */ + private short dfltReplication; + + /** Base file system uri. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private URI uri; + + /** Authority. */ + private String uriAuthority; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Secondary URI string. */ + private URI secondaryUri; + + /** The user name this file system was created on behalf of. */ + private String user; + + /** IGFS mode resolver. */ + private IgfsModeResolver modeRslvr; + + /** The secondary file system factory. */ + private HadoopFileSystemFactory factory; + + /** Management connection flag. */ + private boolean mgmt; + + /** Whether custom sequential reads before prefetch value is provided. */ + private boolean seqReadsBeforePrefetchOverride; + + /** IGFS group block size. */ + private long igfsGrpBlockSize; + + /** Flag that controls whether file writes should be colocated. */ + private boolean colocateFileWrites; + + /** Prefer local writes. 
*/ + private boolean preferLocFileWrites; + + /** Custom-provided sequential reads before prefetch. */ + private int seqReadsBeforePrefetch; + + /** {@inheritDoc} */ + @Override public URI getUri() { + if (uri == null) + throw new IllegalStateException("URI is null (was IgniteHadoopFileSystem properly initialized?)."); + + return uri; + } + + /** + * Enter busy state. Only checks the close guard; no lock or counter is taken. + * + * @throws IOException If file system is stopped. + */ + private void enterBusy() throws IOException { + if (closeGuard.get()) + throw new IOException("File system is stopped."); + } + + /** + * Leave busy state. + */ + private void leaveBusy() { + // No-op. + } + + /** + * Gets non-null user name as per the Hadoop file system viewpoint. + * The short name of the current Hadoop user is passed through {@code IgfsUtils.fixUserName}. + * + * @return the user name, never null. + * @throws IOException If the current Hadoop user cannot be obtained. + */ + public static String getFsHadoopUser() throws IOException { + UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + + String user = currUgi.getShortUserName(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + return user; + } + + /** + * Public setter that can be used by direct users of FS or Visor. + * + * @param colocateFileWrites Whether all ongoing file writes should be colocated. 
+ */ + @SuppressWarnings("UnusedDeclaration") + public void colocateFileWrites(boolean colocateFileWrites) { + this.colocateFileWrites = colocateFileWrites; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void initialize(URI name, Configuration cfg) throws IOException { + enterBusy(); + + try { + if (rmtClient != null) + throw new IOException("File system is already initialized: " + rmtClient); + + A.notNull(name, "name"); + A.notNull(cfg, "cfg"); + + super.initialize(name, cfg); + + setConf(cfg); + + mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); + + if (!IGFS_SCHEME.equals(name.getScheme())) + throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + + "://[name]/[optional_path], actual=" + name + ']'); + + uri = name; + + uriAuthority = uri.getAuthority(); + + user = getFsHadoopUser(); + + // Override sequential reads before prefetch if needed. + seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); + + if (seqReadsBeforePrefetch > 0) + seqReadsBeforePrefetchOverride = true; + + // In Ignite replication factor is controlled by data cache affinity. + // We use replication factor to force the whole file to be stored on local node. + dfltReplication = (short)cfg.getInt("dfs.replication", 3); + + // Get file colocation control flag. + colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); + preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); + + // Get log directory. + String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); + + File logDirFile = U.resolveIgnitePath(logDirCfg); + + String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; + + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); + + // Handshake. 
+ IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); + + igfsGrpBlockSize = handshake.blockSize(); + + IgfsPaths paths = handshake.secondaryPaths(); + + // Initialize client logger. + Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); + + // Sampling flag from the handshake, when present, takes precedence over the local configuration. + if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { + // Initiate client logger. + if (logDir == null) + throw new IOException("Failed to resolve log directory: " + logDirCfg); + + Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); + + clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); + } + else + clientLog = IgfsLogger.disabledLogger(); + + try { + modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + } + catch (IgniteCheckedException ice) { + throw new IOException(ice); + } + + // Secondary file system is needed if the default mode or any path mode is PROXY. + boolean initSecondary = paths.defaultMode() == PROXY; + + if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) { + for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { + IgfsMode mode = pathMode.getValue(); + + if (mode == PROXY) { + initSecondary = true; + + break; + } + } + } + + if (initSecondary) { + try { + factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to get secondary file system factory.", e); + } + + if (factory == null) + throw new IOException("Failed to get secondary file system factory (did you set " + + IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFileSystem\" in " + + FileSystemConfiguration.class.getName() + "?)"); + + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).start(); + + try { + FileSystem secFs = factory.get(user); + + secondaryUri = secFs.getUri(); + + A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); + } + catch (IOException e) { + if (!mgmt) + 
throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); + else + LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + + "will have no effect): " + e.getMessage()); + } + } + + // set working directory to the home directory of the current Fs user: + setWorkingDirectory(null); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override protected void checkPath(Path path) { + URI uri = path.toUri(); + + // Only URIs that carry a scheme are validated; scheme-less paths are accepted as-is. + if (uri.isAbsolute()) { + if (!F.eq(uri.getScheme(), IGFS_SCHEME)) + throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + + uri.getScheme() + ']'); + + if (!F.eq(uri.getAuthority(), uriAuthority)) + throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + + uri.getAuthority() + ']'); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public short getDefaultReplication() { + return dfltReplication; + } + + /** {@inheritDoc} */ + @Override protected void finalize() throws Throwable { + super.finalize(); + + close(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (closeGuard.compareAndSet(false, true)) + close0(); + } + + /** + * Closes file system. + * + * @throws IOException If failed. + */ + private void close0() throws IOException { + if (LOG.isDebugEnabled()) + LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); + + if (rmtClient == null) + return; + + super.close(); + + rmtClient.close(false); + + if (clientLog.isLogEnabled()) + clientLog.close(); + + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); + + // Reset initialized resources. 
+ uri = null; + rmtClient = null; + } + + /** {@inheritDoc} */ + @Override public void setTimes(Path p, long mtime, long atime) throws IOException { + enterBusy(); + + try { + A.notNull(p, "p"); + + if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + // No-op for management connection. + return; + } + + secondaryFs.setTimes(toSecondary(p), mtime, atime); + } + else { + IgfsPath path = convert(p); + + rmtClient.setTimes(path, atime, mtime); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void setPermission(Path p, FsPermission perm) throws IOException { + enterBusy(); + + try { + A.notNull(p, "p"); + + if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + // No-op for management connection. + return; + } + + secondaryFs.setPermission(toSecondary(p), perm); + } + else if (rmtClient.update(convert(p), permission(perm)) == null) { + throw new IOException("Failed to set file permission (file not found?)" + + " [path=" + p + ", perm=" + perm + ']'); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void setOwner(Path p, String username, String grpName) throws IOException { + A.notNull(p, "p"); + A.notNull(username, "username"); + A.notNull(grpName, "grpName"); + + enterBusy(); + + try { + if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + // No-op for management connection. 
+ return; + } + + secondaryFs.setOwner(toSecondary(p), username, grpName); + } + else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, username, + IgfsUtils.PROP_GROUP_NAME, grpName)) == null) { + // A null result from update() is reported as a failed owner change. + throw new IOException("Failed to set file owner (file not found?)" + + " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']'); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + throw new IOException("Failed to open file (secondary file system is not initialized): " + f); + } + + FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); + + if (clientLog.isLogEnabled()) { + // At this point we do not know file size, so we perform additional request to remote FS to get it. + FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); + + long size = status != null ? status.getLen() : -1; + + long logId = IgfsLogger.nextId(); + + clientLog.logOpen(logId, path, PROXY, bufSize, size); + + return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); + } + else + return is; + } + else { + HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? 
rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); + + long logId = -1; + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logOpen(logId, path, mode, bufSize, stream.length()); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + + ", bufSize=" + bufSize + ']'); + + HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), + bufSize, LOG, clientLog, logId); + + if (LOG.isDebugEnabled()) + LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); + + return new FSDataInputStream(igfsIn); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public FSDataOutputStream create(Path f, final FsPermission perm, boolean overwrite, int bufSize, + short replication, long blockSize, Progressable progress) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + // Non-null only while a stream has been opened but not yet handed to the caller (closed in 'finally'). + OutputStream out = null; + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (LOG.isDebugEnabled()) + LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" + + path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + throw new IOException("Failed to create file (secondary file system is not initialized): " + f); + } + + FSDataOutputStream os = + secondaryFs.create(toSecondary(f), perm, overwrite, bufSize, replication, blockSize, progress); + + if (clientLog.isLogEnabled()) { + long logId = IgfsLogger.nextId(); + + clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); + + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); + } + else + return os; + } + else { + Map<String,String> propMap = permission(perm); + + 
propMap.put(IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); + + // Create stream and close it in the 'finally' section if any sequential operation failed. + HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, + replication, blockSize, propMap); + + assert stream != null; + + long logId = -1; + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); + + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, + logId); + + bufSize = Math.max(64 * 1024, bufSize); + + out = new BufferedOutputStream(igfsOut, bufSize); + + FSDataOutputStream res = new FSDataOutputStream(out, null, 0); + + // Mark stream created successfully. + out = null; + + return res; + } + } + finally { + // Close if failed during stream creation. 
+ if (out != null) + U.closeQuiet(out); + + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (LOG.isDebugEnabled()) + LOG.debug("Opening output stream in append [thread=" + Thread.currentThread().getName() + + ", path=" + path + ", bufSize=" + bufSize + ']'); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + throw new IOException("Failed to append file (secondary file system is not initialized): " + f); + } + + FSDataOutputStream os = secondaryFs.append(toSecondary(f), bufSize, progress); + + if (clientLog.isLogEnabled()) { + long logId = IgfsLogger.nextId(); + + clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. + + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); + } + else + return os; + } + else { + HadoopIgfsStreamDelegate stream = rmtClient.append(path, false, null); + + assert stream != null; + + long logId = -1; + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logAppend(logId, path, mode, bufSize); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); + + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, + logId); + + bufSize = Math.max(64 * 1024, bufSize); + + BufferedOutputStream out = new BufferedOutputStream(igfsOut, bufSize); + + return new FSDataOutputStream(out, null, 0); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean rename(Path src, Path dst) throws IOException { + A.notNull(src, "src"); + A.notNull(dst, "dst"); + + enterBusy(); + + try { + 
IgfsPath srcPath = convert(src); + IgfsPath dstPath = convert(dst); + IgfsMode mode = mode(srcPath); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + return false; + } + + if (clientLog.isLogEnabled()) + clientLog.logRename(srcPath, PROXY, dstPath); + + return secondaryFs.rename(toSecondary(src), toSecondary(dst)); + } + else { + if (clientLog.isLogEnabled()) + clientLog.logRename(srcPath, mode, dstPath); + + try { + rmtClient.rename(srcPath, dstPath); + } + catch (IOException ioe) { + // Log the exception before rethrowing since it may be ignored: + LOG.warn("Failed to rename [srcPath=" + srcPath + ", dstPath=" + dstPath + ", mode=" + mode + ']', + ioe); + + throw ioe; + } + + return true; + } + } + catch (IOException e) { + // Intentionally ignore IGFS exceptions here to follow Hadoop contract. + if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || + !X.hasCause(e.getCause(), IgfsException.class))) + throw e; + else + return false; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean delete(Path f) throws IOException { + return delete(f, false); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean delete(Path f, boolean recursive) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + return false; + } + + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, PROXY, recursive); + + return secondaryFs.delete(toSecondary(f), recursive); + } + else { + // Will throw exception if delete failed. 
+ boolean res = rmtClient.delete(path, recursive); + + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, mode, recursive); + + return res; + } + } + catch (IOException e) { + // Intentionally ignore IGFS exceptions here to follow Hadoop contract. + if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || + !X.hasCause(e.getCause(), IgfsException.class))) + throw e; + else + return false; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + return EMPTY_FILE_STATUS; + } + + FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); + + if (arr == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + for (int i = 0; i < arr.length; i++) + arr[i] = toPrimary(arr[i]); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, PROXY, fileArr); + } + + return arr; + } + else { + Collection<IgfsFile> list = rmtClient.listFiles(path); + + if (list == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + List<IgfsFile> files = new ArrayList<>(list); + + FileStatus[] arr = new FileStatus[files.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(files.get(i)); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, mode, fileArr); + } + + return arr; + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + Path path = new 
Path("/user/" + user); + + return path.makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path newPath) { + try { + if (newPath == null) { + Path homeDir = getHomeDirectory(); + + FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs != null) + secondaryFs.setWorkingDirectory(toSecondary(homeDir)); + + workingDir = homeDir; + } + else { + Path fixedNewPath = fixRelativePart(newPath); + + String res = fixedNewPath.toUri().getPath(); + + if (!DFSUtil.isValidName(res)) + throw new IllegalArgumentException("Invalid DFS directory name " + res); + + FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs != null) + secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); + + workingDir = fixedNewPath; + } + } + catch (IOException e) { + throw new RuntimeException("Failed to obtain secondary file system instance.", e); + } + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workingDir; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean mkdirs(Path f, FsPermission perm) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = mode(path); + + if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + return false; + } + + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, PROXY); + + return secondaryFs.mkdirs(toSecondary(f), perm); + } + else { + boolean mkdirRes = rmtClient.mkdirs(path, permission(perm)); + + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, mode); + + return mkdirRes; + } + } + catch (IOException e) { + // Intentionally ignore IGFS exceptions here to follow Hadoop contract. 
+ if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || + !X.hasCause(e.getCause(), IgfsException.class))) + throw e; + else + return false; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + throw new IOException("Failed to get file status (secondary file system is not initialized): " + f); + } + + return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); + } + else { + IgfsFile info = rmtClient.info(convert(f)); + + if (info == null) + throw new FileNotFoundException("File not found: " + f); + + return convert(info); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public ContentSummary getContentSummary(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + throw new IOException("Failed to get content summary (secondary file system is not initialized): " + + f); + } + + return secondaryFs.getContentSummary(toSecondary(f)); + } + else { + IgfsPathSummary sum = rmtClient.contentSummary(convert(f)); + + return new ContentSummary(sum.totalLength(), sum.filesCount(), sum.directoriesCount(), + -1, sum.totalLength(), rmtClient.fsStatus().spaceTotal()); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public BlockLocation[] getFileBlockLocations(FileStatus status, long start, long len) throws IOException { + A.notNull(status, "status"); + + enterBusy(); + + try { + IgfsPath path = convert(status.getPath()); + + if (mode(status.getPath()) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + + if (secondaryFs == null) { + assert mgmt; + + return 
EMPTY_BLOCK_LOCATIONS; + } + + Path secPath = toSecondary(status.getPath()); + + return secondaryFs.getFileBlockLocations(secondaryFs.getFileStatus(secPath), start, len); + } + else { + long now = System.currentTimeMillis(); + + List<IgfsBlockLocation> affinity = new ArrayList<>(rmtClient.affinity(path, start, len)); + + BlockLocation[] arr = new BlockLocation[affinity.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(affinity.get(i)); + + if (LOG.isDebugEnabled()) + LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + + (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); + + return arr; + } + } + catch (FileNotFoundException ignored) { + return EMPTY_BLOCK_LOCATIONS; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public long getDefaultBlockSize() { + return igfsGrpBlockSize; + } + + /** + * Resolve path mode. + * + * @param path HDFS path. + * @return Path mode. + */ + public IgfsMode mode(Path path) { + return mode(convert(path)); + } + + /** + * Resolve path mode. + * + * @param path IGFS path. + * @return Path mode. + */ + public IgfsMode mode(IgfsPath path) { + return modeRslvr.resolveMode(path); + } + + /** + * @return {@code true} If secondary file system is initialized. + */ + public boolean hasSecondaryFileSystem() { + return factory != null; + } + + /** + * Convert the given path to path acceptable by the primary file system. + * + * @param path Path. + * @return Primary file system path. + */ + private Path toPrimary(Path path) { + return convertPath(path, uri); + } + + /** + * Convert the given path to path acceptable by the secondary file system. + * + * @param path Path. + * @return Secondary file system path. + */ + private Path toSecondary(Path path) { + assert factory != null; + assert secondaryUri != null; + + return convertPath(path, secondaryUri); + } + + /** + * Convert path using the given new URI. 
+ * + * @param path Old path. + * @param newUri New URI. + * @return New path. + */ + private Path convertPath(Path path, URI newUri) { + assert newUri != null; + + if (path != null) { + URI pathUri = path.toUri(); + + try { + return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, + pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); + } + catch (URISyntaxException e) { + throw new IgniteException("Failed to construct secondary file system path from the primary file " + + "system path: " + path, e); + } + } + else + return null; + } + + /** + * Convert a file status obtained from the secondary file system to a status of the primary file system. + * + * @param status Secondary file system status. + * @return Primary file system status. + */ + @SuppressWarnings("deprecation") + private FileStatus toPrimary(FileStatus status) { + return status != null ? new FileStatus(status.getLen(), status.isDir(), status.getReplication(), + status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), + status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; + } + + /** + * Convert IGFS path into Hadoop path. + * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + return new Path(IGFS_SCHEME, uriAuthority, path.toString()); + } + + /** + * Convert Hadoop path into IGFS path. + * + * @param path Hadoop path. + * @return IGFS path. + */ + @Nullable private IgfsPath convert(@Nullable Path path) { + if (path == null) + return null; + + return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : + new IgfsPath(convert(workingDir), path.toUri().getPath()); + } + + /** + * Convert IGFS affinity block location into Hadoop affinity block location. + * + * @param block IGFS affinity block location. + * @return Hadoop affinity block location. 
+ */ + private BlockLocation convert(IgfsBlockLocation block) { + Collection<String> names = block.names(); + Collection<String> hosts = block.hosts(); + + return new BlockLocation( + names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, + hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, + block.start(), block.length() + ) { + @Override public String toString() { + try { + return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + + ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Convert IGFS file information into Hadoop file status. + * + * @param file IGFS file information. + * @return Hadoop file status. + */ + @SuppressWarnings("deprecation") + private FileStatus convert(IgfsFile file) { + return new FileStatus( + file.length(), + file.isDirectory(), + getDefaultReplication(), + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(IgfsUtils.PROP_USER_NAME, user), + file.property(IgfsUtils.PROP_GROUP_NAME, "users"), + convert(file.path())) { + @Override public String toString() { + return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() + + ", mtime=" + getModificationTime() + ", atime=" + getAccessTime() + ']'; + } + }; + } + + /** + * Convert Hadoop permission into IGFS file attribute. + * + * @param perm Hadoop permission. + * @return IGFS attributes. + */ + private Map<String, String> permission(FsPermission perm) { + if (perm == null) + perm = FsPermission.getDefault(); + + return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm)); + } + + /** + * @param perm Permission. + * @return String. + */ + private static String toString(FsPermission perm) { + return String.format("%04o", perm.toShort()); + } + + /** + * Convert IGFS file attributes into Hadoop permission. 
+ * + * @param file File info. + * @return Hadoop permission. + */ + private FsPermission permission(IgfsFile file) { + String perm = file.property(IgfsUtils.PROP_PERMISSION, null); + + if (perm == null) + return FsPermission.getDefault(); + + try { + return new FsPermission((short)Integer.parseInt(perm, 8)); + } + catch (NumberFormatException ignore) { + return FsPermission.getDefault(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteHadoopFileSystem.class, this); + } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } + + /** + * Gets cached or creates a {@link FileSystem}. + * + * @return The secondary file system. + */ + private @Nullable FileSystem secondaryFileSystem() throws IOException{ + if (factory == null) + return null; + + return factory.get(user); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java new file mode 100644 index 0000000..60e62ca --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains Ignite Hadoop 1.x <code>FileSystem</code> implementation. + */ +package org.apache.ignite.hadoop.fs.v1; \ No newline at end of file