http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
new file mode 100644
index 0000000..155ecb1
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class FileUtils {
+  private static final PathFilter SNAPSHOT_DIR_PATH_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      return ".snapshot".equalsIgnoreCase(p.getName());
+    }
+  };
+  private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+  public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+  /**
+   * Filter that filters out hidden files
+   */
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Move a particular file or directory to the trash.
+   * @param fs FileSystem to use
+   * @param f path of file or directory to move to trash.
+   * @param conf configuration object
+   * @return true if move successful
+   * @throws IOException
+   */
+  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge)
+      throws IOException {
+    LOG.debug("deleting  " + f);
+    boolean result;
+    try {
+      if(purge) {
+        LOG.debug("purge is set to true. Not moving to Trash " + f);
+      } else {
+        result = Trash.moveToAppropriateTrash(fs, f, conf);
+        if (result) {
+          LOG.trace("Moved to trash: " + f);
+          return true;
+        }
+      }
+    } catch (IOException ioe) {
+      // for whatever failure reason including that trash has lower encryption zone
+      // retry with force delete
+      LOG.warn(ioe.getMessage() + "; Force to delete it.");
+    }
+
+    result = fs.delete(f, true);
+    if (!result) {
+      LOG.error("Failed to delete " + f);
+    }
+    return result;
+  }
+
+  /**
+   * Copies files between filesystems.
+   */
+  public static boolean copy(FileSystem srcFS, Path src,
+      FileSystem dstFS, Path dst,
+      boolean deleteSource,
+      boolean overwrite,
+      Configuration conf) throws IOException {
+    boolean copied = false;
+    boolean triedDistcp = false;
+
+    /* Run distcp if source file/dir is too big */
+    if (srcFS.getUri().getScheme().equals("hdfs")) {
+      ContentSummary srcContentSummary = srcFS.getContentSummary(src);
+      if (srcContentSummary.getFileCount() >
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES)
+          && srcContentSummary.getLength() >
+            MetastoreConf.getLongVar(conf,ConfVars.REPL_COPYFILE_MAXSIZE)) {
+
+        LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: 
" +
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXSIZE) + 
")");
+        LOG.info("Source is " + srcContentSummary.getFileCount() + " files. 
(MAX: " +
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES) 
+ ")");
+        LOG.info("Launch distributed copy (distcp) job.");
+        triedDistcp = true;
+        copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf);
+      }
+    }
+    if (!triedDistcp) {
+      // Note : Currently, this implementation does not "fall back" to regular copy if distcp
+      // is tried and it fails. We depend upon that behaviour in cases like replication,
+      // wherein if distcp fails, there is good reason to not plod along with a trivial
+      // implementation, and fail instead.
+      copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
+    }
+    return copied;
+  }
+
+  private static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
+                                boolean deleteSource, String doAsUser,
+                                Configuration conf) throws IOException {
+    boolean copied;
+    if (doAsUser == null){
+      copied = HdfsUtils.runDistCp(srcPaths, dst, conf);
+    } else {
+      copied = HdfsUtils.runDistCpAs(srcPaths, dst, conf, doAsUser);
+    }
+    if (copied && deleteSource) {
+      for (Path path : srcPaths) {
+        srcFS.delete(path, true);
+      }
+    }
+    return copied;
+  }
+
+  /**
+   * Creates the directory and all necessary parent directories.
+   * @param fs FileSystem to use
+   * @param f path to create.
+   * @return true if directory created successfully.  False otherwise, including if it exists.
+   * @throws IOException exception in creating the directory
+   */
+  public static boolean mkdir(FileSystem fs, Path f) throws IOException {
+    LOG.info("Creating directory if it doesn't exist: " + f);
+    return fs.mkdirs(f);
+  }
+
+  /**
+   * Rename a file.  Unlike {@link FileSystem#rename(Path, Path)}, if the destPath already exists
+   * and is a directory, this will NOT move the sourcePath into it.  It will throw an IOException
+   * instead.
+   * @param srcFs file system src paths are on
+   * @param destFs file system dest paths are on
+   * @param srcPath source file or directory to move
+   * @param destPath destination file name.  This must be a file and not an existing directory.
+   * @return result of fs.rename.
+   * @throws IOException if fs.rename throws it, or if destPath already exists.
+   */
+   public static boolean rename(FileSystem srcFs, FileSystem destFs, Path srcPath,
+                               Path destPath) throws IOException {
+   LOG.info("Renaming " + srcPath + " to " + destPath);
+
+   // If destPath directory exists, rename call will move the srcPath
+   // into destPath without failing. So check it before renaming.
+   if(destFs.exists(destPath)) {
+     throw new IOException("Cannot rename the source path. The destination "
+         + "path already exists.");
+   }
+
+   if (equalsFileSystem(srcFs, destFs)) {
+       //just rename the directory
+       return srcFs.rename(srcPath, destPath);
+     } else {
+         Configuration conf = new Configuration();
+         return copy(srcFs, srcPath, destFs, destPath,
+         true,    // delete source
+         false, // overwrite destination
+         conf);
+     }
+   }
+
+  // NOTE: This is for generating the internal path name for partitions. Users
+  // should always use the MetaStore API to get the path name for a partition.
+  // Users should not directly take partition values and turn it into a path
+  // name by themselves, because the logic below may change in the future.
+  //
+  // In the future, it's OK to add new chars to the escape list, and old data
+  // won't be corrupt, because the full path name in metastore is stored.
+  // In that case, Hive will continue to read the old data, but when it creates
+  // new partitions, it will use new names.
+  // edit : There are some use cases for which adding new chars does not seem
+  // to be backward compatible - Eg. if partition was created with name having
+  // a special char that you want to start escaping, and then you try dropping
+  // the partition with a hive version that now escapes the special char using
+  // the list below, then the drop partition fails to work.
+
+  private static BitSet charToEscape = new BitSet(128);
+  static {
+    for (char c = 0; c < ' '; c++) {
+      charToEscape.set(c);
+    }
+
+    /*
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004',
+                               '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B',
+                               '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012',
+                               '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019',
+                               '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F',
+                               '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
+                               '[', ']', '^'};
+
+    for (char c : clist) {
+      charToEscape.set(c);
+    }
+  }
+
+  private static boolean needsEscaping(char c) {
+    return c >= 0 && c < charToEscape.size() && charToEscape.get(c);
+  }
+
+  public static String escapePathName(String path) {
+    return escapePathName(path, null);
+  }
+
+  /**
+   * Escapes a path name.
+   * @param path The path to escape.
+   * @param defaultPath
+   *          The default name for the path, if the given path is empty or null.
+   * @return An escaped path name.
+   */
+  public static String escapePathName(String path, String defaultPath) {
+
+    // __HIVE_DEFAULT_NULL__ is the system default value for null and empty string.
+    // TODO: we should allow user to specify default partition or HDFS file location.
+    if (path == null || path.length() == 0) {
+      if (defaultPath == null) {
+        //previously, when path is empty or null and no default path is specified,
+        // __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
+        return "__HIVE_DEFAULT_PARTITION__";
+      } else {
+        return defaultPath;
+      }
+    }
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (needsEscaping(c)) {
+        sb.append('%');
+        sb.append(String.format("%1$02X", (int) c));
+      } else {
+        sb.append(c);
+      }
+    }
+    return sb.toString();
+  }
+
+  public static String unescapePathName(String path) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (c == '%' && i + 2 < path.length()) {
+        int code = -1;
+        try {
+          code = Integer.parseInt(path.substring(i + 1, i + 3), 16);
+        } catch (Exception e) {
+          code = -1;
+        }
+        if (code >= 0) {
+          sb.append((char) code);
+          i += 2;
+          continue;
+        }
+      }
+      sb.append(c);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Get all file status from a root path and recursively go deep into certain levels.
+   *
+   * @param path
+   *          the root path
+   * @param level
+   *          the depth of directory to explore
+   * @param fs
+   *          the file system
+   * @return array of FileStatus
+   * @throws IOException
+   */
+  public static List<FileStatus> getFileStatusRecurse(Path path, int level, FileSystem fs)
+      throws IOException {
+
+    // if level is <0, then return all files/directories under the specified path
+    if (level < 0) {
+      List<FileStatus> result = new ArrayList<>();
+      try {
+        FileStatus fileStatus = fs.getFileStatus(path);
+        FileUtils.listStatusRecursively(fs, fileStatus, result);
+      } catch (IOException e) {
+        // globStatus() API returns an empty FileStatus[] when the specified path
+        // does not exist, but getFileStatus() throws an IOException. To mimic that
+        // behavior we return an empty list on exception. For external
+        // tables, the path of the table will not exist during table creation.
+        return new ArrayList<>(0);
+      }
+      return result;
+    }
+
+    // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+    StringBuilder sb = new StringBuilder(path.toUri().getPath());
+    for (int i = 0; i < level; i++) {
+      sb.append(Path.SEPARATOR).append("*");
+    }
+    Path pathPattern = new Path(path, sb.toString());
+    return Lists.newArrayList(fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER));
+  }
+
+  /**
+   * Recursively lists status for all files starting from a particular directory (or individual file
+   * as base case).
+   *
+   * @param fs
+   *          file system
+   *
+   * @param fileStatus
+   *          starting point in file system
+   *
+   * @param results
+   *          receives enumeration of all files found
+   */
+  public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
+                                           List<FileStatus> results) throws IOException {
+
+    if (fileStatus.isDir()) {
+      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
+        listStatusRecursively(fs, stat, results);
+      }
+    } else {
+      results.add(fileStatus);
+    }
+  }
+
+  public static String makePartName(List<String> partCols, List<String> vals) {
+    return makePartName(partCols, vals, null);
+  }
+
+  /**
+   * Makes a valid partition name.
+   * @param partCols The partition keys' names
+   * @param vals The partition values
+   * @param defaultStr
+   *         The default name given to a partition value if the respective value is empty or null.
+   * @return An escaped, valid partition name.
+   */
+  public static String makePartName(List<String> partCols, List<String> vals,
+                                    String defaultStr) {
+    StringBuilder name = new StringBuilder();
+    for (int i = 0; i < partCols.size(); i++) {
+      if (i > 0) {
+        name.append(Path.SEPARATOR);
+      }
+      name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr));
+      name.append('=');
+      name.append(escapePathName(vals.get(i), defaultStr));
+    }
+    return name.toString();
+  }
+
+  /**
+   * Determine if two objects reference the same file system.
+   * @param fs1 first file system
+   * @param fs2 second file system
+   * @return true if both file system arguments point to the same file system
+   */
+  public static boolean equalsFileSystem(FileSystem fs1, FileSystem fs2) {
+    //When file system cache is disabled, you get different FileSystem objects
+    // for same file system, so '==' can't be used in such cases
+    //FileSystem api doesn't have a .equals() function implemented, so using
+    //the uri for comparison. FileSystem already uses uri+Configuration for
+    //equality in its CACHE .
+    //Once equality has been added in HDFS-9159, we should make use of it
+    return fs1.getUri().equals(fs2.getUri());
+  }
+
+  /**
+   * Check if the path contains a subdirectory named '.snapshot'
+   * @param p path to check
+   * @param fs filesystem of the path
+   * @return true if p contains a subdirectory named '.snapshot'
+   * @throws IOException
+   */
+  public static boolean pathHasSnapshotSubDir(Path p, FileSystem fs) throws IOException {
+    // Hadoop is missing a public API to check for snapshotable directories. Check with the directory name
+    // until a more appropriate API is provided by HDFS-12257.
+    final FileStatus[] statuses = fs.listStatus(p, FileUtils.SNAPSHOT_DIR_PATH_FILTER);
+    return statuses != null && statuses.length != 0;
+  }
+
+  public static void makeDir(Path path, Configuration conf) throws MetaException {
+    FileSystem fs;
+    try {
+      fs = path.getFileSystem(conf);
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+    } catch (IOException e) {
+      throw new MetaException("Unable to : " + path);
+    }
+  }
+
+  /**
+   * Utility method that determines if a specified directory already has
+   * contents (non-hidden files) or not - useful to determine if an
+   * immutable table already has contents, for example.
+   * @param fs
+   * @param path
+   * @throws IOException
+   */
+  public static boolean isDirEmpty(FileSystem fs, Path path) throws IOException {
+
+    if (fs.exists(path)) {
+      FileStatus[] status = fs.globStatus(new Path(path, "*"), hiddenFileFilter);
+      if (status.length > 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Variant of Path.makeQualified that qualifies the input path against the default file system
+   * indicated by the configuration
+   *
+   * This does not require a FileSystem handle in most cases - only requires the Filesystem URI.
+   * This saves the cost of opening the Filesystem - which can involve RPCs - as well as cause
+   * errors
+   *
+   * @param path
+   *          path to be fully qualified
+   * @param conf
+   *          Configuration file
+   * @return path qualified relative to default file system
+   */
+  public static Path makeQualified(Path path, Configuration conf) throws IOException {
+
+    if (!path.isAbsolute()) {
+      // in this case we need to get the working directory
+      // and this requires a FileSystem handle. So revert to
+      // original method.
+      FileSystem fs = FileSystem.get(conf);
+      return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    }
+
+    URI fsUri = FileSystem.getDefaultUri(conf);
+    URI pathUri = path.toUri();
+
+    String scheme = pathUri.getScheme();
+    String authority = pathUri.getAuthority();
+
+    // validate/fill-in scheme and authority. this follows logic
+    // identical to FileSystem.get(URI, conf) - but doesn't actually
+    // obtain a file system handle
+
+    if (scheme == null) {
+      // no scheme - use default file system uri
+      scheme = fsUri.getScheme();
+      authority = fsUri.getAuthority();
+      if (authority == null) {
+        authority = "";
+      }
+    } else {
+      if (authority == null) {
+        // no authority - use default one if it applies
+        if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) {
+          authority = fsUri.getAuthority();
+        } else {
+          authority = "";
+        }
+      }
+    }
+
+    return new Path(scheme, authority, pathUri.getPath());
+  }
+
+  /**
+   * Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not
+   * take into account any intricacies of the underlying file system, which is assumed to be
+   * HDFS. This should not return any false positives, but may return false negatives.
+   *
+   * @param parent
+   * @param other Directory to check if it is a subdirectory of parent
+   * @return True, if other is subdirectory of parent
+   */
+  public static boolean isSubdirectory(String parent, String other) {
+    return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR);
+  }
+
+  public static Path getTransformedPath(String name, String subDir, String root) {
+    if (root != null) {
+      Path newPath = new Path(root);
+      if (subDir != null) {
+        newPath = new Path(newPath, subDir);
+      }
+      return new Path(newPath, name);
+    }
+    return null;
+  }
+  public static class RemoteIteratorWithFilter implements RemoteIterator<LocatedFileStatus> {
+    /**
+     * This works with {@link RemoteIterator} which (potentially) produces all files recursively
+     * so looking for hidden folders must look at the whole path, not just the last part of it as
+     * would be appropriate w/o recursive listing.
+     */
+    public static final PathFilter HIDDEN_FILES_FULL_PATH_FILTER = new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        do {
+          String name = p.getName();
+          if (name.startsWith("_") || name.startsWith(".")) {
+            return false;
+          }
+        } while ((p = p.getParent()) != null);
+        return true;
+      }
+    };
+    private final RemoteIterator<LocatedFileStatus> iter;
+    private final PathFilter filter;
+    private LocatedFileStatus nextFile;
+
+    public RemoteIteratorWithFilter(RemoteIterator<LocatedFileStatus> iter, PathFilter filter)
+        throws IOException {
+      this.iter = iter;
+      this.filter = filter;
+      findNext();
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return nextFile != null;
+    }
+
+    @Override
+    public LocatedFileStatus next() throws IOException {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      LocatedFileStatus result = nextFile;
+      findNext();
+      return result;
+    }
+
+    void findNext() throws IOException {
+      while (iter.hasNext()) {
+        LocatedFileStatus status = iter.next();
+        if (filter.accept(status.getPath())) {
+          nextFile = status;
+          return;
+        }
+      }
+
+      // No more matching files in the iterator
+      nextFile = null;
+    }
+  }
+}
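
[Reviewer note, not part of commit 35f86c74] A minimal sketch of how the partition-name escaping helpers above behave, assuming the FileUtils class from this patch is on the classpath; the class name and sample values below are invented for illustration. '/' and '=' are in the escape list, partition key names are lower-cased, and key=value pairs are joined with '/'.

// Illustrative only; not part of the commit.
import java.util.Arrays;
import org.apache.hadoop.hive.metastore.utils.FileUtils;

public class FileUtilsEscapingDemo {
  public static void main(String[] args) {
    // '/' (0x2F) and '=' (0x3D) are percent-encoded by escapePathName.
    String escaped = FileUtils.escapePathName("2018/01=a");   // "2018%2F01%3Da"
    String restored = FileUtils.unescapePathName(escaped);    // "2018/01=a"
    // Partition keys are lower-cased and joined as key=value pairs separated by '/'.
    String partName = FileUtils.makePartName(
        Arrays.asList("DS", "Hr"), Arrays.asList("2018-01-01", "12"));
    // -> "ds=2018-01-01/hr=12"
    System.out.println(escaped + " | " + restored + " | " + partName);
  }
}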

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
new file mode 100644
index 0000000..2122788
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
+  private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+  // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
+  //       is still going to work. Otherwise, file IDs can be turned off. Later, we should use
+  //       a public utility method in HDFS to obtain the inode-based path.
+  private static final String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/";
+
+  /**
+   * Check the permissions on a file.
+   * @param fs Filesystem the file is contained in
+   * @param stat Stat info for the file
+   * @param action action to be performed
+   * @throws IOException If thrown by Hadoop
+   * @throws AccessControlException if the file cannot be accessed
+   */
+  public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
+      throws IOException, LoginException {
+    checkFileAccess(fs, stat, action, SecurityUtils.getUGI());
+  }
+
+  /**
+   * Check the permissions on a file
+   * @param fs Filesystem the file is contained in
+   * @param stat Stat info for the file
+   * @param action action to be performed
+   * @param ugi user group info for the current user.  This is passed in so that tests can pass
+   *            in mock ones.
+   * @throws IOException If thrown by Hadoop
+   * @throws AccessControlException if the file cannot be accessed
+   */
+  @VisibleForTesting
+  static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action,
+                              UserGroupInformation ugi) throws IOException {
+
+    String user = ugi.getShortUserName();
+    String[] groups = ugi.getGroupNames();
+
+    if (groups != null) {
+      String superGroupName = fs.getConf().get("dfs.permissions.supergroup", "");
+      if (arrayContains(groups, superGroupName)) {
+        LOG.debug("User \"" + user + "\" belongs to super-group \"" + 
superGroupName + "\". " +
+            "Permission granted for action: " + action + ".");
+        return;
+      }
+    }
+
+    FsPermission dirPerms = stat.getPermission();
+
+    if (user.equals(stat.getOwner())) {
+      if (dirPerms.getUserAction().implies(action)) {
+        return;
+      }
+    } else if (arrayContains(groups, stat.getGroup())) {
+      if (dirPerms.getGroupAction().implies(action)) {
+        return;
+      }
+    } else if (dirPerms.getOtherAction().implies(action)) {
+      return;
+    }
+    throw new AccessControlException("action " + action + " not permitted on path "
+        + stat.getPath() + " for user " + user);
+  }
+
+  public static boolean isPathEncrypted(Configuration conf, URI fsUri, Path path)
+      throws IOException {
+    Path fullPath;
+    if (path.isAbsolute()) {
+      fullPath = path;
+    } else {
+      fullPath = path.getFileSystem(conf).makeQualified(path);
+    }
+    if(!"hdfs".equalsIgnoreCase(path.toUri().getScheme())) {
+      return false;
+    }
+    try {
+      HdfsAdmin hdfsAdmin = new HdfsAdmin(fsUri, conf);
+      return (hdfsAdmin.getEncryptionZoneForPath(fullPath) != null);
+    } catch (FileNotFoundException fnfe) {
+      LOG.debug("Failed to get EZ for non-existent path: "+ fullPath, fnfe);
+      return false;
+    }
+  }
+
+  private static boolean arrayContains(String[] array, String value) {
+    if (array == null) return false;
+    for (String element : array) {
+      if (element.equals(value)) return true;
+    }
+    return false;
+  }
+
+  public static boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf,
+                                    String doAsUser) throws IOException {
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        doAsUser, UserGroupInformation.getLoginUser());
+    try {
+      return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return runDistCp(srcPaths, dst, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf)
+      throws IOException {
+    DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
+        .withSyncFolder(true)
+        .withCRC(true)
+        .preserve(FileAttribute.BLOCKSIZE)
+        .build();
+
+    // Creates the command-line parameters for distcp
+    List<String> params = constructDistCpParams(srcPaths, dst, conf);
+
+    try {
+      conf.setBoolean("mapred.mapper.new-api", true);
+      DistCp distcp = new DistCp(conf, options);
+
+      // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
+      // added by HADOOP-10459
+      if (distcp.run(params.toArray(new String[params.size()])) == 0) {
+        return true;
+      } else {
+        return false;
+      }
+    } catch (Exception e) {
+      throw new IOException("Cannot execute DistCp process: " + e, e);
+    } finally {
+      conf.setBoolean("mapred.mapper.new-api", false);
+    }
+  }
+
+  private static List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
+                                                    Configuration conf) {
+    List<String> params = new ArrayList<>();
+    for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+      String distCpOption = entry.getKey();
+      String distCpVal = entry.getValue();
+      params.add("-" + distCpOption);
+      if ((distCpVal != null) && (!distCpVal.isEmpty())){
+        params.add(distCpVal);
+      }
+    }
+    if (params.size() == 0){
+      // if no entries were added via conf, we initiate our defaults
+      params.add("-update");
+      params.add("-pbx");
+    }
+    for (Path src : srcPaths) {
+      params.add(src.toString());
+    }
+    params.add(dst.toString());
+    return params;
+  }
+
+  public static Path getFileIdPath(
+      FileSystem fileSystem, Path path, long fileId) {
+    return (fileSystem instanceof DistributedFileSystem)
+        ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path;
+  }
+
+  public static long getFileId(FileSystem fs, String path) throws IOException {
+    return ensureDfs(fs).getClient().getFileInfo(path).getFileId();
+  }
+
+  private static DistributedFileSystem ensureDfs(FileSystem fs) {
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass());
+    }
+    return (DistributedFileSystem)fs;
+  }
+
+  public static class HadoopFileStatus {
+
+    private final FileStatus fileStatus;
+    private final AclStatus aclStatus;
+
+    public HadoopFileStatus(Configuration conf, FileSystem fs, Path file) throws IOException {
+
+      FileStatus fileStatus = fs.getFileStatus(file);
+      AclStatus aclStatus = null;
+      if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) {
+        //Attempt extended Acl operations only if it's enabled, but don't fail the operation regardless.
+        try {
+          aclStatus = fs.getAclStatus(file);
+        } catch (Exception e) {
+          LOG.info("Skipping ACL inheritance: File system for path " + file + 
" " +
+              "does not support ACLs but dfs.namenode.acls.enabled is set to 
true. ");
+          LOG.debug("The details are: " + e, e);
+        }
+      }
+      this.fileStatus = fileStatus;
+      this.aclStatus = aclStatus;
+    }
+
+    public FileStatus getFileStatus() {
+      return fileStatus;
+    }
+
+    List<AclEntry> getAclEntries() {
+      return aclStatus == null ? null : Collections.unmodifiableList(aclStatus.getEntries());
+    }
+
+    @VisibleForTesting
+    AclStatus getAclStatus() {
+      return this.aclStatus;
+    }
+  }
+
+  /**
+   * Copy the permissions, group, and ACLs from a source {@link HadoopFileStatus} to a target {@link Path}. This method
+   * will only log a warning if permissions cannot be set, no exception will be thrown.
+   *
+   * @param conf the {@link Configuration} used when setting permissions and ACLs
+   * @param sourceStatus the source {@link HadoopFileStatus} to copy permissions and ACLs from
+   * @param targetGroup the group of the target {@link Path}, if this is set and it is equal to the source group, an
+   *                    extra set group operation is avoided
+   * @param fs the {@link FileSystem} that contains the target {@link Path}
+   * @param target the {@link Path} to copy permissions, group, and ACLs to
+   * @param recursion recursively set permissions and ACLs on the target {@link Path}
+   */
+  public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
+                                       String targetGroup, FileSystem fs, Path target, boolean recursion) {
+    setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, recursion ? new FsShell() : null);
+  }
+
+  @VisibleForTesting
+  static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
+                                String targetGroup, FileSystem fs, Path target, boolean recursion, FsShell fsShell) {
+    try {
+      FileStatus fStatus = sourceStatus.getFileStatus();
+      String group = fStatus.getGroup();
+      boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true");
+      FsPermission sourcePerm = fStatus.getPermission();
+      List<AclEntry> aclEntries = null;
+      if (aclEnabled) {
+        if (sourceStatus.getAclEntries() != null) {
+          LOG.trace(sourceStatus.getAclStatus().toString());
+          aclEntries = new ArrayList<>(sourceStatus.getAclEntries());
+          removeBaseAclEntries(aclEntries);
+
+          //the ACL APIs also expect the traditional user/group/other permissions in the form of ACL entries
+          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
+          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
+          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
+        }
+      }
+
+      if (recursion) {
+        //use FsShell to change group, permissions, and extended ACL's recursively
+        fsShell.setConf(conf);
+        //If there is no group of a file, no need to call chgrp
+        if (group != null && !group.isEmpty()) {
+          run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()});
+        }
+        if (aclEnabled) {
+          if (null != aclEntries) {
+            //Attempt extended Acl operations only if it's enabled, but don't fail the operation regardless.
+            try {
+              //construct the -setfacl command
+              String aclEntry = Joiner.on(",").join(aclEntries);
+              run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()});
+
+            } catch (Exception e) {
+              LOG.info("Skipping ACL inheritance: File system for path " + 
target + " " +
+                  "does not support ACLs but dfs.namenode.acls.enabled is set 
to true. ");
+              LOG.debug("The details are: " + e, e);
+            }
+          }
+        } else {
+          String permission = Integer.toString(sourcePerm.toShort(), 8);
+          run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()});
+        }
+      } else {
+        if (group != null && !group.isEmpty()) {
+          if (targetGroup == null ||
+              !group.equals(targetGroup)) {
+            fs.setOwner(target, null, group);
+          }
+        }
+        if (aclEnabled) {
+          if (null != aclEntries) {
+            fs.setAcl(target, aclEntries);
+          }
+        } else {
+          fs.setPermission(target, sourcePerm);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Unable to inherit permissions for file " + target + " from file " + 
sourceStatus.getFileStatus().getPath(),
+          e.getMessage());
+      LOG.debug("Exception while inheriting permissions", e);
+    }
+  }
+
+  /**
+   * Removes basic permission acls (unnamed acls) from the list of acl entries
+   * @param entries acl entries to remove from.
+   */
+  private static void removeBaseAclEntries(List<AclEntry> entries) {
+    Iterables.removeIf(entries, new Predicate<AclEntry>() {
+      @Override
+      public boolean apply(AclEntry input) {
+        if (input.getName() == null) {
+          return true;
+        }
+        return false;
+      }
+    });
+  }
+
+  /**
+   * Create a new AclEntry with scope, type and permission (no name).
+   *
+   * @param scope
+   *          AclEntryScope scope of the ACL entry
+   * @param type
+   *          AclEntryType ACL entry type
+   * @param permission
+   *          FsAction set of permissions in the ACL entry
+   * @return AclEntry new AclEntry
+   */
+  private static AclEntry newAclEntry(AclEntryScope scope, AclEntryType type,
+                                      FsAction permission) {
+    return new AclEntry.Builder().setScope(scope).setType(type)
+        .setPermission(permission).build();
+  }
+
+  private static void run(FsShell shell, String[] command) throws Exception {
+    LOG.debug(ArrayUtils.toString(command));
+    int retval = shell.run(command);
+    LOG.debug("Return value is :" + retval);
+  }
+}
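
[Reviewer note, not part of commit 35f86c74] constructDistCpParams above turns every configuration key under the distcp.options. prefix into a DistCp command-line flag, appending the value when it is non-empty, and falls back to "-update -pbx" when no such keys exist. A minimal sketch of that prefix handling, with invented option names; only Configuration.getPropsWithPrefix (already used by the patch) is assumed.

// Illustrative only; mirrors the prefix handling in constructDistCpParams.
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class DistCpOptionsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Each key under the prefix becomes a "-<option>" flag; a non-empty value
    // is appended as the flag's argument.
    conf.set("distcp.options.update", "");        // becomes: -update
    conf.set("distcp.options.bandwidth", "100");  // becomes: -bandwidth 100
    for (Map.Entry<String, String> e : conf.getPropsWithPrefix("distcp.options.").entrySet()) {
      String flag = "-" + e.getKey() + (e.getValue().isEmpty() ? "" : " " + e.getValue());
      System.out.println(flag);
    }
    // With no distcp.options.* entries, HdfsUtils would default to "-update -pbx".
  }
}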

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
new file mode 100644
index 0000000..c681a87
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
+
+import com.google.common.base.Joiner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MetaStoreUtils {
+  /** A fixed date format to be used for hive partition column values. */
+  public static final ThreadLocal<DateFormat> PARTITION_DATE_FORMAT =
+       new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      DateFormat val = new SimpleDateFormat("yyyy-MM-dd");
+      val.setLenient(false); // Without this, 2020-20-20 becomes 2021-08-20.
+      val.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return val;
+    }
+  };
+  // Indicates a type was derived from the deserializer rather than Hive's metadata.
+  public static final String TYPE_FROM_DESERIALIZER = "<derived from deserializer>";
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetaStoreUtils.class);
+
+  // The following two are public for any external users who wish to use them.
+  /**
+   * This character is used to mark a database name as having a catalog name prepended.  This
+   * marker should be placed first in the String to make it easy to determine that this has both
+   * a catalog and a database name.  @ is chosen as it is not used in regular expressions.  This
+   * is only intended for use when making old Thrift calls that do not support catalog names.
+   */
+  public static final char CATALOG_DB_THRIFT_NAME_MARKER = '@';
+
+  /**
+   * This String is used to separate the catalog name from the database name.  This should only
+   * be used in Strings that are prepended with {@link #CATALOG_DB_THRIFT_NAME_MARKER}.  # is
+   * chosen because it is not used in regular expressions.  This is only intended for use when
+   * making old Thrift calls that do not support catalog names.
+   */
+  public static final String CATALOG_DB_SEPARATOR = "#";
+
+  /**
+   * Mark a database as being empty (as distinct from null).
+   */
+  public static final String DB_EMPTY_MARKER = "!";
+
+  public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
+
+  // Right now we only support one special character '/'.
+  // More special characters can be added accordingly in the future.
+  // NOTE:
+  // If the following array is updated, please also be sure to update the
+  // configuration parameter documentation
+  // HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES in HiveConf as well.
+  private static final char[] specialCharactersInTableNames = new char[] { '/' };
+
+  /**
+   * Catches exceptions that can't be handled and bundles them to MetaException
+   *
+   * @param e exception to wrap.
+   * @throws MetaException wrapper for the exception
+   */
+  public static void logAndThrowMetaException(Exception e) throws MetaException {
+    String exInfo = "Got exception: " + e.getClass().getName() + " "
+        + e.getMessage();
+    LOG.error(exInfo, e);
+    LOG.error("Converting exception to MetaException");
+    throw new MetaException(exInfo);
+  }
+
+  public static String encodeTableName(String name) {
+    // The encoding method is simple, e.g., replace
+    // all the special characters with the corresponding number in ASCII.
+    // Note that unicode is not supported in table names. And we have explicit
+    // checks for it.
+    StringBuilder sb = new StringBuilder();
+    for (char ch : name.toCharArray()) {
+      if (Character.isLetterOrDigit(ch) || ch == '_') {
+        sb.append(ch);
+      } else {
+        sb.append('-').append((int) ch).append('-');
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * convert Exception to MetaException, which sets the cause to such exception
+   * @param e cause of the exception
+   * @return  the MetaException with the specified exception as the cause
+   */
+  public static MetaException newMetaException(Exception e) {
+    return newMetaException(e != null ? e.getMessage() : null, e);
+  }
+
+  /**
+   * convert Exception to MetaException, which sets the cause to such exception
+   * @param errorMessage  the error message for this MetaException
+   * @param e             cause of the exception
+   * @return  the MetaException with the specified exception as the cause
+   */
+  public static MetaException newMetaException(String errorMessage, Exception e) {
+    MetaException metaException = new MetaException(errorMessage);
+    if (e != null) {
+      metaException.initCause(e);
+    }
+    return metaException;
+  }
+
+
+  public static List<String> getColumnNamesForTable(Table table) {
+    List<String> colNames = new ArrayList<>();
+    Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator();
+    while (colsIterator.hasNext()) {
+      colNames.add(colsIterator.next().getName());
+    }
+    return colNames;
+  }
+
+  /**
+   * validateName
+   *
+   * Checks that the name conforms to our standards, which are: "[a-zA-Z_0-9]+", i.e.
+   * just letters, numbers and _
+   *
+   * @param name
+   *          the name to validate
+   * @param conf
+   *          hive configuration
+   * @return true if the name conforms to the pattern, false otherwise
+   */
+  public static boolean validateName(String name, Configuration conf) {
+    Pattern tpat;
+    String allowedCharacters = "\\w_";
+    if (conf != null
+        && MetastoreConf.getBoolVar(conf,
+        MetastoreConf.ConfVars.SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES)) {
+      for (Character c : specialCharactersInTableNames) {
+        allowedCharacters += c;
+      }
+    }
+    tpat = Pattern.compile("[" + allowedCharacters + "]+");
+    Matcher m = tpat.matcher(name);
+    return m.matches();
+  }
+
+  /**
+   * Determines whether a table is an external table.
+   *
+   * @param table table of interest
+   *
+   * @return true if external
+   */
+  public static boolean isExternalTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> params = table.getParameters();
+    if (params == null) {
+      return false;
+    }
+
+    return isExternal(params);
+  }
+
+  /**
+   * Determines whether a table needs to be purged or not.
+   *
+   * @param table table of interest
+   *
+   * @return true if external table needs to be purged
+   */
+  public static boolean isExternalTablePurge(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> params = table.getParameters();
+    if (params == null) {
+      return false;
+    }
+
+    return isPropertyTrue(params, EXTERNAL_TABLE_PURGE);
+  }
+
+  public static boolean isExternal(Map<String, String> tableParams){
+    return isPropertyTrue(tableParams, "EXTERNAL");
+  }
+
+  public static boolean isPropertyTrue(Map<String, String> tableParams, String prop) {
+    return "TRUE".equalsIgnoreCase(tableParams.get(prop));
+  }
+
+
+  /** Duplicates AcidUtils; used in a couple places in metastore. */
+  public static boolean isInsertOnlyTableParam(Map<String, String> params) {
+    String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
+  }
+
+  public static boolean isNonNativeTable(Table table) {
+    if (table == null || table.getParameters() == null) {
+      return false;
+    }
+    return (table.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE) != null);
+  }
+
+  /**
+   * Given a list of partition columns and a partial mapping from
+   * some partition columns to values the function returns the values
+   * for the column.
+   * @param partCols the list of table partition columns
+   * @param partSpec the partial mapping from partition column to values
+   * @return list of values for the given partition columns; any missing
+   *         values in partSpec are replaced by an empty string
+   */
+  public static List<String> getPvals(List<FieldSchema> partCols,
+                                      Map<String, String> partSpec) {
+    List<String> pvals = new ArrayList<>(partCols.size());
+    for (FieldSchema field : partCols) {
+      String val = StringUtils.defaultString(partSpec.get(field.getName()));
+      pvals.add(val);
+    }
+    return pvals;
+  }
+  public static String makePartNameMatcher(Table table, List<String> partVals) throws MetaException {
+    List<FieldSchema> partCols = table.getPartitionKeys();
+    int numPartKeys = partCols.size();
+    if (partVals.size() > numPartKeys) {
+      throw new MetaException("Incorrect number of partition values."
+          + " numPartKeys=" + numPartKeys + ", part_val=" + partVals);
+    }
+    partCols = partCols.subList(0, partVals.size());
+    // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...
+    // where partVal is either the escaped partition value given as input,
+    // or a regex of the form ".*"
+    // This works because the "=" and "/" separating key names and partition key/values
+    // are not escaped.
+    String partNameMatcher = Warehouse.makePartName(partCols, partVals, ".*");
+    // add ".*" to the regex to match anything else afterwards the partial 
spec.
+    if (partVals.size() < numPartKeys) {
+      partNameMatcher += ".*";
+    }
+    return partNameMatcher;
+  }
+
+  /**
+   * @param schema1: The first schema to be compared
+   * @param schema2: The second schema to be compared
+   * @return true if the two schemas are the same, else false;
+   *         when comparing fields, comments are ignored
+   */
+  public static boolean compareFieldColumns(List<FieldSchema> schema1, List<FieldSchema> schema2) {
+    if (schema1.size() != schema2.size()) {
+      return false;
+    }
+    Iterator<FieldSchema> its1 = schema1.iterator();
+    Iterator<FieldSchema> its2 = schema2.iterator();
+    while (its1.hasNext()) {
+      FieldSchema f1 = its1.next();
+      FieldSchema f2 = its2.next();
+      // The default equals provided by thrift compares the comments too for
+      // equality, thus we need to compare the relevant fields here.
+      if (!StringUtils.equals(f1.getName(), f2.getName()) ||
+          !StringUtils.equals(f1.getType(), f2.getType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean isArchived(Partition part) {
+    Map<String, String> params = part.getParameters();
+    return "TRUE".equalsIgnoreCase(params.get(hive_metastoreConstants.IS_ARCHIVED));
+  }
+
+  public static Path getOriginalLocation(Partition part) {
+    Map<String, String> params = part.getParameters();
+    assert(isArchived(part));
+    String originalLocation = params.get(hive_metastoreConstants.ORIGINAL_LOCATION);
+    assert( originalLocation != null);
+
+    return new Path(originalLocation);
+  }
+
+  private static String ARCHIVING_LEVEL = "archiving_level";
+  public static int getArchivingLevel(Partition part) throws MetaException {
+    if (!isArchived(part)) {
+      throw new MetaException("Getting level of unarchived partition");
+    }
+
+    String lv = part.getParameters().get(ARCHIVING_LEVEL);
+    if (lv != null) {
+      return Integer.parseInt(lv);
+    }
+    // partitions archived before introducing multiple archiving
+    return part.getValues().size();
+  }
+
+  /**
+   * Read and return the meta store Sasl configuration. Currently it uses the default
+   * Hadoop SASL configuration and can be configured using "hadoop.rpc.protection".
+   * HADOOP-10211 made a backward incompatible change due to which this call doesn't
+   * work with Hadoop 2.4.0 and later.
+   * @param conf
+   * @return The SASL configuration
+   */
+  public static Map<String, String> getMetaStoreSaslProperties(Configuration conf, boolean useSSL) {
+    // As of now Hive Meta Store uses the same configuration as Hadoop SASL configuration
+
+    // If SSL is enabled, override the given value of "hadoop.rpc.protection" and set it to "authentication"
+    // This disables any encryption provided by SASL, since SSL already provides it
+    String hadoopRpcProtectionVal = conf.get(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION);
+    String hadoopRpcProtectionAuth = SaslRpcServer.QualityOfProtection.AUTHENTICATION.toString();
+
+    if (useSSL && hadoopRpcProtectionVal != null && !hadoopRpcProtectionVal.equals(hadoopRpcProtectionAuth)) {
+      LOG.warn("Overriding value of " + CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION + " setting it from "
+          + hadoopRpcProtectionVal + " to " + hadoopRpcProtectionAuth + " because SSL is enabled");
+      conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, hadoopRpcProtectionAuth);
+    }
+    return HadoopThriftAuthBridge.getBridge().getHadoopSaslProperties(conf);
+  }
+
+  /**
+   * Add new elements to the classpath.
+   *
+   * @param newPaths
+   *          Array of classpath elements
+   */
+  public static ClassLoader addToClassPath(ClassLoader cloader, String[] newPaths) throws Exception {
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Arrays.asList(loader.getURLs());
+    ArrayList<URL> newPath = new ArrayList<>(curPath.size());
+
+    // get a list with the current classpath components
+    for (URL onePath : curPath) {
+      newPath.add(onePath);
+    }
+    curPath = newPath;
+
+    for (String onestr : newPaths) {
+      URL oneurl = urlFromPathString(onestr);
+      if (oneurl != null && !curPath.contains(oneurl)) {
+        curPath.add(oneurl);
+      }
+    }
+
+    return new URLClassLoader(curPath.toArray(new URL[0]), loader);
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be just a path, or can start with file:/, file:///
+   * @param onestr  path string
+   * @return the URL, or null if the path string is malformed
+   */
+  private static URL urlFromPathString(String onestr) {
+    URL oneurl = null;
+    try {
+      if (onestr.startsWith("file:/")) {
+        oneurl = new URL(onestr);
+      } else {
+        oneurl = new File(onestr).toURL();
+      }
+    } catch (Exception err) {
+      LOG.error("Bad URL " + onestr + ", ignoring path");
+    }
+    return oneurl;
+  }
+
+  /**
+   * Convert FieldSchemas to Thrift DDL.
+   */
+  public static String getDDLFromFieldSchema(String structName,
+                                             List<FieldSchema> fieldSchemas) {
+    StringBuilder ddl = new StringBuilder();
+    ddl.append("struct ");
+    ddl.append(structName);
+    ddl.append(" { ");
+    boolean first = true;
+    for (FieldSchema col : fieldSchemas) {
+      if (first) {
+        first = false;
+      } else {
+        ddl.append(", ");
+      }
+      ddl.append(ColumnType.typeToThriftType(col.getType()));
+      ddl.append(' ');
+      ddl.append(col.getName());
+    }
+    ddl.append("}");
+
+    LOG.trace("DDL: {}", ddl);
+    return ddl.toString();
+  }
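
For reference, a small sketch of what getDDLFromFieldSchema produces; the Thrift type names ("i32" for "int", etc.) come from ColumnType.typeToThriftType, and the MetaStoreUtils import is assumed from the surrounding diff.

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    public class ThriftDdlExample {
      public static void main(String[] args) {
        FieldSchema id = new FieldSchema("id", "int", null);
        FieldSchema name = new FieldSchema("name", "string", null);
        // Prints something like: struct events { i32 id, string name}
        System.out.println(MetaStoreUtils.getDDLFromFieldSchema("events", Arrays.asList(id, name)));
      }
    }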
+
+  public static Properties getTableMetadata(
+      org.apache.hadoop.hive.metastore.api.Table table) {
+    return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table.getParameters(),
+        table.getDbName(), table.getTableName(), table.getPartitionKeys());
+  }
+
+  public static Properties getPartitionMetadata(
+      org.apache.hadoop.hive.metastore.api.Partition partition,
+      org.apache.hadoop.hive.metastore.api.Table table) {
+    return MetaStoreUtils
+        .getSchema(partition.getSd(), partition.getSd(), partition
+                .getParameters(), table.getDbName(), table.getTableName(),
+            table.getPartitionKeys());
+  }
+
+  public static Properties getSchema(
+      org.apache.hadoop.hive.metastore.api.Partition part,
+      org.apache.hadoop.hive.metastore.api.Table table) {
+    return MetaStoreUtils.getSchema(part.getSd(), table.getSd(), table.getParameters(),
+        table.getDbName(), table.getTableName(), table.getPartitionKeys());
+  }
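
A hedged sketch of building a minimal Thrift Table and pulling its Properties through getTableMetadata; the table layout, SerDe class and location are placeholders chosen only to exercise the code paths above.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Properties;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    public class TableMetadataExample {
      public static void main(String[] args) {
        StorageDescriptor sd = new StorageDescriptor();
        sd.setCols(Arrays.asList(new FieldSchema("id", "int", null)));
        sd.setLocation("file:///tmp/warehouse/demo");                  // placeholder location
        sd.setSerdeInfo(new SerDeInfo("demo",
            "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",      // placeholder SerDe
            new HashMap<String, String>()));
        sd.setNumBuckets(-1);

        Table t = new Table();
        t.setDbName("default");
        t.setTableName("demo");
        t.setSd(sd);
        t.setPartitionKeys(new ArrayList<FieldSchema>());              // unpartitioned
        t.setParameters(new HashMap<String, String>());

        Properties props = MetaStoreUtils.getTableMetadata(t);
        // "name" holds db.table; input/output formats default to SequenceFile when unset.
        System.out.println(props.getProperty(hive_metastoreConstants.META_TABLE_NAME)); // default.demo
      }
    }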
+
+  /**
+   * Get partition level schema from table level schema.
+   * This function will use the same column names, column types and partition keys for
+   * each partition Properties. Their values are copied from the table Properties. This
+   * is mainly to save CPU and memory. CPU is saved because the first time the
+   * StorageDescriptor column names are accessed, JDO needs to execute a SQL query to
+   * retrieve the data. If we know the data will be the same as the table level schema
+   * and they are immutable, we should just reuse the table level schema objects.
+   *
+   * @param sd The Partition level Storage Descriptor.
+   * @param parameters partition level parameters
+   * @param tblSchema The table level schema from which this partition should be copied.
+   * @return the properties
+   */
+  public static Properties getPartSchemaFromTableSchema(
+      StorageDescriptor sd,
+      Map<String, String> parameters,
+      Properties tblSchema) {
+
+    // Inherit most properties from the table level schema and overwrite some properties
+    // in the following code.
+    // This is mainly for saving CPU and memory to reuse the column names, types and
+    // partition columns in the table level schema.
+    Properties schema = (Properties) tblSchema.clone();
+
+    // InputFormat
+    String inputFormat = sd.getInputFormat();
+    if (inputFormat == null || inputFormat.length() == 0) {
+      String tblInput =
+          schema.getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT);
+      if (tblInput == null) {
+        inputFormat = org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName();
+      } else {
+        inputFormat = tblInput;
+      }
+    }
+    schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT,
+        inputFormat);
+
+    // OutputFormat
+    String outputFormat = sd.getOutputFormat();
+    if (outputFormat == null || outputFormat.length() == 0) {
+      String tblOutput =
+          schema.getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT);
+      if (tblOutput == null) {
+        outputFormat = org.apache.hadoop.mapred.SequenceFileOutputFormat.class.getName();
+      } else {
+        outputFormat = tblOutput;
+      }
+    }
+    schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT,
+        outputFormat);
+
+    // Location
+    if (sd.getLocation() != null) {
+      schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_LOCATION,
+          sd.getLocation());
+    }
+
+    // Bucket count
+    schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT,
+        Integer.toString(sd.getNumBuckets()));
+
+    if (sd.getBucketCols() != null && sd.getBucketCols().size() > 0) {
+      schema.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_FIELD_NAME,
+          Joiner.on(",").join(sd.getBucketCols()));
+    }
+
+    // SerdeInfo
+    if (sd.getSerdeInfo() != null) {
+
+      // We should not update the following 3 values if SerDeInfo contains these.
+      // This is to keep backward compatible with getSchema(), where these 3 keys
+      // are updated after SerDeInfo properties got copied.
+      String cols = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
+      String colTypes = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES;
+      String parts = org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS;
+
+      for (Map.Entry<String, String> param : sd.getSerdeInfo().getParameters().entrySet()) {
+        String key = param.getKey();
+        if (schema.get(key) != null &&
+            (key.equals(cols) || key.equals(colTypes) || key.equals(parts) ||
+                // skip Druid properties which are used in DruidSerde, since they are also updated
+                // after SerDeInfo properties are copied.
+                key.startsWith("druid."))) {
+          continue;
+        }
+        schema.put(key, (param.getValue() != null) ? param.getValue() : StringUtils.EMPTY);
+      }
+
+      if (sd.getSerdeInfo().getSerializationLib() != null) {
+        schema.setProperty(ColumnType.SERIALIZATION_LIB, sd.getSerdeInfo().getSerializationLib());
+      }
+    }
+
+    // skipping columns since partition level field schemas are the same as table level's
+    // skipping partition keys since it is the same as table level partition keys
+
+    if (parameters != null) {
+      for (Map.Entry<String, String> e : parameters.entrySet()) {
+        schema.setProperty(e.getKey(), e.getValue());
+      }
+    }
+
+    return schema;
+  }
+
+  private static Properties addCols(Properties schema, List<FieldSchema> cols) {
+
+    StringBuilder colNameBuf = new StringBuilder();
+    StringBuilder colTypeBuf = new StringBuilder();
+    StringBuilder colComment = new StringBuilder();
+
+    boolean first = true;
+    String columnNameDelimiter = getColumnNameDelimiter(cols);
+    for (FieldSchema col : cols) {
+      if (!first) {
+        colNameBuf.append(columnNameDelimiter);
+        colTypeBuf.append(":");
+        colComment.append('\0');
+      }
+      colNameBuf.append(col.getName());
+      colTypeBuf.append(col.getType());
+      colComment.append((null != col.getComment()) ? col.getComment() : StringUtils.EMPTY);
+      first = false;
+    }
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS,
+        colNameBuf.toString());
+    schema.setProperty(ColumnType.COLUMN_NAME_DELIMITER, columnNameDelimiter);
+    String colTypes = colTypeBuf.toString();
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES,
+        colTypes);
+    schema.setProperty("columns.comments", colComment.toString());
+
+    return schema;
+  }
+
+  public static Properties getSchemaWithoutCols(StorageDescriptor sd,
+      Map<String, String> parameters, String databaseName, String tableName,
+      List<FieldSchema> partitionKeys) {
+    Properties schema = new Properties();
+    String inputFormat = sd.getInputFormat();
+    if (inputFormat == null || inputFormat.length() == 0) {
+      inputFormat = org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName();
+    }
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT,
+        inputFormat);
+    String outputFormat = sd.getOutputFormat();
+    if (outputFormat == null || outputFormat.length() == 0) {
+      outputFormat = org.apache.hadoop.mapred.SequenceFileOutputFormat.class.getName();
+    }
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_OUTPUT_FORMAT,
+        outputFormat);
+
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_NAME,
+        databaseName + "." + tableName);
+
+    if (sd.getLocation() != null) {
+      schema.setProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_LOCATION,
+          sd.getLocation());
+    }
+    schema.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT,
+        Integer.toString(sd.getNumBuckets()));
+    if (sd.getBucketCols() != null && sd.getBucketCols().size() > 0) {
+      schema.setProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_FIELD_NAME,
+          Joiner.on(",").join(sd.getBucketCols()));
+    }
+    if (sd.getSerdeInfo() != null) {
+      for (Map.Entry<String, String> param : sd.getSerdeInfo().getParameters().entrySet()) {
+        schema.put(param.getKey(), (param.getValue() != null) ? param.getValue() : StringUtils.EMPTY);
+      }
+
+      if (sd.getSerdeInfo().getSerializationLib() != null) {
+        schema.setProperty(ColumnType.SERIALIZATION_LIB, sd.getSerdeInfo().getSerializationLib());
+      }
+    }
+
+    if (sd.getCols() != null) {
+      schema.setProperty(ColumnType.SERIALIZATION_DDL, getDDLFromFieldSchema(tableName, sd.getCols()));
+    }
+
+    String partString = StringUtils.EMPTY;
+    String partStringSep = StringUtils.EMPTY;
+    String partTypesString = StringUtils.EMPTY;
+    String partTypesStringSep = StringUtils.EMPTY;
+    for (FieldSchema partKey : partitionKeys) {
+      partString = partString.concat(partStringSep);
+      partString = partString.concat(partKey.getName());
+      partTypesString = partTypesString.concat(partTypesStringSep);
+      partTypesString = partTypesString.concat(partKey.getType());
+      if (partStringSep.length() == 0) {
+        partStringSep = "/";
+        partTypesStringSep = ":";
+      }
+    }
+    if (partString.length() > 0) {
+      schema.setProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+          partString);
+      schema.setProperty(
+          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES,
+          partTypesString);
+    }
+
+    if (parameters != null) {
+      for (Map.Entry<String, String> e : parameters.entrySet()) {
+        // add non-null parameters to the schema
+        if (e.getValue() != null) {
+          schema.setProperty(e.getKey(), e.getValue());
+        }
+      }
+    }
+
+    return schema;
+  }
+
+  public static Properties getSchema(
+      org.apache.hadoop.hive.metastore.api.StorageDescriptor sd,
+      org.apache.hadoop.hive.metastore.api.StorageDescriptor tblsd,
+      Map<String, String> parameters, String databaseName, String tableName,
+      List<FieldSchema> partitionKeys) {
+
+    return addCols(getSchemaWithoutCols(sd, parameters, databaseName, tableName, partitionKeys),
+        tblsd.getCols());
+  }
+
+  public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) {
+    // first check whether any field name contains a comma; if so, fall back to the alternate delimiter
+    for (int i = 0; i < fieldSchemas.size(); i++) {
+      if (fieldSchemas.get(i).getName().contains(",")) {
+        return String.valueOf(ColumnType.COLUMN_COMMENTS_DELIMITER);
+      }
+    }
+    return String.valueOf(',');
+  }
+
+  /**
+   * Convert FieldSchemas to columnNames.
+   */
+  public static String getColumnNamesFromFieldSchema(List<FieldSchema> fieldSchemas) {
+    String delimiter = getColumnNameDelimiter(fieldSchemas);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < fieldSchemas.size(); i++) {
+      if (i > 0) {
+        sb.append(delimiter);
+      }
+      sb.append(fieldSchemas.get(i).getName());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Convert FieldSchemas to columnTypes.
+   */
+  public static String getColumnTypesFromFieldSchema(
+      List<FieldSchema> fieldSchemas) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < fieldSchemas.size(); i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(fieldSchemas.get(i).getType());
+    }
+    return sb.toString();
+  }
+
+  public static String getColumnCommentsFromFieldSchema(List<FieldSchema> fieldSchemas) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < fieldSchemas.size(); i++) {
+      if (i > 0) {
+        sb.append(ColumnType.COLUMN_COMMENTS_DELIMITER);
+      }
+      sb.append(fieldSchemas.get(i).getComment());
+    }
+    return sb.toString();
+  }
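
A quick sketch of the three column-string helpers above; note that comments are joined with ColumnType.COLUMN_COMMENTS_DELIMITER and a null comment is appended literally as "null" by StringBuilder. The MetaStoreUtils import is assumed from the surrounding diff.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    public class ColumnStringsExample {
      public static void main(String[] args) {
        List<FieldSchema> cols = Arrays.asList(
            new FieldSchema("id", "int", "row id"),
            new FieldSchema("name", "string", null));
        System.out.println(MetaStoreUtils.getColumnNamesFromFieldSchema(cols)); // id,name
        System.out.println(MetaStoreUtils.getColumnTypesFromFieldSchema(cols)); // int,string
        System.out.println(MetaStoreUtils.getColumnCommentsFromFieldSchema(cols));
      }
    }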
+
+  public static boolean isMaterializedViewTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    return TableType.MATERIALIZED_VIEW.toString().equals(table.getTableType());
+  }
+
+  public static List<String> getColumnNames(List<FieldSchema> schema) {
+    List<String> cols = new ArrayList<>(schema.size());
+    for (FieldSchema fs : schema) {
+      cols.add(fs.getName());
+    }
+    return cols;
+  }
+
+  public static boolean isValidSchedulingPolicy(String str) {
+    try {
+      parseSchedulingPolicy(str);
+      return true;
+    } catch (IllegalArgumentException ex) {
+      // fall through: an unrecognized policy name is simply not valid
+    }
+    return false;
+  }
+
+  public static WMPoolSchedulingPolicy parseSchedulingPolicy(String schedulingPolicy) {
+    if (schedulingPolicy == null) {
+      return WMPoolSchedulingPolicy.FAIR;
+    }
+    schedulingPolicy = schedulingPolicy.trim().toUpperCase();
+    if ("DEFAULT".equals(schedulingPolicy)) {
+      return WMPoolSchedulingPolicy.FAIR;
+    }
+    return Enum.valueOf(WMPoolSchedulingPolicy.class, schedulingPolicy);
+  }
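
A short sketch of the scheduling-policy parsing behaviour, assuming the utilities above are reachable as MetaStoreUtils: null and "default" (trimmed, case-insensitive) map to FAIR, and any other value must match a WMPoolSchedulingPolicy constant.

    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    public class SchedulingPolicyExample {
      public static void main(String[] args) {
        System.out.println(MetaStoreUtils.parseSchedulingPolicy(null));        // FAIR
        System.out.println(MetaStoreUtils.parseSchedulingPolicy(" default ")); // FAIR
        System.out.println(MetaStoreUtils.isValidSchedulingPolicy("fair"));    // true
        System.out.println(MetaStoreUtils.isValidSchedulingPolicy("bogus"));   // false
      }
    }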
+
+  private static boolean hasCatalogName(String dbName) {
+    return dbName != null && dbName.length() > 0 &&
+        dbName.charAt(0) == CATALOG_DB_THRIFT_NAME_MARKER;
+  }
+
+  /**
+   * Given a catalog name and database name, cram them together into one string.  This method can
+   * be used if you do not know the catalog name, in which case the default catalog will be
+   * retrieved from the conf object.  The resulting string can be parsed apart again via
+   * {@link #parseDbName(String, Configuration)}.
+   * @param catalogName catalog name, can be null if not known.
+   * @param dbName database name, can be null or empty.
+   * @param conf configuration object, used to determine default catalog if catalogName is null
+   * @return one string that contains both.
+   */
+  public static String prependCatalogToDbName(@Nullable String catalogName, @Nullable String dbName,
+                                              Configuration conf) {
+    if (catalogName == null) catalogName = getDefaultCatalog(conf);
+    StringBuilder buf = new StringBuilder()
+        .append(CATALOG_DB_THRIFT_NAME_MARKER)
+        .append(catalogName)
+        .append(CATALOG_DB_SEPARATOR);
+    if (dbName != null) {
+      if (dbName.isEmpty()) buf.append(DB_EMPTY_MARKER);
+      else buf.append(dbName);
+    }
+    return buf.toString();
+  }
+
+  /**
+   * Given a catalog name and database name, cram them together into one string.  These can be
+   * parsed apart again via {@link #parseDbName(String, Configuration)}.
+   * @param catalogName catalog name.  This cannot be null.  If this might be null use
+   *                    {@link #prependCatalogToDbName(String, String, Configuration)} instead.
+   * @param dbName database name.
+   * @return one string that contains both.
+   */
+  public static String prependNotNullCatToDbName(String catalogName, String dbName) {
+    assert catalogName != null;
+    return prependCatalogToDbName(catalogName, dbName, null);
+  }
+
+  /**
+   * Prepend the default 'hive' catalog onto the database name.
+   * @param dbName database name
+   * @param conf configuration object, used to determine default catalog
+   * @return one string with the 'hive' catalog name prepended.
+   */
+  public static String prependCatalogToDbName(String dbName, Configuration conf) {
+    return prependCatalogToDbName(null, dbName, conf);
+  }
+
+  private final static String[] nullCatalogAndDatabase = {null, null};
+
+  /**
+   * Parse the catalog name out of the database name.  If no catalog name is present then the
+   * default catalog (as set in configuration file) will be assumed.
+   * @param dbName name of the database.  This may or may not contain the catalog name.
+   * @param conf configuration object, used to determine the default catalog if it is not present
+   *            in the database name.
+   * @return an array of two elements, the first being the catalog name, the second the database
+   * name.
+   * @throws MetaException if the name is not either just a database name or a catalog plus
+   * database name with the proper delimiters.
+   */
+  public static String[] parseDbName(String dbName, Configuration conf) throws MetaException {
+    if (dbName == null) return nullCatalogAndDatabase;
+    if (hasCatalogName(dbName)) {
+      if (dbName.endsWith(CATALOG_DB_SEPARATOR)) {
+        // This means the DB name is null
+        return new String[] {dbName.substring(1, dbName.length() - 1), null};
+      } else if (dbName.endsWith(DB_EMPTY_MARKER)) {
+        // This means the DB name is empty
+        return new String[] {dbName.substring(1, dbName.length() - DB_EMPTY_MARKER.length() - 1), ""};
+      }
+      String[] names = dbName.substring(1).split(CATALOG_DB_SEPARATOR, 2);
+      if (names.length != 2) {
+        throw new MetaException(dbName + " is prepended with the catalog marker but does not " +
+            "appear to have a catalog name in it");
+      }
+      return names;
+    } else {
+      return new String[] {getDefaultCatalog(conf), dbName};
+    }
+  }
+
+  /**
+   * Position in the array returned by {@link #parseDbName} that has the catalog name.
+   */
+  public static final int CAT_NAME = 0;
+  /**
+   * Position in the array returned by {@link #parseDbName} that has the database name.
+   */
+  public static final int DB_NAME = 1;
+
+  public static String getDefaultCatalog(Configuration conf) {
+    if (conf == null) {
+      LOG.warn("Configuration is null, so going with default catalog.");
+      return Warehouse.DEFAULT_CATALOG_NAME;
+    }
+    String catName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
+    if (catName == null || "".equals(catName)) catName = Warehouse.DEFAULT_CATALOG_NAME;
+    return catName;
+  }
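
A round-trip sketch of the catalog/database packing above; "spark" and "sales" are arbitrary example names, and the catalog printed for a bare database name depends on the CATALOG_DEFAULT setting in the configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    public class CatalogDbNameExample {
      public static void main(String[] args) throws MetaException {
        Configuration conf = new Configuration();
        // Pack catalog + database into the single-string form used on the Thrift API.
        String packed = MetaStoreUtils.prependCatalogToDbName("spark", "sales", conf);
        // Parse it back apart and index the result with CAT_NAME / DB_NAME.
        String[] parts = MetaStoreUtils.parseDbName(packed, conf);
        System.out.println(parts[MetaStoreUtils.CAT_NAME] + "." + parts[MetaStoreUtils.DB_NAME]); // spark.sales
        // A bare database name resolves to the default catalog from the configuration.
        String[] bare = MetaStoreUtils.parseDbName("web_logs", conf);
        System.out.println(bare[MetaStoreUtils.CAT_NAME] + "." + bare[MetaStoreUtils.DB_NAME]);
      }
    }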
+
+  public static boolean isView(Table table) {
+    if (table == null) {
+      return false;
+    }
+    return TableType.VIRTUAL_VIEW.toString().equals(table.getTableType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
new file mode 100644
index 0000000..bae1ec3
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
+import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SecurityUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
+
+  public static UserGroupInformation getUGI() throws LoginException, IOException {
+    String doAs = System.getenv("HADOOP_USER_NAME");
+    if (doAs != null && doAs.length() > 0) {
+     /*
+      * This allows doAs (proxy user) to be passed along across a process boundary where
+      * delegation tokens are not supported.  For example, a DDL stmt via WebHCat with
+      * a doAs parameter forks to 'hcat', which needs to start a Session that
+      * proxies the end user.
+      */
+      return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+    }
+    return UserGroupInformation.getCurrentUser();
+  }
+  /**
+   * Dynamically sets up the JAAS configuration that uses Kerberos.
+   * @param principal the Kerberos principal to log in as
+   * @param keyTabFile path to the keytab file for the principal
+   * @throws IOException
+   */
+  public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
+    // ZooKeeper property name to pick the correct JAAS conf section
+    final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+    principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+    JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+    // Install the Configuration in the runtime.
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+  }
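
A minimal sketch of wiring up the ZooKeeper client JAAS configuration above; the principal and keytab path are placeholders, and "_HOST" is expanded against 0.0.0.0 (i.e. the local host name) by SecurityUtil.getServerPrincipal inside the method.

    import org.apache.hadoop.hive.metastore.utils.SecurityUtils;

    public class ZkJaasExample {
      public static void main(String[] args) throws Exception {
        SecurityUtils.setZookeeperClientKerberosJaasConfig(
            "hive/_HOST@EXAMPLE.COM",                          // placeholder principal
            "/etc/security/keytabs/hive.service.keytab");      // placeholder keytab path
      }
    }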
+
+  /**
+   * A JAAS configuration for ZooKeeper clients, intended for use with SASL
+   * Kerberos.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    private static final boolean IBM_JAVA = System.getProperty("java.vendor").contains("IBM");
+    // The currently installed Configuration, used as a fallback for other login contexts
+    private final javax.security.auth.login.Configuration baseConfig =
+        javax.security.auth.login.Configuration.getConfiguration();
+    private final String loginContextName;
+    private final String principal;
+    private final String keyTabFile;
+
+    public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+      this.loginContextName = hiveLoginContextName;
+      this.principal = principal;
+      this.keyTabFile = keyTabFile;
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        Map<String, String> krbOptions = new HashMap<String, String>();
+        if (IBM_JAVA) {
+          krbOptions.put("credsType", "both");
+          krbOptions.put("useKeytab", keyTabFile);
+        } else {
+          krbOptions.put("doNotPrompt", "true");
+          krbOptions.put("storeKey", "true");
+          krbOptions.put("useKeyTab", "true");
+          krbOptions.put("keyTab", keyTabFile);
+        }
+  krbOptions.put("principal", principal);
+        krbOptions.put("refreshKrb5Config", "true");
+        AppConfigurationEntry hiveZooKeeperClientEntry = new 
AppConfigurationEntry(
+            KerberosUtil.getKrb5LoginModuleName(), 
LoginModuleControlFlag.REQUIRED, krbOptions);
+        return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+      }
+      // Try the base config
+      if (baseConfig != null) {
+        return baseConfig.getAppConfigurationEntry(appName);
+      }
+      return null;
+    }
+  }
+  
+  /**
+   * Get the string form of the token given a token signature. The signature is used as the value of
+   * the "service" field in the token for lookup. Ref: AbstractDelegationTokenSelector in Hadoop. If
+   * there exists such a token in the token cache (credential store) of the job, the lookup returns
+   * that. This is relevant only when running against a "secure" Hadoop release. The method gets hold
+   * of the tokens if they are set up by Hadoop - this should happen on the map/reduce tasks if the
+   * client added the tokens into Hadoop's credential store in the front end during job submission.
+   * The method will select the Hive delegation token among the set of tokens and return the string
+   * form of it.
+   *
+   * @param tokenSignature the signature of the token to select
+   * @return the string form of the token found
+   * @throws IOException
+   */
+  public static String getTokenStrForm(String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+
+    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+    return token != null ? token.encodeToUrlString() : null;
+  }
+  
+  /**
+   * Create a delegation token object for the given token string and service, and add the token to
+   * the given UGI.
+   *
+   * @param ugi
+   * @param tokenStr
+   * @param tokenService
+   * @throws IOException
+   */
+  public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+    ugi.addToken(delegationToken);
+  }
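
An illustrative sketch of the token helpers above; the two service signatures are placeholders, and getTokenStrForm simply returns null when the current UGI carries no matching delegation token (e.g. on an insecure cluster).

    import java.io.IOException;
    import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DelegationTokenExample {
      public static void main(String[] args) throws IOException {
        // Look up the Hive delegation token stored under this service signature, if any.
        String tokenStr = SecurityUtils.getTokenStrForm("hiveMetastoreToken");   // placeholder signature
        if (tokenStr != null) {
          // Re-attach the same token to the current UGI under another service name.
          SecurityUtils.setTokenStr(UserGroupInformation.getCurrentUser(), tokenStr, "metastoreClientToken");
        }
      }
    }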
+
+  /**
+   * Create a new token using the given string and service.
+   *
+   * @param tokenStr
+   * @param tokenService
+   * @return the decoded delegation token, with its service field set
+   * @throws IOException
+   */
+  private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+    delegationToken.decodeFromUrlString(tokenStr);
+    delegationToken.setService(new Text(tokenService));
+    return delegationToken;
+  }
+
+
+  /**
+   * @return the user name set in hadoop.job.ugi param or the current user from System
+   * @throws IOException if underlying Hadoop call throws LoginException
+   */
+  public static String getUser() throws IOException {
+    try {
+      UserGroupInformation ugi = getUGI();
+      return ugi.getUserName();
+    } catch (LoginException le) {
+      throw new IOException(le);
+    }
+  }
+
+  public static TServerSocket getServerSocket(String hiveHost, int portNum) throws TTransportException {
+    InetSocketAddress serverAddress;
+    if (hiveHost == null || hiveHost.isEmpty()) {
+      // Wildcard bind
+      serverAddress = new InetSocketAddress(portNum);
+    } else {
+      serverAddress = new InetSocketAddress(hiveHost, portNum);
+    }
+    return new TServerSocket(serverAddress);
+  }
+
+  public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, String keyStorePath,
+      String keyStorePassWord, List<String> sslVersionBlacklist)
+      throws TTransportException, UnknownHostException {
+    TSSLTransportFactory.TSSLTransportParameters params =
+        new TSSLTransportFactory.TSSLTransportParameters();
+    params.setKeyStore(keyStorePath, keyStorePassWord);
+    InetSocketAddress serverAddress;
+    if (hiveHost == null || hiveHost.isEmpty()) {
+      // Wildcard bind
+      serverAddress = new InetSocketAddress(portNum);
+    } else {
+      serverAddress = new InetSocketAddress(hiveHost, portNum);
+    }
+    TServerSocket thriftServerSocket =
+        TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress(), params);
+    if (thriftServerSocket.getServerSocket() instanceof SSLServerSocket) {
+      List<String> sslVersionBlacklistLocal = new ArrayList<>();
+      for (String sslVersion : sslVersionBlacklist) {
+        sslVersionBlacklistLocal.add(sslVersion.trim().toLowerCase());
+      }
+      SSLServerSocket sslServerSocket = (SSLServerSocket) thriftServerSocket.getServerSocket();
+      List<String> enabledProtocols = new ArrayList<>();
+      for (String protocol : sslServerSocket.getEnabledProtocols()) {
+        if (sslVersionBlacklistLocal.contains(protocol.toLowerCase())) {
+          LOG.debug("Disabling SSL Protocol: " + protocol);
+        } else {
+          enabledProtocols.add(protocol);
+        }
+      }
+      sslServerSocket.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+      LOG.info("SSL Server Socket Enabled Protocols: "
+          + Arrays.toString(sslServerSocket.getEnabledProtocols()));
+    }
+    return thriftServerSocket;
+  }
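
A hedged sketch of opening the metastore's SSL server socket with a protocol blacklist; the host, port, keystore path and password are placeholders.

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
    import org.apache.thrift.transport.TServerSocket;

    public class SslServerSocketExample {
      public static void main(String[] args) throws Exception {
        TServerSocket socket = SecurityUtils.getServerSSLSocket(
            "0.0.0.0", 9083,
            "/etc/hive/conf/keystore.jks", "changeit",          // placeholder keystore + password
            Arrays.asList("SSLv2", "SSLv3"));                   // disable legacy protocol versions
        System.out.println("Listening on " + socket.getServerSocket().getLocalSocketAddress());
      }
    }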
+
+  public static TTransport getSSLSocket(String host, int port, int loginTimeout,
+      String trustStorePath, String trustStorePassWord) throws TTransportException {
+    TSSLTransportFactory.TSSLTransportParameters params =
+        new TSSLTransportFactory.TSSLTransportParameters();
+    params.setTrustStore(trustStorePath, trustStorePassWord);
+    params.requireClientAuth(true);
+    // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and
+    // SSLContext created with the given params
+    TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params);
+    return getSSLSocketWithHttps(tSSLSocket);
+  }
+
+  // Using endpoint identification algorithm as HTTPS enables us to do
+  // CNAMEs/subjectAltName verification
+  private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
+    SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
+    SSLParameters sslParams = sslSocket.getSSLParameters();
+    sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+    sslSocket.setSSLParameters(sslParams);
+    return new TSocket(sslSocket);
+  }
+}
