http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java new file mode 100644 index 0000000..155ecb1 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +public class FileUtils { + private static final PathFilter SNAPSHOT_DIR_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + return ".snapshot".equalsIgnoreCase(p.getName()); + } + }; + private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class); + + public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + /** + * Filter that filters out hidden files + */ + private static final PathFilter hiddenFileFilter = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + /** + * Move a particular file or directory to the trash. + * @param fs FileSystem to use + * @param f path of file or directory to move to trash. 
+ * @param conf configuration object + * @return true if move successful + * @throws IOException + */ + public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge) + throws IOException { + LOG.debug("deleting " + f); + boolean result; + try { + if(purge) { + LOG.debug("purge is set to true. Not moving to Trash " + f); + } else { + result = Trash.moveToAppropriateTrash(fs, f, conf); + if (result) { + LOG.trace("Moved to trash: " + f); + return true; + } + } + } catch (IOException ioe) { + // for whatever failure reason including that trash has lower encryption zone + // retry with force delete + LOG.warn(ioe.getMessage() + "; Force to delete it."); + } + + result = fs.delete(f, true); + if (!result) { + LOG.error("Failed to delete " + f); + } + return result; + } + + /** + * Copies files between filesystems. + */ + public static boolean copy(FileSystem srcFS, Path src, + FileSystem dstFS, Path dst, + boolean deleteSource, + boolean overwrite, + Configuration conf) throws IOException { + boolean copied = false; + boolean triedDistcp = false; + + /* Run distcp if source file/dir is too big */ + if (srcFS.getUri().getScheme().equals("hdfs")) { + ContentSummary srcContentSummary = srcFS.getContentSummary(src); + if (srcContentSummary.getFileCount() > + MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES) + && srcContentSummary.getLength() > + MetastoreConf.getLongVar(conf,ConfVars.REPL_COPYFILE_MAXSIZE)) { + + LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " + + MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXSIZE) + ")"); + LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " + + MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES) + ")"); + LOG.info("Launch distributed copy (distcp) job."); + triedDistcp = true; + copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf); + } + } + if (!triedDistcp) { + // Note : Currently, this implementation does not "fall back" to regular copy if distcp + // is tried and it fails. We depend upon that behaviour in cases like replication, + // wherein if distcp fails, there is good reason to not plod along with a trivial + // implementation, and fail instead. + copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf); + } + return copied; + } + + private static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst, + boolean deleteSource, String doAsUser, + Configuration conf) throws IOException { + boolean copied; + if (doAsUser == null){ + copied = HdfsUtils.runDistCp(srcPaths, dst, conf); + } else { + copied = HdfsUtils.runDistCpAs(srcPaths, dst, conf, doAsUser); + } + if (copied && deleteSource) { + for (Path path : srcPaths) { + srcFS.delete(path, true); + } + } + return copied; + } + + /** + * Creates the directory and all necessary parent directories. + * @param fs FileSystem to use + * @param f path to create. + * @return true if directory created successfully. False otherwise, including if it exists. + * @throws IOException exception in creating the directory + */ + public static boolean mkdir(FileSystem fs, Path f) throws IOException { + LOG.info("Creating directory if it doesn't exist: " + f); + return fs.mkdirs(f); + } + + /** + * Rename a file. Unlike {@link FileSystem#rename(Path, Path)}, if the destPath already exists + * and is a directory, this will NOT move the sourcePath into it. It will throw an IOException + * instead. 
+ * @param srcFs file system src paths are on + * @param destFs file system dest paths are on + * @param srcPath source file or directory to move + * @param destPath destination file name. This must be a file and not an existing directory. + * @return result of fs.rename. + * @throws IOException if fs.rename throws it, or if destPath already exists. + */ + public static boolean rename(FileSystem srcFs, FileSystem destFs, Path srcPath, + Path destPath) throws IOException { + LOG.info("Renaming " + srcPath + " to " + destPath); + + // If destPath directory exists, rename call will move the srcPath + // into destPath without failing. So check it before renaming. + if(destFs.exists(destPath)) { + throw new IOException("Cannot rename the source path. The destination " + + "path already exists."); + } + + if (equalsFileSystem(srcFs, destFs)) { + //just rename the directory + return srcFs.rename(srcPath, destPath); + } else { + Configuration conf = new Configuration(); + return copy(srcFs, srcPath, destFs, destPath, + true, // delete source + false, // overwrite destination + conf); + } + } + + // NOTE: This is for generating the internal path name for partitions. Users + // should always use the MetaStore API to get the path name for a partition. + // Users should not directly take partition values and turn it into a path + // name by themselves, because the logic below may change in the future. + // + // In the future, it's OK to add new chars to the escape list, and old data + // won't be corrupt, because the full path name in metastore is stored. + // In that case, Hive will continue to read the old data, but when it creates + // new partitions, it will use new names. + // edit : There are some use cases for which adding new chars does not seem + // to be backward compatible - Eg. if partition was created with name having + // a special char that you want to start escaping, and then you try dropping + // the partition with a hive version that now escapes the special char using + // the list below, then the drop partition fails to work. + + private static BitSet charToEscape = new BitSet(128); + static { + for (char c = 0; c < ' '; c++) { + charToEscape.set(c); + } + + /* + * ASCII 01-1F are HTTP control characters that need to be escaped. + * \u000A and \u000D are \n and \r, respectively. + */ + char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004', + '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B', + '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', + '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', + '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F', + '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', + '[', ']', '^'}; + + for (char c : clist) { + charToEscape.set(c); + } + } + + private static boolean needsEscaping(char c) { + return c >= 0 && c < charToEscape.size() && charToEscape.get(c); + } + + public static String escapePathName(String path) { + return escapePathName(path, null); + } + + /** + * Escapes a path name. + * @param path The path to escape. + * @param defaultPath + * The default name for the path, if the given path is empty or null. + * @return An escaped path name. + */ + public static String escapePathName(String path, String defaultPath) { + + // __HIVE_DEFAULT_NULL__ is the system default value for null and empty string. + // TODO: we should allow user to specify default partition or HDFS file location. 
+ if (path == null || path.length() == 0) { + if (defaultPath == null) { + //previously, when path is empty or null and no default path is specified, + // __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName + return "__HIVE_DEFAULT_PARTITION__"; + } else { + return defaultPath; + } + } + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (needsEscaping(c)) { + sb.append('%'); + sb.append(String.format("%1$02X", (int) c)); + } else { + sb.append(c); + } + } + return sb.toString(); + } + + public static String unescapePathName(String path) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (c == '%' && i + 2 < path.length()) { + int code = -1; + try { + code = Integer.parseInt(path.substring(i + 1, i + 3), 16); + } catch (Exception e) { + code = -1; + } + if (code >= 0) { + sb.append((char) code); + i += 2; + continue; + } + } + sb.append(c); + } + return sb.toString(); + } + + /** + * Get all file status from a root path and recursively go deep into certain levels. + * + * @param path + * the root path + * @param level + * the depth of directory to explore + * @param fs + * the file system + * @return array of FileStatus + * @throws IOException + */ + public static List<FileStatus> getFileStatusRecurse(Path path, int level, FileSystem fs) + throws IOException { + + // if level is <0, the return all files/directories under the specified path + if (level < 0) { + List<FileStatus> result = new ArrayList<>(); + try { + FileStatus fileStatus = fs.getFileStatus(path); + FileUtils.listStatusRecursively(fs, fileStatus, result); + } catch (IOException e) { + // globStatus() API returns empty FileStatus[] when the specified path + // does not exist. But getFileStatus() throw IOException. To mimic the + // similar behavior we will return empty array on exception. For external + // tables, the path of the table will not exists during table creation + return new ArrayList<>(0); + } + return result; + } + + // construct a path pattern (e.g., /*/*) to find all dynamically generated paths + StringBuilder sb = new StringBuilder(path.toUri().getPath()); + for (int i = 0; i < level; i++) { + sb.append(Path.SEPARATOR).append("*"); + } + Path pathPattern = new Path(path, sb.toString()); + return Lists.newArrayList(fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER)); + } + + /** + * Recursively lists status for all files starting from a particular directory (or individual file + * as base case). + * + * @param fs + * file system + * + * @param fileStatus + * starting point in file system + * + * @param results + * receives enumeration of all files found + */ + public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus, + List<FileStatus> results) throws IOException { + + if (fileStatus.isDir()) { + for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) { + listStatusRecursively(fs, stat, results); + } + } else { + results.add(fileStatus); + } + } + + public static String makePartName(List<String> partCols, List<String> vals) { + return makePartName(partCols, vals, null); + } + + /** + * Makes a valid partition name. + * @param partCols The partition keys' names + * @param vals The partition values + * @param defaultStr + * The default name given to a partition value if the respective value is empty or null. + * @return An escaped, valid partition name. 
+ */ + public static String makePartName(List<String> partCols, List<String> vals, + String defaultStr) { + StringBuilder name = new StringBuilder(); + for (int i = 0; i < partCols.size(); i++) { + if (i > 0) { + name.append(Path.SEPARATOR); + } + name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr)); + name.append('='); + name.append(escapePathName(vals.get(i), defaultStr)); + } + return name.toString(); + } + + /** + * Determine if two objects reference the same file system. + * @param fs1 first file system + * @param fs2 second file system + * @return return true if both file system arguments point to same file system + */ + public static boolean equalsFileSystem(FileSystem fs1, FileSystem fs2) { + //When file system cache is disabled, you get different FileSystem objects + // for same file system, so '==' can't be used in such cases + //FileSystem api doesn't have a .equals() function implemented, so using + //the uri for comparison. FileSystem already uses uri+Configuration for + //equality in its CACHE . + //Once equality has been added in HDFS-9159, we should make use of it + return fs1.getUri().equals(fs2.getUri()); + } + + /** + * Check if the path contains a subdirectory named '.snapshot' + * @param p path to check + * @param fs filesystem of the path + * @return true if p contains a subdirectory named '.snapshot' + * @throws IOException + */ + public static boolean pathHasSnapshotSubDir(Path p, FileSystem fs) throws IOException { + // Hadoop is missing a public API to check for snapshotable directories. Check with the directory name + // until a more appropriate API is provided by HDFS-12257. + final FileStatus[] statuses = fs.listStatus(p, FileUtils.SNAPSHOT_DIR_PATH_FILTER); + return statuses != null && statuses.length != 0; + } + + public static void makeDir(Path path, Configuration conf) throws MetaException { + FileSystem fs; + try { + fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + fs.mkdirs(path); + } + } catch (IOException e) { + throw new MetaException("Unable to : " + path); + } + } + + /** + * Utility method that determines if a specified directory already has + * contents (non-hidden files) or not - useful to determine if an + * immutable table already has contents, for example. + * @param fs + * @param path + * @throws IOException + */ + public static boolean isDirEmpty(FileSystem fs, Path path) throws IOException { + + if (fs.exists(path)) { + FileStatus[] status = fs.globStatus(new Path(path, "*"), hiddenFileFilter); + if (status.length > 0) { + return false; + } + } + return true; + } + + /** + * Variant of Path.makeQualified that qualifies the input path against the default file system + * indicated by the configuration + * + * This does not require a FileSystem handle in most cases - only requires the Filesystem URI. + * This saves the cost of opening the Filesystem - which can involve RPCs - as well as cause + * errors + * + * @param path + * path to be fully qualified + * @param conf + * Configuration file + * @return path qualified relative to default file system + */ + public static Path makeQualified(Path path, Configuration conf) throws IOException { + + if (!path.isAbsolute()) { + // in this case we need to get the working directory + // and this requires a FileSystem handle. So revert to + // original method. 
+ FileSystem fs = FileSystem.get(conf); + return path.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } + + URI fsUri = FileSystem.getDefaultUri(conf); + URI pathUri = path.toUri(); + + String scheme = pathUri.getScheme(); + String authority = pathUri.getAuthority(); + + // validate/fill-in scheme and authority. this follows logic + // identical to FileSystem.get(URI, conf) - but doesn't actually + // obtain a file system handle + + if (scheme == null) { + // no scheme - use default file system uri + scheme = fsUri.getScheme(); + authority = fsUri.getAuthority(); + if (authority == null) { + authority = ""; + } + } else { + if (authority == null) { + // no authority - use default one if it applies + if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) { + authority = fsUri.getAuthority(); + } else { + authority = ""; + } + } + } + + return new Path(scheme, authority, pathUri.getPath()); + } + + /** + * Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not + * take into account any intricacies of the underlying file system, which is assumed to be + * HDFS. This should not return any false positives, but may return false negatives. + * + * @param parent + * @param other Directory to check if it is a subdirectory of parent + * @return True, if other is subdirectory of parent + */ + public static boolean isSubdirectory(String parent, String other) { + return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR); + } + + public static Path getTransformedPath(String name, String subDir, String root) { + if (root != null) { + Path newPath = new Path(root); + if (subDir != null) { + newPath = new Path(newPath, subDir); + } + return new Path(newPath, name); + } + return null; + } + public static class RemoteIteratorWithFilter implements RemoteIterator<LocatedFileStatus> { + /** + * This works with {@link RemoteIterator} which (potentially) produces all files recursively + * so looking for hidden folders must look at whole path, not just the the last part of it as + * would be appropriate w/o recursive listing. + */ + public static final PathFilter HIDDEN_FILES_FULL_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + do { + String name = p.getName(); + if (name.startsWith("_") || name.startsWith(".")) { + return false; + } + } while ((p = p.getParent()) != null); + return true; + } + }; + private final RemoteIterator<LocatedFileStatus> iter; + private final PathFilter filter; + private LocatedFileStatus nextFile; + + public RemoteIteratorWithFilter(RemoteIterator<LocatedFileStatus> iter, PathFilter filter) + throws IOException { + this.iter = iter; + this.filter = filter; + findNext(); + } + + @Override + public boolean hasNext() throws IOException { + return nextFile != null; + } + + @Override + public LocatedFileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + LocatedFileStatus result = nextFile; + findNext(); + return result; + } + + void findNext() throws IOException { + while (iter.hasNext()) { + LocatedFileStatus status = iter.next(); + if (filter.accept(status.getPath())) { + nextFile = status; + return; + } + } + + // No more matching files in the iterator + nextFile = null; + } + } +}
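For readers of this patch, here is a minimal usage sketch of the path-escaping helpers introduced in FileUtils.java above (illustrative only, not part of the diff; it assumes nothing beyond the public FileUtils methods shown and standard Java collections):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.metastore.utils.FileUtils;

    public class FileUtilsExample {
      public static void main(String[] args) {
        // Partition keys and values; the second value contains a '/' that must be escaped.
        List<String> partCols = Arrays.asList("ds", "country");
        List<String> vals = Arrays.asList("2018-01-01", "US/east");

        // makePartName lower-cases the key names and escapes each value,
        // producing e.g. ds=2018-01-01/country=US%2Feast
        System.out.println(FileUtils.makePartName(partCols, vals));

        // escapePathName/unescapePathName round-trip a single path component.
        String escaped = FileUtils.escapePathName("a:b=c");   // a%3Ab%3Dc
        System.out.println(escaped + " -> " + FileUtils.unescapePathName(escaped));

        // A null or empty value falls back to __HIVE_DEFAULT_PARTITION__
        // unless an explicit default string is supplied.
        System.out.println(FileUtils.escapePathName(null));
      }
    }

The escape format (a '%' followed by two hex digits) is exactly what unescapePathName parses, which is why round-tripping is safe for every character in the escape list.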
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java new file mode 100644 index 0000000..2122788 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class HdfsUtils { + private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class); + private static final String DISTCP_OPTIONS_PREFIX = "distcp.options."; + // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this + // is still going to work. Otherwise, file IDs can be turned off. Later, we should use + // as public utility method in HDFS to obtain the inode-based path. 
+ private static final String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/"; + + /** + * Check the permissions on a file. + * @param fs Filesystem the file is contained in + * @param stat Stat info for the file + * @param action action to be performed + * @throws IOException If thrown by Hadoop + * @throws AccessControlException if the file cannot be accessed + */ + public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action) + throws IOException, LoginException { + checkFileAccess(fs, stat, action, SecurityUtils.getUGI()); + } + + /** + * Check the permissions on a file + * @param fs Filesystem the file is contained in + * @param stat Stat info for the file + * @param action action to be performed + * @param ugi user group info for the current user. This is passed in so that tests can pass + * in mock ones. + * @throws IOException If thrown by Hadoop + * @throws AccessControlException if the file cannot be accessed + */ + @VisibleForTesting + static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action, + UserGroupInformation ugi) throws IOException { + + String user = ugi.getShortUserName(); + String[] groups = ugi.getGroupNames(); + + if (groups != null) { + String superGroupName = fs.getConf().get("dfs.permissions.supergroup", ""); + if (arrayContains(groups, superGroupName)) { + LOG.debug("User \"" + user + "\" belongs to super-group \"" + superGroupName + "\". " + + "Permission granted for action: " + action + "."); + return; + } + } + + FsPermission dirPerms = stat.getPermission(); + + if (user.equals(stat.getOwner())) { + if (dirPerms.getUserAction().implies(action)) { + return; + } + } else if (arrayContains(groups, stat.getGroup())) { + if (dirPerms.getGroupAction().implies(action)) { + return; + } + } else if (dirPerms.getOtherAction().implies(action)) { + return; + } + throw new AccessControlException("action " + action + " not permitted on path " + + stat.getPath() + " for user " + user); + } + + public static boolean isPathEncrypted(Configuration conf, URI fsUri, Path path) + throws IOException { + Path fullPath; + if (path.isAbsolute()) { + fullPath = path; + } else { + fullPath = path.getFileSystem(conf).makeQualified(path); + } + if(!"hdfs".equalsIgnoreCase(path.toUri().getScheme())) { + return false; + } + try { + HdfsAdmin hdfsAdmin = new HdfsAdmin(fsUri, conf); + return (hdfsAdmin.getEncryptionZoneForPath(fullPath) != null); + } catch (FileNotFoundException fnfe) { + LOG.debug("Failed to get EZ for non-existent path: "+ fullPath, fnfe); + return false; + } + } + + private static boolean arrayContains(String[] array, String value) { + if (array == null) return false; + for (String element : array) { + if (element.equals(value)) return true; + } + return false; + } + + public static boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, + String doAsUser) throws IOException { + UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( + doAsUser, UserGroupInformation.getLoginUser()); + try { + return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + return runDistCp(srcPaths, dst, conf); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + public static boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) + throws IOException { + DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst) + .withSyncFolder(true) + .withCRC(true) + .preserve(FileAttribute.BLOCKSIZE) + .build(); + + // Creates 
the command-line parameters for distcp + List<String> params = constructDistCpParams(srcPaths, dst, conf); + + try { + conf.setBoolean("mapred.mapper.new-api", true); + DistCp distcp = new DistCp(conf, options); + + // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue + // added by HADOOP-10459 + if (distcp.run(params.toArray(new String[params.size()])) == 0) { + return true; + } else { + return false; + } + } catch (Exception e) { + throw new IOException("Cannot execute DistCp process: " + e, e); + } finally { + conf.setBoolean("mapred.mapper.new-api", false); + } + } + + private static List<String> constructDistCpParams(List<Path> srcPaths, Path dst, + Configuration conf) { + List<String> params = new ArrayList<>(); + for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){ + String distCpOption = entry.getKey(); + String distCpVal = entry.getValue(); + params.add("-" + distCpOption); + if ((distCpVal != null) && (!distCpVal.isEmpty())){ + params.add(distCpVal); + } + } + if (params.size() == 0){ + // if no entries were added via conf, we initiate our defaults + params.add("-update"); + params.add("-pbx"); + } + for (Path src : srcPaths) { + params.add(src.toString()); + } + params.add(dst.toString()); + return params; + } + + public static Path getFileIdPath( + FileSystem fileSystem, Path path, long fileId) { + return (fileSystem instanceof DistributedFileSystem) + ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path; + } + + public static long getFileId(FileSystem fs, String path) throws IOException { + return ensureDfs(fs).getClient().getFileInfo(path).getFileId(); + } + + private static DistributedFileSystem ensureDfs(FileSystem fs) { + if (!(fs instanceof DistributedFileSystem)) { + throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass()); + } + return (DistributedFileSystem)fs; + } + + public static class HadoopFileStatus { + + private final FileStatus fileStatus; + private final AclStatus aclStatus; + + public HadoopFileStatus(Configuration conf, FileSystem fs, Path file) throws IOException { + + FileStatus fileStatus = fs.getFileStatus(file); + AclStatus aclStatus = null; + if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) { + //Attempt extended Acl operations only if its enabled, but don't fail the operation regardless. + try { + aclStatus = fs.getAclStatus(file); + } catch (Exception e) { + LOG.info("Skipping ACL inheritance: File system for path " + file + " " + + "does not support ACLs but dfs.namenode.acls.enabled is set to true. "); + LOG.debug("The details are: " + e, e); + } + }this.fileStatus = fileStatus; + this.aclStatus = aclStatus; + } + + public FileStatus getFileStatus() { + return fileStatus; + } + + List<AclEntry> getAclEntries() { + return aclStatus == null ? null : Collections.unmodifiableList(aclStatus.getEntries()); + } + + @VisibleForTesting + AclStatus getAclStatus() { + return this.aclStatus; + } + } + + /** + * Copy the permissions, group, and ACLs from a source {@link HadoopFileStatus} to a target {@link Path}. This method + * will only log a warning if permissions cannot be set, no exception will be thrown. 
+ * + * @param conf the {@link Configuration} used when setting permissions and ACLs + * @param sourceStatus the source {@link HadoopFileStatus} to copy permissions and ACLs from + * @param targetGroup the group of the target {@link Path}, if this is set and it is equal to the source group, an + * extra set group operation is avoided + * @param fs the {@link FileSystem} that contains the target {@link Path} + * @param target the {@link Path} to copy permissions, group, and ACLs to + * @param recursion recursively set permissions and ACLs on the target {@link Path} + */ + public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, + String targetGroup, FileSystem fs, Path target, boolean recursion) { + setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, recursion ? new FsShell() : null); + } + + @VisibleForTesting + static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, + String targetGroup, FileSystem fs, Path target, boolean recursion, FsShell fsShell) { + try { + FileStatus fStatus = sourceStatus.getFileStatus(); + String group = fStatus.getGroup(); + boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true"); + FsPermission sourcePerm = fStatus.getPermission(); + List<AclEntry> aclEntries = null; + if (aclEnabled) { + if (sourceStatus.getAclEntries() != null) { + LOG.trace(sourceStatus.getAclStatus().toString()); + aclEntries = new ArrayList<>(sourceStatus.getAclEntries()); + removeBaseAclEntries(aclEntries); + + //the ACL api's also expect the tradition user/group/other permission in the form of ACL + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction())); + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction())); + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction())); + } + } + + if (recursion) { + //use FsShell to change group, permissions, and extended ACL's recursively + fsShell.setConf(conf); + //If there is no group of a file, no need to call chgrp + if (group != null && !group.isEmpty()) { + run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()}); + } + if (aclEnabled) { + if (null != aclEntries) { + //Attempt extended Acl operations only if its enabled, 8791but don't fail the operation regardless. + try { + //construct the -setfacl command + String aclEntry = Joiner.on(",").join(aclEntries); + run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()}); + + } catch (Exception e) { + LOG.info("Skipping ACL inheritance: File system for path " + target + " " + + "does not support ACLs but dfs.namenode.acls.enabled is set to true. 
"); + LOG.debug("The details are: " + e, e); + } + } + } else { + String permission = Integer.toString(sourcePerm.toShort(), 8); + run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()}); + } + } else { + if (group != null && !group.isEmpty()) { + if (targetGroup == null || + !group.equals(targetGroup)) { + fs.setOwner(target, null, group); + } + } + if (aclEnabled) { + if (null != aclEntries) { + fs.setAcl(target, aclEntries); + } + } else { + fs.setPermission(target, sourcePerm); + } + } + } catch (Exception e) { + LOG.warn( + "Unable to inherit permissions for file " + target + " from file " + sourceStatus.getFileStatus().getPath(), + e.getMessage()); + LOG.debug("Exception while inheriting permissions", e); + } + } + + /** + * Removes basic permission acls (unamed acls) from the list of acl entries + * @param entries acl entries to remove from. + */ + private static void removeBaseAclEntries(List<AclEntry> entries) { + Iterables.removeIf(entries, new Predicate<AclEntry>() { + @Override + public boolean apply(AclEntry input) { + if (input.getName() == null) { + return true; + } + return false; + } + }); + } + + /** + * Create a new AclEntry with scope, type and permission (no name). + * + * @param scope + * AclEntryScope scope of the ACL entry + * @param type + * AclEntryType ACL entry type + * @param permission + * FsAction set of permissions in the ACL entry + * @return AclEntry new AclEntry + */ + private static AclEntry newAclEntry(AclEntryScope scope, AclEntryType type, + FsAction permission) { + return new AclEntry.Builder().setScope(scope).setType(type) + .setPermission(permission).build(); + } + + private static void run(FsShell shell, String[] command) throws Exception { + LOG.debug(ArrayUtils.toString(command)); + int retval = shell.run(command); + LOG.debug("Return value is :" + retval); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java new file mode 100644 index 0000000..c681a87 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.utils; + +import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy; + +import com.google.common.base.Joiner; + +import org.apache.hadoop.conf.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.ColumnType; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.security.SaslRpcServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MetaStoreUtils { + /** A fixed date format to be used for hive partition column values. */ + public static final ThreadLocal<DateFormat> PARTITION_DATE_FORMAT = + new ThreadLocal<DateFormat>() { + @Override + protected DateFormat initialValue() { + DateFormat val = new SimpleDateFormat("yyyy-MM-dd"); + val.setLenient(false); // Without this, 2020-20-20 becomes 2021-08-20. + val.setTimeZone(TimeZone.getTimeZone("UTC")); + return val; + } + }; + // Indicates a type was derived from the deserializer rather than Hive's metadata. + public static final String TYPE_FROM_DESERIALIZER = "<derived from deserializer>"; + + private static final Logger LOG = LoggerFactory.getLogger(MetaStoreUtils.class); + + // The following two are public for any external users who wish to use them. + /** + * This character is used to mark a database name as having a catalog name prepended. This + * marker should be placed first in the String to make it easy to determine that this has both + * a catalog and a database name. @ is chosen as it is not used in regular expressions. This + * is only intended for use when making old Thrift calls that do not support catalog names. + */ + public static final char CATALOG_DB_THRIFT_NAME_MARKER = '@'; + + /** + * This String is used to seaprate the catalog name from the database name. This should only + * be used in Strings that are prepended with {@link #CATALOG_DB_THRIFT_NAME_MARKER}. # is + * chosen because it is not used in regular expressions. this is only intended for use when + * making old Thrift calls that do not support catalog names. + */ + public static final String CATALOG_DB_SEPARATOR = "#"; + + /** + * Mark a database as being empty (as distinct from null). 
+ */ + public static final String DB_EMPTY_MARKER = "!"; + + public static final String EXTERNAL_TABLE_PURGE = "external.table.purge"; + + // Right now we only support one special character '/'. + // More special characters can be added accordingly in the future. + // NOTE: + // If the following array is updated, please also be sure to update the + // configuration parameter documentation + // HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES in HiveConf as well. + private static final char[] specialCharactersInTableNames = new char[] { '/' }; + + /** + * Catches exceptions that can't be handled and bundles them to MetaException + * + * @param e exception to wrap. + * @throws MetaException wrapper for the exception + */ + public static void logAndThrowMetaException(Exception e) throws MetaException { + String exInfo = "Got exception: " + e.getClass().getName() + " " + + e.getMessage(); + LOG.error(exInfo, e); + LOG.error("Converting exception to MetaException"); + throw new MetaException(exInfo); + } + + public static String encodeTableName(String name) { + // The encoding method is simple, e.g., replace + // all the special characters with the corresponding number in ASCII. + // Note that unicode is not supported in table names. And we have explicit + // checks for it. + StringBuilder sb = new StringBuilder(); + for (char ch : name.toCharArray()) { + if (Character.isLetterOrDigit(ch) || ch == '_') { + sb.append(ch); + } else { + sb.append('-').append((int) ch).append('-'); + } + } + return sb.toString(); + } + + /** + * convert Exception to MetaException, which sets the cause to such exception + * @param e cause of the exception + * @return the MetaException with the specified exception as the cause + */ + public static MetaException newMetaException(Exception e) { + return newMetaException(e != null ? e.getMessage() : null, e); + } + + /** + * convert Exception to MetaException, which sets the cause to such exception + * @param errorMessage the error message for this MetaException + * @param e cause of the exception + * @return the MetaException with the specified exception as the cause + */ + public static MetaException newMetaException(String errorMessage, Exception e) { + MetaException metaException = new MetaException(errorMessage); + if (e != null) { + metaException.initCause(e); + } + return metaException; + } + + + public static List<String> getColumnNamesForTable(Table table) { + List<String> colNames = new ArrayList<>(); + Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator(); + while (colsIterator.hasNext()) { + colNames.add(colsIterator.next().getName()); + } + return colNames; + } + + /** + * validateName + * + * Checks the name conforms to our standars which are: "[a-zA-z_0-9]+". checks + * this is just characters and numbers and _ + * + * @param name + * the name to validate + * @param conf + * hive configuration + * @return true or false depending on conformance + * if it doesn't match the pattern. + */ + public static boolean validateName(String name, Configuration conf) { + Pattern tpat; + String allowedCharacters = "\\w_"; + if (conf != null + && MetastoreConf.getBoolVar(conf, + MetastoreConf.ConfVars.SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES)) { + for (Character c : specialCharactersInTableNames) { + allowedCharacters += c; + } + } + tpat = Pattern.compile("[" + allowedCharacters + "]+"); + Matcher m = tpat.matcher(name); + return m.matches(); + } + + /** + * Determines whether a table is an external table. 
+ * + * @param table table of interest + * + * @return true if external + */ + public static boolean isExternalTable(Table table) { + if (table == null) { + return false; + } + Map<String, String> params = table.getParameters(); + if (params == null) { + return false; + } + + return isExternal(params); + } + + /** + * Determines whether an table needs to be purged or not. + * + * @param table table of interest + * + * @return true if external table needs to be purged + */ + public static boolean isExternalTablePurge(Table table) { + if (table == null) { + return false; + } + Map<String, String> params = table.getParameters(); + if (params == null) { + return false; + } + + return isPropertyTrue(params, EXTERNAL_TABLE_PURGE); + } + + public static boolean isExternal(Map<String, String> tableParams){ + return isPropertyTrue(tableParams, "EXTERNAL"); + } + + public static boolean isPropertyTrue(Map<String, String> tableParams, String prop) { + return "TRUE".equalsIgnoreCase(tableParams.get(prop)); + } + + + /** Duplicates AcidUtils; used in a couple places in metastore. */ + public static boolean isInsertOnlyTableParam(Map<String, String> params) { + String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp)); + } + + public static boolean isNonNativeTable(Table table) { + if (table == null || table.getParameters() == null) { + return false; + } + return (table.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE) != null); + } + + /** + * Given a list of partition columns and a partial mapping from + * some partition columns to values the function returns the values + * for the column. + * @param partCols the list of table partition columns + * @param partSpec the partial mapping from partition column to values + * @return list of values of for given partition columns, any missing + * values in partSpec is replaced by an empty string + */ + public static List<String> getPvals(List<FieldSchema> partCols, + Map<String, String> partSpec) { + List<String> pvals = new ArrayList<>(partCols.size()); + for (FieldSchema field : partCols) { + String val = StringUtils.defaultString(partSpec.get(field.getName())); + pvals.add(val); + } + return pvals; + } + public static String makePartNameMatcher(Table table, List<String> partVals) throws MetaException { + List<FieldSchema> partCols = table.getPartitionKeys(); + int numPartKeys = partCols.size(); + if (partVals.size() > numPartKeys) { + throw new MetaException("Incorrect number of partition values." + + " numPartKeys=" + numPartKeys + ", part_val=" + partVals); + } + partCols = partCols.subList(0, partVals.size()); + // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/... + // where partVal is either the escaped partition value given as input, + // or a regex of the form ".*" + // This works because the "=" and "/" separating key names and partition key/values + // are not escaped. + String partNameMatcher = Warehouse.makePartName(partCols, partVals, ".*"); + // add ".*" to the regex to match anything else afterwards the partial spec. 
+ if (partVals.size() < numPartKeys) { + partNameMatcher += ".*"; + } + return partNameMatcher; + } + + /** + * @param schema1: The first schema to be compared + * @param schema2: The second schema to be compared + * @return true if the two schemas are the same else false + * for comparing a field we ignore the comment it has + */ + public static boolean compareFieldColumns(List<FieldSchema> schema1, List<FieldSchema> schema2) { + if (schema1.size() != schema2.size()) { + return false; + } + Iterator<FieldSchema> its1 = schema1.iterator(); + Iterator<FieldSchema> its2 = schema2.iterator(); + while (its1.hasNext()) { + FieldSchema f1 = its1.next(); + FieldSchema f2 = its2.next(); + // The default equals provided by thrift compares the comments too for + // equality, thus we need to compare the relevant fields here. + if (!StringUtils.equals(f1.getName(), f2.getName()) || + !StringUtils.equals(f1.getType(), f2.getType())) { + return false; + } + } + return true; + } + + public static boolean isArchived(Partition part) { + Map<String, String> params = part.getParameters(); + return "TRUE".equalsIgnoreCase(params.get(hive_metastoreConstants.IS_ARCHIVED)); + } + + public static Path getOriginalLocation(Partition part) { + Map<String, String> params = part.getParameters(); + assert(isArchived(part)); + String originalLocation = params.get(hive_metastoreConstants.ORIGINAL_LOCATION); + assert( originalLocation != null); + + return new Path(originalLocation); + } + + private static String ARCHIVING_LEVEL = "archiving_level"; + public static int getArchivingLevel(Partition part) throws MetaException { + if (!isArchived(part)) { + throw new MetaException("Getting level of unarchived partition"); + } + + String lv = part.getParameters().get(ARCHIVING_LEVEL); + if (lv != null) { + return Integer.parseInt(lv); + } + // partitions archived before introducing multiple archiving + return part.getValues().size(); + } + + /** + * Read and return the meta store Sasl configuration. Currently it uses the default + * Hadoop SASL configuration and can be configured using "hadoop.rpc.protection" + * HADOOP-10211, made a backward incompatible change due to which this call doesn't + * work with Hadoop 2.4.0 and later. + * @param conf + * @return The SASL configuration + */ + public static Map<String, String> getMetaStoreSaslProperties(Configuration conf, boolean useSSL) { + // As of now Hive Meta Store uses the same configuration as Hadoop SASL configuration + + // If SSL is enabled, override the given value of "hadoop.rpc.protection" and set it to "authentication" + // This disables any encryption provided by SASL, since SSL already provides it + String hadoopRpcProtectionVal = conf.get(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION); + String hadoopRpcProtectionAuth = SaslRpcServer.QualityOfProtection.AUTHENTICATION.toString(); + + if (useSSL && hadoopRpcProtectionVal != null && !hadoopRpcProtectionVal.equals(hadoopRpcProtectionAuth)) { + LOG.warn("Overriding value of " + CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION + " setting it from " + + hadoopRpcProtectionVal + " to " + hadoopRpcProtectionAuth + " because SSL is enabled"); + conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, hadoopRpcProtectionAuth); + } + return HadoopThriftAuthBridge.getBridge().getHadoopSaslProperties(conf); + } + + /** + * Add new elements to the classpath. 
+ * + * @param newPaths + * Array of classpath elements + */ + public static ClassLoader addToClassPath(ClassLoader cloader, String[] newPaths) throws Exception { + URLClassLoader loader = (URLClassLoader) cloader; + List<URL> curPath = Arrays.asList(loader.getURLs()); + ArrayList<URL> newPath = new ArrayList<>(curPath.size()); + + // get a list with the current classpath components + for (URL onePath : curPath) { + newPath.add(onePath); + } + curPath = newPath; + + for (String onestr : newPaths) { + URL oneurl = urlFromPathString(onestr); + if (oneurl != null && !curPath.contains(oneurl)) { + curPath.add(oneurl); + } + } + + return new URLClassLoader(curPath.toArray(new URL[0]), loader); + } + + /** + * Create a URL from a string representing a path to a local file. + * The path string can be just a path, or can start with file:/, file:/// + * @param onestr path string + * @return + */ + private static URL urlFromPathString(String onestr) { + URL oneurl = null; + try { + if (onestr.startsWith("file:/")) { + oneurl = new URL(onestr); + } else { + oneurl = new File(onestr).toURL(); + } + } catch (Exception err) { + LOG.error("Bad URL " + onestr + ", ignoring path"); + } + return oneurl; + } + + /** + * Convert FieldSchemas to Thrift DDL. + */ + public static String getDDLFromFieldSchema(String structName, + List<FieldSchema> fieldSchemas) { + StringBuilder ddl = new StringBuilder(); + ddl.append("struct "); + ddl.append(structName); + ddl.append(" { "); + boolean first = true; + for (FieldSchema col : fieldSchemas) { + if (first) { + first = false; + } else { + ddl.append(", "); + } + ddl.append(ColumnType.typeToThriftType(col.getType())); + ddl.append(' '); + ddl.append(col.getName()); + } + ddl.append("}"); + + LOG.trace("DDL: {}", ddl); + return ddl.toString(); + } + + public static Properties getTableMetadata( + org.apache.hadoop.hive.metastore.api.Table table) { + return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table + .getParameters(), table.getDbName(), table.getTableName(), table.getPartitionKeys()); + } + + public static Properties getPartitionMetadata( + org.apache.hadoop.hive.metastore.api.Partition partition, + org.apache.hadoop.hive.metastore.api.Table table) { + return MetaStoreUtils + .getSchema(partition.getSd(), partition.getSd(), partition + .getParameters(), table.getDbName(), table.getTableName(), + table.getPartitionKeys()); + } + + public static Properties getSchema( + org.apache.hadoop.hive.metastore.api.Partition part, + org.apache.hadoop.hive.metastore.api.Table table) { + return MetaStoreUtils.getSchema(part.getSd(), table.getSd(), table + .getParameters(), table.getDbName(), table.getTableName(), table.getPartitionKeys()); + } + + /** + * Get partition level schema from table level schema. + * This function will use the same column names, column types and partition keys for + * each partition Properties. Their values are copied from the table Properties. This + * is mainly to save CPU and memory. CPU is saved because the first time the + * StorageDescriptor column names are accessed, JDO needs to execute a SQL query to + * retrieve the data. If we know the data will be the same as the table level schema + * and they are immutable, we should just reuse the table level schema objects. + * + * @param sd The Partition level Storage Descriptor. + * @param parameters partition level parameters + * @param tblSchema The table level schema from which this partition should be copied. 
+ * @return the properties + */ + public static Properties getPartSchemaFromTableSchema( + StorageDescriptor sd, + Map<String, String> parameters, + Properties tblSchema) { + + // Inherent most properties from table level schema and overwrite some properties + // in the following code. + // This is mainly for saving CPU and memory to reuse the column names, types and + // partition columns in the table level schema. + Properties schema = (Properties) tblSchema.clone(); + + // InputFormat + String inputFormat = sd.getInputFormat(); + if (inputFormat == null || inputFormat.length() == 0) { + String tblInput = + schema.getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT); + if (tblInput == null) { + inputFormat = org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName(); + } else { + inputFormat = tblInput; + } + } + schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT, + inputFormat); + + // OutputFormat + String outputFormat = sd.getOutputFormat(); + if (outputFormat == null || outputFormat.length() == 0) { + String tblOutput = + schema.getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT); + if (tblOutput == null) { + outputFormat = org.apache.hadoop.mapred.SequenceFileOutputFormat.class.getName(); + } else { + outputFormat = tblOutput; + } + } + schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT, + outputFormat); + + // Location + if (sd.getLocation() != null) { + schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_LOCATION, + sd.getLocation()); + } + + // Bucket count + schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT, + Integer.toString(sd.getNumBuckets())); + + if (sd.getBucketCols() != null && sd.getBucketCols().size() > 0) { + schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_FIELD_NAME, + Joiner.on(",").join(sd.getBucketCols())); + } + + // SerdeInfo + if (sd.getSerdeInfo() != null) { + + // We should not update the following 3 values if SerDeInfo contains these. + // This is to keep backward compatible with getSchema(), where these 3 keys + // are updated after SerDeInfo properties got copied. + String cols = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS; + String colTypes = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES; + String parts = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS; + + for (Map.Entry<String,String> param : sd.getSerdeInfo().getParameters().entrySet()) { + String key = param.getKey(); + if (schema.get(key) != null && + (key.equals(cols) || key.equals(colTypes) || key.equals(parts) || + // skip Druid properties which are used in DruidSerde, since they are also updated + // after SerDeInfo properties are copied. + key.startsWith("druid."))) { + continue; + } + schema.put(key, (param.getValue() != null) ? 
param.getValue() : StringUtils.EMPTY); + } + + if (sd.getSerdeInfo().getSerializationLib() != null) { + schema.setProperty(ColumnType.SERIALIZATION_LIB, sd.getSerdeInfo().getSerializationLib()); + } + } + + // skipping columns since partition level field schemas are the same as table level's + // skipping partition keys since it is the same as table level partition keys + + if (parameters != null) { + for (Map.Entry<String, String> e : parameters.entrySet()) { + schema.setProperty(e.getKey(), e.getValue()); + } + } + + return schema; + } + + private static Properties addCols(Properties schema, List<FieldSchema> cols) { + + StringBuilder colNameBuf = new StringBuilder(); + StringBuilder colTypeBuf = new StringBuilder(); + StringBuilder colComment = new StringBuilder(); + + boolean first = true; + String columnNameDelimiter = getColumnNameDelimiter(cols); + for (FieldSchema col : cols) { + if (!first) { + colNameBuf.append(columnNameDelimiter); + colTypeBuf.append(":"); + colComment.append('\0'); + } + colNameBuf.append(col.getName()); + colTypeBuf.append(col.getType()); + colComment.append((null != col.getComment()) ? col.getComment() : StringUtils.EMPTY); + first = false; + } + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS, + colNameBuf.toString()); + schema.setProperty(ColumnType.COLUMN_NAME_DELIMITER, columnNameDelimiter); + String colTypes = colTypeBuf.toString(); + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES, + colTypes); + schema.setProperty("columns.comments", colComment.toString()); + + return schema; + + } + + public static Properties getSchemaWithoutCols(StorageDescriptor sd, + Map<String, String> parameters, String databaseName, String tableName, + List<FieldSchema> partitionKeys) { + Properties schema = new Properties(); + String inputFormat = sd.getInputFormat(); + if (inputFormat == null || inputFormat.length() == 0) { + inputFormat = org.apache.hadoop.mapred.SequenceFileInputFormat.class + .getName(); + } + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT, + inputFormat); + String outputFormat = sd.getOutputFormat(); + if (outputFormat == null || outputFormat.length() == 0) { + outputFormat = org.apache.hadoop.mapred.SequenceFileOutputFormat.class + .getName(); + } + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT, + outputFormat); + + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_NAME, + databaseName + "." + tableName); + + if (sd.getLocation() != null) { + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_LOCATION, + sd.getLocation()); + } + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT, Integer + .toString(sd.getNumBuckets())); + if (sd.getBucketCols() != null && sd.getBucketCols().size() > 0) { + schema.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_FIELD_NAME, + Joiner.on(",").join(sd.getBucketCols())); + } + if (sd.getSerdeInfo() != null) { + for (Map.Entry<String,String> param : sd.getSerdeInfo().getParameters().entrySet()) { + schema.put(param.getKey(), (param.getValue() != null) ? 
param.getValue() : StringUtils.EMPTY); + } + + if (sd.getSerdeInfo().getSerializationLib() != null) { + schema.setProperty(ColumnType.SERIALIZATION_LIB, sd .getSerdeInfo().getSerializationLib()); + } + } + + if (sd.getCols() != null) { + schema.setProperty(ColumnType.SERIALIZATION_DDL, getDDLFromFieldSchema(tableName, sd.getCols())); + } + + String partString = StringUtils.EMPTY; + String partStringSep = StringUtils.EMPTY; + String partTypesString = StringUtils.EMPTY; + String partTypesStringSep = StringUtils.EMPTY; + for (FieldSchema partKey : partitionKeys) { + partString = partString.concat(partStringSep); + partString = partString.concat(partKey.getName()); + partTypesString = partTypesString.concat(partTypesStringSep); + partTypesString = partTypesString.concat(partKey.getType()); + if (partStringSep.length() == 0) { + partStringSep = "/"; + partTypesStringSep = ":"; + } + } + if (partString.length() > 0) { + schema + .setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, + partString); + schema + .setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES, + partTypesString); + } + + if (parameters != null) { + for (Map.Entry<String, String> e : parameters.entrySet()) { + // add non-null parameters to the schema + if ( e.getValue() != null) { + schema.setProperty(e.getKey(), e.getValue()); + } + } + } + + return schema; + } + + public static Properties getSchema( + org.apache.hadoop.hive.metastore.api.StorageDescriptor sd, + org.apache.hadoop.hive.metastore.api.StorageDescriptor tblsd, + Map<String, String> parameters, String databaseName, String tableName, + List<FieldSchema> partitionKeys) { + + return addCols(getSchemaWithoutCols(sd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols()); + } + + public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) { + // we first take a look if any fieldSchemas contain COMMA + for (int i = 0; i < fieldSchemas.size(); i++) { + if (fieldSchemas.get(i).getName().contains(",")) { + return String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER); + } + } + return String.valueOf(','); + } + + /** + * Convert FieldSchemas to columnNames. + */ + public static String getColumnNamesFromFieldSchema(List<FieldSchema> fieldSchemas) { + String delimiter = getColumnNameDelimiter(fieldSchemas); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fieldSchemas.size(); i++) { + if (i > 0) { + sb.append(delimiter); + } + sb.append(fieldSchemas.get(i).getName()); + } + return sb.toString(); + } + + /** + * Convert FieldSchemas to columnTypes. 
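The delimiter selection in getColumnNameDelimiter and the property layout produced by addCols are easiest to see with a small, purely illustrative example (column names, types, and comments are invented here):

    // Illustrative sketch only; assumes java.util.Arrays and FieldSchema are imported.
    List<FieldSchema> cols = Arrays.asList(
        new FieldSchema("id", "int", "row id"),
        new FieldSchema("payload", "struct<a:int,b:string>", null));
    // No column name contains a comma, so getColumnNameDelimiter(cols) returns ",".
    // addCols(schema, cols) would then set, among other properties:
    //   META_TABLE_COLUMNS      -> "id,payload"
    //   META_TABLE_COLUMN_TYPES -> "int:struct<a:int,b:string>"
    //   "columns.comments"      -> "row id\0"   (comments joined with '\0')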
+ */ + public static String getColumnTypesFromFieldSchema( + List<FieldSchema> fieldSchemas) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fieldSchemas.size(); i++) { + if (i > 0) { + sb.append(","); + } + sb.append(fieldSchemas.get(i).getType()); + } + return sb.toString(); + } + + public static String getColumnCommentsFromFieldSchema(List<FieldSchema> fieldSchemas) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fieldSchemas.size(); i++) { + if (i > 0) { + sb.append(ColumnType.COLUMN_COMMENTS_DELIMITER); + } + sb.append(fieldSchemas.get(i).getComment()); + } + return sb.toString(); + } + + public static boolean isMaterializedViewTable(Table table) { + if (table == null) { + return false; + } + return TableType.MATERIALIZED_VIEW.toString().equals(table.getTableType()); + } + + public static List<String> getColumnNames(List<FieldSchema> schema) { + List<String> cols = new ArrayList<>(schema.size()); + for (FieldSchema fs : schema) { + cols.add(fs.getName()); + } + return cols; + } + + public static boolean isValidSchedulingPolicy(String str) { + try { + parseSchedulingPolicy(str); + return true; + } catch (IllegalArgumentException ex) { + } + return false; + } + + public static WMPoolSchedulingPolicy parseSchedulingPolicy(String schedulingPolicy) { + if (schedulingPolicy == null) { + return WMPoolSchedulingPolicy.FAIR; + } + schedulingPolicy = schedulingPolicy.trim().toUpperCase(); + if ("DEFAULT".equals(schedulingPolicy)) { + return WMPoolSchedulingPolicy.FAIR; + } + return Enum.valueOf(WMPoolSchedulingPolicy.class, schedulingPolicy); + } + + private static boolean hasCatalogName(String dbName) { + return dbName != null && dbName.length() > 0 && + dbName.charAt(0) == CATALOG_DB_THRIFT_NAME_MARKER; + } + + /** + * Given a catalog name and database name cram them together into one string. This method can + * be used if you do not know the catalog name, in which case the default catalog will be + * retrieved from the conf object. The resulting string can be parsed apart again via + * {@link #parseDbName(String, Configuration)}. + * @param catalogName catalog name, can be null if no known. + * @param dbName database name, can be null or empty. + * @param conf configuration object, used to determine default catalog if catalogName is null + * @return one string that contains both. + */ + public static String prependCatalogToDbName(@Nullable String catalogName, @Nullable String dbName, + Configuration conf) { + if (catalogName == null) catalogName = getDefaultCatalog(conf); + StringBuilder buf = new StringBuilder() + .append(CATALOG_DB_THRIFT_NAME_MARKER) + .append(catalogName) + .append(CATALOG_DB_SEPARATOR); + if (dbName != null) { + if (dbName.isEmpty()) buf.append(DB_EMPTY_MARKER); + else buf.append(dbName); + } + return buf.toString(); + } + + /** + * Given a catalog name and database name, cram them together into one string. These can be + * parsed apart again via {@link #parseDbName(String, Configuration)}. + * @param catalogName catalog name. This cannot be null. If this might be null use + * {@link #prependCatalogToDbName(String, String, Configuration)} instead. + * @param dbName database name. + * @return one string that contains both. + */ + public static String prependNotNullCatToDbName(String catalogName, String dbName) { + assert catalogName != null; + return prependCatalogToDbName(catalogName, dbName, null); + } + + /** + * Prepend the default 'hive' catalog onto the database name. 
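Because the marker and separator constants are only referenced here, a short sketch of the resulting encoding may help; it assumes the conventional Hive values of '@' for CATALOG_DB_THRIFT_NAME_MARKER, "#" for CATALOG_DB_SEPARATOR, and "!" for DB_EMPTY_MARKER, none of which are shown in this hunk:

    // Illustrative sketch only, assuming marker '@', separator "#", empty marker "!".
    prependNotNullCatToDbName("hive", "sales");  // -> "@hive#sales"
    prependNotNullCatToDbName("hive", "");       // -> "@hive#!"
    prependNotNullCatToDbName("hive", null);     // -> "@hive#"
    // parseDbName (below) reverses the encoding:
    //   parseDbName("@hive#sales", conf) -> { "hive", "sales" }
    //   parseDbName("sales", conf)       -> { <default catalog>, "sales" }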
+ * @param dbName database name + * @param conf configuration object, used to determine default catalog + * @return one string with the 'hive' catalog name prepended. + */ + public static String prependCatalogToDbName(String dbName, Configuration conf) { + return prependCatalogToDbName(null, dbName, conf); + } + + private final static String[] nullCatalogAndDatabase = {null, null}; + + /** + * Parse the catalog name out of the database name. If no catalog name is present then the + * default catalog (as set in configuration file) will be assumed. + * @param dbName name of the database. This may or may not contain the catalog name. + * @param conf configuration object, used to determine the default catalog if it is not present + * in the database name. + * @return an array of two elements, the first being the catalog name, the second the database + * name. + * @throws MetaException if the name is not either just a database name or a catalog plus + * database name with the proper delimiters. + */ + public static String[] parseDbName(String dbName, Configuration conf) throws MetaException { + if (dbName == null) return nullCatalogAndDatabase; + if (hasCatalogName(dbName)) { + if (dbName.endsWith(CATALOG_DB_SEPARATOR)) { + // This means the DB name is null + return new String[] {dbName.substring(1, dbName.length() - 1), null}; + } else if (dbName.endsWith(DB_EMPTY_MARKER)) { + // This means the DB name is empty + return new String[] {dbName.substring(1, dbName.length() - DB_EMPTY_MARKER.length() - 1), ""}; + } + String[] names = dbName.substring(1).split(CATALOG_DB_SEPARATOR, 2); + if (names.length != 2) { + throw new MetaException(dbName + " is prepended with the catalog marker but does not " + + "appear to have a catalog name in it"); + } + return names; + } else { + return new String[] {getDefaultCatalog(conf), dbName}; + } + } + + /** + * Position in the array returned by {@link #parseDbName} that has the catalog name. + */ + public static final int CAT_NAME = 0; + /** + * Position in the array returned by {@link #parseDbName} that has the database name. + */ + public static final int DB_NAME = 1; + + public static String getDefaultCatalog(Configuration conf) { + if (conf == null) { + LOG.warn("Configuration is null, so going with default catalog."); + return Warehouse.DEFAULT_CATALOG_NAME; + } + String catName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT); + if (catName == null || "".equals(catName)) catName = Warehouse.DEFAULT_CATALOG_NAME; + return catName; + } + + public static boolean isView(Table table) { + if (table == null) { + return false; + } + return TableType.VIRTUAL_VIEW.toString().equals(table.getTableType()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java new file mode 100644 index 0000000..bae1ec3 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier; +import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; +import javax.security.auth.login.LoginException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SecurityUtils { + private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class); + + public static UserGroupInformation getUGI() throws LoginException, IOException { + String doAs = System.getenv("HADOOP_USER_NAME"); + if (doAs != null && doAs.length() > 0) { + /* + * this allows doAs (proxy user) to be passed along across process boundary where + * delegation tokens are not supported. 
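A brief, hypothetical illustration of the HADOOP_USER_NAME handling sketched in the comment above (the user name is invented):

    // Illustrative sketch only. With the environment variable set, e.g.
    //   export HADOOP_USER_NAME=analyst
    // getUGI() wraps the login user in a proxy UGI for that name:
    UserGroupInformation ugi = SecurityUtils.getUGI();
    // ugi.getUserName() -> "analyst"
    // ugi.getRealUser() -> the process's actual login user
    // Without HADOOP_USER_NAME, getUGI() simply returns the current user.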
For example, a DDL stmt via WebHCat with + * a doAs parameter, forks to 'hcat' which needs to start a Session that + * proxies the end user + */ + return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser()); + } + return UserGroupInformation.getCurrentUser(); + } + /** + * Dynamically sets up the JAAS configuration that uses kerberos + * @param principal + * @param keyTabFile + * @throws IOException + */ + public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient"; + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); + + principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0"); + JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. + */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private static final boolean IBM_JAVA = System.getProperty("java.vendor") + .contains("IBM"); + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) { + this.loginContextName = hiveLoginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map<String, String> krbOptions = new HashMap<String, String>(); + if (IBM_JAVA) { + krbOptions.put("credsType", "both"); + krbOptions.put("useKeytab", keyTabFile); + } else { + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("keyTab", keyTabFile); + } + krbOptions.put("principal", principal); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[] { hiveZooKeeperClientEntry }; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } + + /** + * Get the string form of the token given a token signature. The signature is used as the value of + * the "service" field in the token for lookup. Ref: AbstractDelegationTokenSelector in Hadoop. If + * there exists such a token in the token cache (credential store) of the job, the lookup returns + * that. This is relevant only when running against a "secure" hadoop release The method gets hold + * of the tokens if they are set up by hadoop - this should happen on the map/reduce tasks if the + * client added the tokens into hadoop's credential store in the front end during job submission. 
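For orientation, the JAAS entry produced by JaasConfiguration above is roughly equivalent to the following hand-written jaas.conf section on a non-IBM JVM; the principal and keytab path below are placeholders, not values from this patch:

    // Illustrative sketch only; principal and keytab path are placeholders.
    //   HiveZooKeeperClient {
    //     com.sun.security.auth.module.Krb5LoginModule required
    //     useKeyTab=true
    //     keyTab="/etc/security/keytabs/hive.keytab"
    //     principal="hive/host.example.com@EXAMPLE.COM"
    //     storeKey=true
    //     doNotPrompt=true
    //     refreshKrb5Config=true;
    //   };
    SecurityUtils.setZookeeperClientKerberosJaasConfig(
        "hive/_HOST@EXAMPLE.COM", "/etc/security/keytabs/hive.keytab");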
+ * The method will select the hive delegation token among the set of tokens and return the string + * form of it + * + * @param tokenSignature + * @return the string form of the token found + * @throws IOException + */ + public static String getTokenStrForm(String tokenSignature) throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector(); + + Token<? extends TokenIdentifier> token = tokenSelector.selectToken( + tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens()); + return token != null ? token.encodeToUrlString() : null; + } + + /** + * Create a delegation token object for the given token string and service. Add the token to given + * UGI + * + * @param ugi + * @param tokenStr + * @param tokenService + * @throws IOException + */ + public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService) + throws IOException { + Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService); + ugi.addToken(delegationToken); + } + + /** + * Create a new token using the given string and service + * + * @param tokenStr + * @param tokenService + * @return + * @throws IOException + */ + private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService) + throws IOException { + Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenService)); + return delegationToken; + } + + + /** + * @return the user name set in hadoop.job.ugi param or the current user from System + * @throws IOException if underlying Hadoop call throws LoginException + */ + public static String getUser() throws IOException { + try { + UserGroupInformation ugi = getUGI(); + return ugi.getUserName(); + } catch (LoginException le) { + throw new IOException(le); + } + } + + public static TServerSocket getServerSocket(String hiveHost, int portNum) throws TTransportException { + InetSocketAddress serverAddress; + if (hiveHost == null || hiveHost.isEmpty()) { + // Wildcard bind + serverAddress = new InetSocketAddress(portNum); + } else { + serverAddress = new InetSocketAddress(hiveHost, portNum); + } + return new TServerSocket(serverAddress); + } + + public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, String keyStorePath, + String keyStorePassWord, List<String> sslVersionBlacklist) throws TTransportException, + UnknownHostException { + TSSLTransportFactory.TSSLTransportParameters params = + new TSSLTransportFactory.TSSLTransportParameters(); + params.setKeyStore(keyStorePath, keyStorePassWord); + InetSocketAddress serverAddress; + if (hiveHost == null || hiveHost.isEmpty()) { + // Wildcard bind + serverAddress = new InetSocketAddress(portNum); + } else { + serverAddress = new InetSocketAddress(hiveHost, portNum); + } + TServerSocket thriftServerSocket = + TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress(), params); + if (thriftServerSocket.getServerSocket() instanceof SSLServerSocket) { + List<String> sslVersionBlacklistLocal = new ArrayList<>(); + for (String sslVersion : sslVersionBlacklist) { + sslVersionBlacklistLocal.add(sslVersion.trim().toLowerCase()); + } + SSLServerSocket sslServerSocket = (SSLServerSocket) thriftServerSocket.getServerSocket(); + List<String> enabledProtocols = new ArrayList<>(); + for (String protocol : 
sslServerSocket.getEnabledProtocols()) { + if (sslVersionBlacklistLocal.contains(protocol.toLowerCase())) { + LOG.debug("Disabling SSL Protocol: " + protocol); + } else { + enabledProtocols.add(protocol); + } + } + sslServerSocket.setEnabledProtocols(enabledProtocols.toArray(new String[0])); + LOG.info("SSL Server Socket Enabled Protocols: " + + Arrays.toString(sslServerSocket.getEnabledProtocols())); + } + return thriftServerSocket; + } + + public static TTransport getSSLSocket(String host, int port, int loginTimeout, + String trustStorePath, String trustStorePassWord) throws TTransportException { + TSSLTransportFactory.TSSLTransportParameters params = + new TSSLTransportFactory.TSSLTransportParameters(); + params.setTrustStore(trustStorePath, trustStorePassWord); + params.requireClientAuth(true); + // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and + // SSLContext created with the given params + TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params); + return getSSLSocketWithHttps(tSSLSocket); + } + + // Using endpoint identification algorithm as HTTPS enables us to do + // CNAMEs/subjectAltName verification + private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException { + SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket(); + SSLParameters sslParams = sslSocket.getSSLParameters(); + sslParams.setEndpointIdentificationAlgorithm("HTTPS"); + sslSocket.setSSLParameters(sslParams); + return new TSocket(sslSocket); + } +}
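A hedged usage sketch of the two TLS helpers above; the host names, port, timeout, store paths, passwords, and blacklist entries are all placeholders rather than values taken from this patch:

    // Illustrative sketch only; assumes java.util.Arrays plus the Thrift
    // TServerSocket/TTransport imports used by SecurityUtils itself.
    TServerSocket serverSocket = SecurityUtils.getServerSSLSocket(
        "0.0.0.0", 9083, "/etc/hive/conf/keystore.jks", "keystorePassword",
        Arrays.asList("SSLv2", "SSLv3"));
    TTransport clientTransport = SecurityUtils.getSSLSocket(
        "metastore.example.com", 9083, 60000,
        "/etc/hive/conf/truststore.jks", "truststorePassword");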