http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java deleted file mode 100644 index ddcda4c..0000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.metastore; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.MetaException; - -/** - * Define a set of APIs that may vary in different environments - */ -public interface MetaStoreFS { - - /** - * delete a directory - * - * @param f - * @param ifPurge - * @param recursive - * @return true on success - * @throws MetaException - */ - public boolean deleteDir(FileSystem fs, Path f, boolean recursive, - boolean ifPurge, Configuration conf) throws MetaException; - -}
// http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
// ----------------------------------------------------------------------
// diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
// deleted file mode 100644
// index 26e2c49..0000000
// --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
// +++ /dev/null
// @@ -1,52 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.metastore;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Storage contract for serialized per-file metadata, addressed by file ID.
 */
public interface MetadataStore {

  /**
   * Looks up the serialized metadata of a set of files.
   *
   * @param fileIds IDs of the files to look up.
   * @param result  Output parameter: the serialized file metadata is returned through
   *                this array.
   * @throws IOException declared failure type of the store.
   */
  void getFileMetadata(List<Long> fileIds, ByteBuffer[] result) throws IOException;

  /**
   * Stores serialized metadata for several files at once.
   *
   * @param fileIds         IDs of the files.
   * @param metadataBuffers Serialized file metadata, one buffer per file ID.
   * @param addedCols       Names of the additional columns created by the
   *                        file-format-specific metadata handler, to be stored in the cache.
   * @param addedVals       Values for {@code addedCols}; one value per file ID per added column.
   * @throws IOException          declared failure type of the store.
   * @throws InterruptedException declared by the contract; the conditions are
   *                              implementation-specific.
   */
  void storeFileMetadata(List<Long> fileIds, List<ByteBuffer> metadataBuffers,
      ByteBuffer[] addedCols, ByteBuffer[][] addedVals) throws IOException, InterruptedException;

  /**
   * Stores serialized metadata for a single file.
   *
   * @param fileId    ID of the file.
   * @param metadata  Serialized file metadata.
   * @param addedCols Names of the additional columns created by the
   *                  file-format-specific metadata handler, to be stored in the cache.
   * @param addedVals Values for {@code addedCols}; one value per added column.
   * @throws IOException          declared failure type of the store.
   * @throws InterruptedException declared by the contract; the conditions are
   *                              implementation-specific.
   */
  void storeFileMetadata(long fileId, ByteBuffer metadata, ByteBuffer[] addedCols,
      ByteBuffer[] addedVals) throws IOException, InterruptedException;

}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
// ----------------------------------------------------------------------
// diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
// deleted file mode 100644
// index e5d21b0..0000000
// --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
// +++ /dev/null
// @@ -1,38 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.  The ASF licenses this file
//  * to you under the Apache License, Version 2.0 (the
//  * "License"); you may not use this file except in compliance
//  * with the License.
// (license header of MetastoreTaskThread.java, continued from the previous line)
//  * You may obtain a copy of the License at
//  * <p>
//  * http://www.apache.org/licenses/LICENSE-2.0
//  * <p>
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */
package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Contract for every task that the metastore runs as a separate thread.
 * Such a task is both {@link Configurable} and {@link Runnable}.
 */
public interface MetastoreTaskThread extends Configurable, Runnable {

  /**
   * Reports how often the thread should be scheduled in the thread pool.
   * {@link #setConf(Configuration)} must have been called before this method is used.
   *
   * @param unit TimeUnit in which the frequency is to be expressed.
   * @return the frequency, in {@code unit}
   */
  long runFrequency(TimeUnit unit);
}
// http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
// ----------------------------------------------------------------------
// diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
// deleted file mode 100644
// index 105511d..0000000
// --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
// +++ /dev/null
// @@ -1,73 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.
// (license header of PartitionExpressionProxy.java, continued from the previous line)
//  * The ASF licenses this file
//  * to you under the Apache License, Version 2.0 (the
//  * "License"); you may not use this file except in compliance
//  * with the License.  You may obtain a copy of the License at
//  *
//  *     http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */

package org.apache.hadoop.hive.metastore;

import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

/**
 * Proxy through which the metastore performs a variety of QL operations.
 * The indirection exists because the metastore cannot depend on QL (QL depends on the
 * metastore); a dedicated metastore-client module would be the proper way to solve this.
 */
public interface PartitionExpressionProxy {

  /**
   * Turns a serialized Hive expression into a filter string in the format accepted by Filter.g.
   *
   * @param expr Serialized expression.
   * @return The filter string.
   * @throws MetaException declared failure type; the conditions are implementation-specific.
   */
  String convertExprToFilter(byte[] expr) throws MetaException;

  /**
   * Applies a serialized Hive expression to a list of partition names.
   *
   * @param partColumns          Partition columns of the underlying table.
   * @param expr                 Serialized expression.
   * @param defaultPartitionName Default partition name from job or server configuration.
   * @param partitionNames       Partition names; this list is modified in place.
   * @return Whether any unknown partitions were preserved in the name list.
   * @throws MetaException declared failure type; the conditions are implementation-specific.
   */
  boolean filterPartitionsByExpr(List<FieldSchema> partColumns,
      byte[] expr, String defaultPartitionName, List<String> partitionNames) throws MetaException;

  /**
   * Derives the file metadata type from the input format of the source table or partition.
   *
   * @param inputFormat Input format name.
   * @return The file metadata type.
   */
  FileMetadataExprType getMetadataType(String inputFormat);

  /**
   * Supplies a separate proxy for calling file-format-specific methods.
   *
   * @param type The file metadata type.
   * @return The proxy.
   */
  FileFormatProxy getFileFormatProxy(FileMetadataExprType type);

  /**
   * Builds a SARG from its serialized representation.
   *
   * @param expr SARG, serialized as Kryo.
   * @return SARG.
   */
  SearchArgument createSarg(byte[] expr);
}
// http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
// ----------------------------------------------------------------------
// diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
// deleted file mode 100644
// index 893c9f4..0000000
// --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
// +++ /dev/null
// @@ -1,502 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.  The ASF licenses this file
//  * to you under the Apache License, Version 2.0 (the
//  * "License"); you may not use this file except in compliance
//  * with the License.
// (license header of ReplChangeManager.java, continued from the previous line)
//  * You may obtain a copy of the License at
//  *
//  *     http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */

package org.apache.hadoop.hive.metastore;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Change manager (CM) used by replication: files that are about to be removed or overwritten
 * are recycled into a "cmroot" directory under a name derived from the original file name and
 * the file checksum, so that they can still be retrieved later. Also provides the helpers that
 * encode/decode the {@code fileuri#checksum#cmrooturi#subdirs} URI format and the periodic
 * {@link CMClearer} that purges old entries from cmroot.
 *
 * <p>The change-management state ({@code enabled}, {@code cmroot}, {@code conf}) is static and
 * is initialised once, by the first instance that is constructed.
 */
public class ReplChangeManager {
  private static final Logger LOG = LoggerFactory.getLogger(ReplChangeManager.class);
  private static ReplChangeManager instance;

  private static boolean inited = false;
  private static boolean enabled = false;
  private static Path cmroot;
  private static Configuration conf;
  private String msUser;
  private String msGroup;

  private static final String ORIG_LOC_TAG = "user.original-loc";
  static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
  private static final String URI_FRAGMENT_SEPARATOR = "#";
  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";

  /** How {@link #recycle(Path, RecycleType, boolean)} transfers a file into cmroot. */
  public enum RecycleType {
    MOVE,
    COPY
  }

  /**
   * Describes one file to be replicated: its source location, its location in cmroot, its
   * checksum, and which of the two locations should currently be read.
   */
  public static class FileInfo {
    private final FileSystem srcFs;
    private final Path sourcePath;
    private final Path cmPath;
    private final String checkSum;
    private final String subDir;
    private boolean useSourcePath;
    private boolean copyDone;

    /**
     * Creates an entry with no CM path and no checksum; the source path is used.
     */
    public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) {
      this(srcFs, sourcePath, null, null, true, subDir);
    }

    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath,
                    String checkSum, boolean useSourcePath, String subDir) {
      this.srcFs = srcFs;
      this.sourcePath = sourcePath;
      this.cmPath = cmPath;
      this.checkSum = checkSum;
      this.useSourcePath = useSourcePath;
      this.subDir = subDir;
      this.copyDone = false;
    }

    public FileSystem getSrcFs() {
      return srcFs;
    }

    public Path getSourcePath() {
      return sourcePath;
    }

    public Path getCmPath() {
      return cmPath;
    }

    public String getCheckSum() {
      return checkSum;
    }

    public boolean isUseSourcePath() {
      return useSourcePath;
    }

    public void setIsUseSourcePath(boolean useSourcePath) {
      this.useSourcePath = useSourcePath;
    }

    public String getSubDir() {
      return subDir;
    }

    public boolean isCopyDone() {
      return copyDone;
    }

    public void setCopyDone(boolean copyDone) {
      this.copyDone = copyDone;
    }

    /**
     * @return the source path when {@link #isUseSourcePath()} is true, otherwise the CM path
     */
    public Path getEffectivePath() {
      return useSourcePath ? sourcePath : cmPath;
    }
  }

  /**
   * Returns the process-wide instance, creating it on first use.
   *
   * @param conf configuration used only when the instance has to be created
   * @return the shared instance
   * @throws MetaException if initialising cmroot fails
   */
  public static synchronized ReplChangeManager getInstance(Configuration conf)
      throws MetaException {
    if (instance == null) {
      instance = new ReplChangeManager(conf);
    }
    return instance;
  }

  private ReplChangeManager(Configuration conf) throws MetaException {
    try {
      if (!inited) {
        if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
          ReplChangeManager.enabled = true;
          ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR));
          ReplChangeManager.conf = conf;

          FileSystem cmFs = cmroot.getFileSystem(conf);
          // Create cmroot with permission 700 if not exist
          if (!cmFs.exists(cmroot)) {
            cmFs.mkdirs(cmroot);
            cmFs.setPermission(cmroot, new FsPermission("700"));
          }
          UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
          msUser = usergroupInfo.getShortUserName();
          msGroup = usergroupInfo.getPrimaryGroupName();
        }
        inited = true;
      }
    } catch (IOException e) {
      throw new MetaException(StringUtils.stringifyException(e));
    }
  }

  // Filter files starts with ".". Note Hadoop consider files starts with
  // "." or "_" as hidden file. However, we need to replicate files starts
  // with "_". We find at least 2 use cases:
  // 1. For har files, _index and _masterindex is required files
  // 2. _success file is required for Oozie to indicate availability of data source
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return !p.getName().startsWith(".");
    }
  };

  /**
   * Move a path into cmroot. If the path is a directory (of a partition, or table if
   * nonpartitioned), recursively move files inside directory to cmroot. Note the table must be
   * managed table.
   *
   * @param path a single file or directory
   * @param type if the files to be copied or moved to cmpath.
   *             Copy is costly but preserve the source file
   * @param ifPurge if the file should skip Trash when move/delete source file.
   *                This is referred only if type is MOVE.
   * @return number of files that were newly moved or copied into cmroot; 0 when change
   *         management is disabled
   * @throws IOException if a file-system operation fails
   */
  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
    if (!enabled) {
      return 0;
    }

    int count = 0;
    FileSystem fs = path.getFileSystem(conf);
    if (fs.isDirectory(path)) {
      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
      for (FileStatus file : files) {
        count += recycle(file.getPath(), type, ifPurge);
      }
    } else {
      String fileCheckSum = checksumFor(path, fs);
      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString());

      // set timestamp before moving to cmroot, so we can
      // avoid race condition CM remove the file before setting
      // timestamp
      long now = System.currentTimeMillis();
      fs.setTimes(path, now, -1);

      boolean success = false;
      // checksumFor() returns null when the file system provides no checksum, so the
      // comparison has to be null-safe; without a checksum the content cannot be proven equal.
      if (fs.exists(cmPath) && fileCheckSum != null
          && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
        // If already a file with same checksum exists in cmPath, just ignore the copy/move
        // Also, mark the operation is unsuccessful to notify that file with same name already
        // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
        // CM cleaner.
        success = false;
      } else {
        switch (type) {
        case MOVE: {
          LOG.info("Moving {} to {}", path.toString(), cmPath.toString());

          // Rename fails if the file with same name already exist.
          success = fs.rename(path, cmPath);
          break;
        }
        case COPY: {
          LOG.info("Copying {} to {}", path.toString(), cmPath.toString());

          // It is possible to have a file with same checksum in cmPath but the content is
          // partially copied or corrupted. In this case, just overwrite the existing file with
          // new one.
          success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
          break;
        }
        default:
          // Operation fails as invalid input
          break;
        }
      }

      // Ignore if a file with same content already exist in cmroot
      // We might want to setXAttr for the new location in the future
      if (success) {
        // set the file owner to hive (or the id metastore run as)
        fs.setOwner(cmPath, msUser, msGroup);

        // tag the original file name so we know where the file comes from
        // Note we currently only track the last known trace as
        // xattr has limited capacity. We shall revisit and store all original
        // locations if orig-loc becomes important
        try {
          // Explicit charset: the stored bytes must not depend on the platform default.
          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes(StandardCharsets.UTF_8));
        } catch (UnsupportedOperationException e) {
          LOG.warn("Error setting xattr for {}", path.toString());
        }

        count++;
      } else {
        LOG.debug("A file with the same content of {} already exists, ignore", path);
        // Need to extend the tenancy if we saw a newer file with the same content
        fs.setTimes(cmPath, now, -1);
      }

      // Tag if we want to remain in trash after deletion.
      // If multiple files share the same content, then
      // any file claim remain in trash would be granted
      if ((type == RecycleType.MOVE) && !ifPurge) {
        try {
          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
        } catch (UnsupportedOperationException e) {
          LOG.warn("Error setting xattr for {}", cmPath.toString());
        }
      }
    }
    return count;
  }

  /**
   * Get checksum of a file.
   *
   * @param path file to checksum
   * @param fs file system that holds {@code path}
   * @return hex string of the file-system checksum, or null when the file system returns no
   *         checksum for the file
   * @throws IOException if the checksum cannot be read
   */
  public static String checksumFor(Path path, FileSystem fs) throws IOException {
    // TODO: fs checksum only available on hdfs, need to
    //       find a solution for other fs (eg, local fs, s3, etc)
    String checksumString = null;
    FileChecksum checksum = fs.getFileChecksum(path);
    if (checksum != null) {
      checksumString = StringUtils.byteToHexString(
          checksum.getBytes(), 0, checksum.getLength());
    }
    return checksumString;
  }

  /**
   * Convert a path of file inside a partition or table (if non-partitioned)
   * to a deterministic location of cmroot. So user can retrieve the file back
   * with the original location plus checksum.
   *
   * @param conf Hive configuration
   * @param name original filename
   * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
   * @param cmRootUri CM Root URI. (From remote source if REPL LOAD flow. From local config if recycle.)
   * @return Path
   */
  static Path getCMPath(Configuration conf, String name, String checkSum, String cmRootUri) {
    String newFileName = name + "_" + checkSum;
    int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);

    // Over-long names are cut to maxLength - 1 characters. This length is part of the
    // on-disk naming scheme of cmroot and must not be changed.
    if (newFileName.length() > maxLength) {
      newFileName = newFileName.substring(0, maxLength - 1);
    }

    return new Path(cmRootUri, newFileName);
  }

  /**
   * Get original file specified by src and chksumString. If the file exists and checksum
   * matches, return the file; otherwise, use chksumString to retrieve it from cmroot.
   *
   * @param src Original file location
   * @param checksumString Checksum of the original file
   * @param srcCMRootURI CM root URI of the source cluster
   * @param subDir Sub directory to which the source file belongs to
   * @param conf Hive configuration
   * @return Corresponding FileInfo object
   * @throws MetaException if accessing the source file system fails
   */
  public static FileInfo getFileInfo(Path src, String checksumString, String srcCMRootURI,
                                     String subDir, Configuration conf) throws MetaException {
    try {
      FileSystem srcFs = src.getFileSystem(conf);
      if (checksumString == null) {
        return new FileInfo(srcFs, src, subDir);
      }

      Path cmPath = getCMPath(conf, src.getName(), checksumString, srcCMRootURI);
      if (!srcFs.exists(src)) {
        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
      }

      String currentChecksumString;
      try {
        currentChecksumString = checksumFor(src, srcFs);
      } catch (IOException ex) {
        // If the file is missing or getting modified, then refer CM path
        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
      }
      // The source is used when it has no checksum at all or when its checksum still matches.
      boolean useSource =
          (currentChecksumString == null) || checksumString.equals(currentChecksumString);
      return new FileInfo(srcFs, src, cmPath, checksumString, useSource, subDir);
    } catch (IOException e) {
      throw new MetaException(StringUtils.stringifyException(e));
    }
  }

  /**
   * Concatenate filename, checksum, source cmroot uri and subdirectory with "#".
   *
   * @param fileUriStr Filename string
   * @param fileChecksum Checksum string
   * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means,
   *                      the multiple levels of subdirectories are concatenated with path
   *                      separator "/"
   * @return Concatenated Uri string
   */
  // TODO: this needs to be enhanced once change management based filesystem is implemented
  // Currently using fileuri#checksum#cmrooturi#subdirs as the format
  public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir)
      throws IOException {
    String encodedUri = fileUriStr;
    if ((fileChecksum != null) && (cmroot != null)) {
      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
          + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf);
    } else {
      // Keep the checksum and cmroot positions, but leave both empty.
      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
    }
    encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != null) ? encodedSubDir : "");
    LOG.debug("Encoded URI: {}", encodedUri);
    return encodedUri;
  }

  /**
   * Split uri with fragment into file uri, checksum, source cmroot uri and subdirs.
   * Currently using fileuri#checksum#cmrooturi#subdirs as the format.
   *
   * @param fileURIStr uri with fragment
   * @return array of length 4: [0] file uri, [1] checksum, [2] source CM root URI, [3] subdirs;
   *         entries 1 to 3 are null when the corresponding part is missing or empty
   */
  public static String[] decodeFileUri(String fileURIStr) {
    String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
    String[] result = new String[4];
    // String.split drops trailing empty strings, so an input made only of separators yields
    // an empty array; treat that as an empty file uri instead of failing on index 0.
    result[0] = (uriAndFragment.length > 0) ? uriAndFragment[0] : "";
    for (int i = 1; i < result.length && i < uriAndFragment.length; i++) {
      if (!StringUtils.isEmpty(uriAndFragment[i])) {
        result[i] = uriAndFragment[i];
      }
    }
    LOG.debug("Reading Encoded URI: {}:: {}:: {}:: {}", result[0], result[1], result[2], result[3]);
    return result;
  }

  /**
   * @param fromPath path whose string form may be an encoded CM file uri
   * @return true when the decoded uri carries a non-empty checksum part
   */
  public static boolean isCMFileUri(Path fromPath) {
    String[] result = decodeFileUri(fromPath.toString());
    return result[1] != null;
  }

  /**
   * Thread to clear old files of cmroot. Only the direct children of cmroot are examined.
   */
  static class CMClearer implements Runnable {
    private final Path cmroot;
    private final long secRetain;
    private final Configuration conf;

    CMClearer(String cmrootString, long secRetain, Configuration conf) {
      this.cmroot = new Path(cmrootString);
      this.secRetain = secRetain;
      this.conf = conf;
    }

    @Override
    public void run() {
      try {
        LOG.info("CMClearer started");

        long now = System.currentTimeMillis();
        FileSystem fs = cmroot.getFileSystem(conf);
        FileStatus[] files = fs.listStatus(cmroot);

        for (FileStatus file : files) {
          long modifiedTime = file.getModificationTime();
          if (now - modifiedTime > secRetain * 1000) {
            try {
              // Files tagged by recycle() to remain in trash are moved there instead of
              // being deleted outright.
              if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
                boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
                if (succ) {
                  LOG.debug("Move {} to trash", file);
                } else {
                  LOG.warn("Fail to move " + file.toString() + " to trash");
                }
              } else {
                boolean succ = fs.delete(file.getPath(), false);
                if (succ) {
                  LOG.debug("Remove {}", file);
                } else {
                  LOG.warn("Fail to remove " + file.toString());
                }
              }
            } catch (UnsupportedOperationException e) {
              LOG.warn("Error getting xattr for " + file.getPath().toString());
            }
          }
        }
      } catch (IOException e) {
        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
      }
    }
  }

  // Schedule CMClearer thread. Will be invoked by metastore
  static void scheduleCMClearer(Configuration conf) {
    if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
          new BasicThreadFactory.Builder()
          .namingPattern("cmclearer-%d")
          .daemon(true)
          .build());
      executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR),
          MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
          0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
    }
  }

  /**
   * @param db database to inspect; must not be null
   * @return true when the database carries a non-empty {@link #SOURCE_OF_REPLICATION} parameter
   */
  public static boolean isSourceOfReplication(Database db) {
    assert (db != null);
    String replPolicyIds = getReplPolicyIdString(db);
    return !StringUtils.isEmpty(replPolicyIds);
  }

  /**
   * @param db database to inspect, may be null
   * @return value of the {@link #SOURCE_OF_REPLICATION} database parameter, or null when the
   *         database is null, has no parameters, or does not carry that parameter
   */
  public static String getReplPolicyIdString(Database db) {
    if (db != null) {
      Map<String, String> m = db.getParameters();
      if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) {
        String replPolicyId = m.get(SOURCE_OF_REPLICATION);
        LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId);
        return replPolicyId;
      }
      // The placeholder is required, otherwise the database name is silently dropped.
      LOG.debug("Repl policy is not set for database {}", db.getName());
    }
    return null;
  }

  /**
   * Joins the given items with the "]" separator used for transactional write-event file lists.
   */
  public static String joinWithSeparator(Iterable<?> strings) {
    return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
  }

  /**
   * Splits a string produced by {@link #joinWithSeparator(Iterable)}. Despite the parameter
   * name, the separator is "]" (not a comma); whitespace around each separator is dropped.
   */
  public static String[] getListFromSeparatedString(String commaSeparatedString) {
    return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*");
  }
}
// http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
// ----------------------------------------------------------------------
// diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
// deleted file mode 100644
// index f97f638..0000000
// --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
// +++ /dev/null
// @@ -1,341 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.  The ASF licenses this file
//  * to you under the Apache License, Version 2.0 (the
//  * "License"); you may not use this file except in compliance
//  * with the License.  You may obtain a copy of the License at
//  *
//  *     http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
- */ - -package org.apache.hadoop.hive.metastore; - -import java.io.IOException; -import java.lang.annotation.Annotation; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.classification.RetrySemantics; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.utils.JavaUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.metastore.annotation.NoReconnect; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TApplicationException; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.transport.TTransportException; - -import com.google.common.annotations.VisibleForTesting; - -/** - * RetryingMetaStoreClient. Creates a proxy for a IMetaStoreClient - * implementation and retries calls to it on failure. - * If the login user is authenticated using keytab, it relogins user before - * each call. 
- * - */ -@InterfaceAudience.Public -public class RetryingMetaStoreClient implements InvocationHandler { - - private static final Logger LOG = LoggerFactory.getLogger(RetryingMetaStoreClient.class.getName()); - - private final IMetaStoreClient base; - private final UserGroupInformation ugi; - private final int retryLimit; - private final long retryDelaySeconds; - private final ConcurrentHashMap<String, Long> metaCallTimeMap; - private final long connectionLifeTimeInMillis; - private long lastConnectionTime; - private boolean localMetaStore; - - - protected RetryingMetaStoreClient(Configuration conf, Class<?>[] constructorArgTypes, - Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, - Class<? extends IMetaStoreClient> msClientClass) throws MetaException { - - this.ugi = getUGI(); - - if (this.ugi == null) { - LOG.warn("RetryingMetaStoreClient unable to determine current user UGI."); - } - - this.retryLimit = MetastoreConf.getIntVar(conf, ConfVars.THRIFT_FAILURE_RETRIES); - this.retryDelaySeconds = MetastoreConf.getTimeVar(conf, - ConfVars.CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); - this.metaCallTimeMap = metaCallTimeMap; - this.connectionLifeTimeInMillis = MetastoreConf.getTimeVar(conf, - ConfVars.CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS); - this.lastConnectionTime = System.currentTimeMillis(); - String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); - localMetaStore = (msUri == null) || msUri.trim().isEmpty(); - - reloginExpiringKeytabUser(); - - this.base = JavaUtils.newInstance(msClientClass, constructorArgTypes, constructorArgs); - - LOG.info("RetryingMetaStoreClient proxy=" + msClientClass + " ugi=" + this.ugi - + " retries=" + this.retryLimit + " delay=" + this.retryDelaySeconds - + " lifetime=" + this.connectionLifeTimeInMillis); - } - - public static IMetaStoreClient getProxy( - Configuration hiveConf, boolean allowEmbedded) throws MetaException { - return getProxy(hiveConf, new Class[]{Configuration.class, 
HiveMetaHookLoader.class, Boolean.class}, - new Object[]{hiveConf, null, allowEmbedded}, null, HiveMetaStoreClient.class.getName() - ); - } - - @VisibleForTesting - public static IMetaStoreClient getProxy(Configuration hiveConf, HiveMetaHookLoader hookLoader, - String mscClassName) throws MetaException { - return getProxy(hiveConf, hookLoader, null, mscClassName, true); - } - - public static IMetaStoreClient getProxy(Configuration hiveConf, HiveMetaHookLoader hookLoader, - ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName, boolean allowEmbedded) - throws MetaException { - - return getProxy(hiveConf, - new Class[] {Configuration.class, HiveMetaHookLoader.class, Boolean.class}, - new Object[] {hiveConf, hookLoader, allowEmbedded}, - metaCallTimeMap, - mscClassName - ); - } - - /** - * This constructor is meant for Hive internal use only. - * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for external purpose. - */ - public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] constructorArgTypes, - Object[] constructorArgs, String mscClassName) throws MetaException { - return getProxy(hiveConf, constructorArgTypes, constructorArgs, null, mscClassName); - } - - /** - * This constructor is meant for Hive internal use only. - * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for external purpose. - */ - public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] constructorArgTypes, - Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, - String mscClassName) throws MetaException { - - @SuppressWarnings("unchecked") - Class<? 
extends IMetaStoreClient> baseClass = - JavaUtils.getClass(mscClassName, IMetaStoreClient.class); - - RetryingMetaStoreClient handler = - new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs, - metaCallTimeMap, baseClass); - return (IMetaStoreClient) Proxy.newProxyInstance( - RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler); - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - Object ret; - int retriesMade = 0; - TException caughtException; - - boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class); - boolean allowRetry = true; - Annotation[] directives = method.getDeclaredAnnotations(); - if(directives != null) { - for(Annotation a : directives) { - if(a instanceof RetrySemantics.CannotRetry) { - allowRetry = false; - } - } - } - - while (true) { - try { - reloginExpiringKeytabUser(); - - if (allowReconnect) { - if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { - if (this.ugi != null) { - // Perform reconnect with the proper user context - try { - LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi); - - this.ugi.doAs( - new PrivilegedExceptionAction<Object> () { - @Override - public Object run() throws MetaException { - base.reconnect(); - return null; - } - }); - } catch (UndeclaredThrowableException e) { - Throwable te = e.getCause(); - if (te instanceof PrivilegedActionException) { - throw te.getCause(); - } else { - throw te; - } - } - lastConnectionTime = System.currentTimeMillis(); - } else { - LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information."); - throw new MetaException("UGI information unavailable. 
Will not attempt a reconnect."); - } - } - } - - if (metaCallTimeMap == null) { - ret = method.invoke(base, args); - } else { - // need to capture the timing - long startTime = System.currentTimeMillis(); - ret = method.invoke(base, args); - long timeTaken = System.currentTimeMillis() - startTime; - addMethodTime(method, timeTaken); - } - break; - } catch (UndeclaredThrowableException e) { - throw e.getCause(); - } catch (InvocationTargetException e) { - Throwable t = e.getCause(); - if (t instanceof TApplicationException) { - TApplicationException tae = (TApplicationException)t; - switch (tae.getType()) { - case TApplicationException.UNSUPPORTED_CLIENT_TYPE: - case TApplicationException.UNKNOWN_METHOD: - case TApplicationException.WRONG_METHOD_NAME: - case TApplicationException.INVALID_PROTOCOL: - throw t; - default: - // TODO: most other options are probably unrecoverable... throw? - caughtException = tae; - } - } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) { - // TODO: most protocol exceptions are probably unrecoverable... throw? - caughtException = (TException)t; - } else if ((t instanceof MetaException) && t.getMessage().matches( - "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") && - !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { - caughtException = (MetaException)t; - } else { - throw t; - } - } catch (MetaException e) { - if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") && - !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { - caughtException = e; - } else { - throw e; - } - } - - - if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) { - throw caughtException; - } - retriesMade++; - LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " + - retryLimit + ") after " + retryDelaySeconds + "s. 
" + method.getName(), caughtException); - Thread.sleep(retryDelaySeconds * 1000); - } - return ret; - } - - /** - * Returns the UGI for the current user. - * @return the UGI for the current user. - */ - private UserGroupInformation getUGI() { - UserGroupInformation ugi = null; - - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - // Swallow the exception and let the call determine what to do. - } - - return ugi; - } - - private void addMethodTime(Method method, long timeTaken) { - String methodStr = getMethodString(method); - while (true) { - Long curTime = metaCallTimeMap.get(methodStr), newTime = timeTaken; - if (curTime != null && metaCallTimeMap.replace(methodStr, curTime, newTime + curTime)) break; - if (curTime == null && (null == metaCallTimeMap.putIfAbsent(methodStr, newTime))) break; - } - } - - /** - * @param method - * @return String representation with arg types. eg getDatabase_(String, ) - */ - private String getMethodString(Method method) { - StringBuilder methodSb = new StringBuilder(method.getName()); - methodSb.append("_("); - for (Class<?> paramClass : method.getParameterTypes()) { - methodSb.append(paramClass.getSimpleName()); - methodSb.append(", "); - } - methodSb.append(")"); - return methodSb.toString(); - } - - private boolean hasConnectionLifeTimeReached(Method method) { - if (connectionLifeTimeInMillis <= 0 || localMetaStore) { - return false; - } - - boolean shouldReconnect = - (System.currentTimeMillis() - lastConnectionTime) >= connectionLifeTimeInMillis; - if (LOG.isDebugEnabled()) { - LOG.debug("Reconnection status for Method: " + method.getName() + " is " + shouldReconnect); - } - return shouldReconnect; - } - - /** - * Relogin if login user is logged in using keytab - * Relogin is actually done by ugi code only if sufficient time has passed - * A no-op if kerberos security is not enabled - * @throws MetaException - */ - private void reloginExpiringKeytabUser() throws MetaException { - 
if(!UserGroupInformation.isSecurityEnabled()){ - return; - } - try { - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry - //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x) - if(ugi.isFromKeytab()){ - ugi.checkTGTAndReloginFromKeytab(); - } - } catch (IOException e) { - String msg = "Error doing relogin using keytab " + e.getMessage(); - LOG.error(msg, e); - throw new MetaException(msg); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java deleted file mode 100644 index 1a17fe3..0000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.metastore; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.thrift.TException; - -/** - * Use this to get Table objects for a table list. It provides an iterator to - * on the resulting Table objects. It batches the calls to - * IMetaStoreClient.getTableObjectsByName to avoid OOM issues in HS2 (with - * embedded metastore) or MetaStore server (if HS2 is using remote metastore). - * - */ -public class TableIterable implements Iterable<Table> { - - @Override - public Iterator<Table> iterator() { - return new Iterator<Table>() { - - private final Iterator<String> tableNamesIter = tableNames.iterator(); - private Iterator<org.apache.hadoop.hive.metastore.api.Table> batchIter = null; - - @Override - public boolean hasNext() { - return ((batchIter != null) && batchIter.hasNext()) || tableNamesIter.hasNext(); - } - - @Override - public Table next() { - if ((batchIter == null) || !batchIter.hasNext()) { - getNextBatch(); - } - return batchIter.next(); - } - - private void getNextBatch() { - // get next batch of table names in this list - List<String> nameBatch = new ArrayList<String>(); - int batchCounter = 0; - while (batchCounter < batchSize && tableNamesIter.hasNext()) { - nameBatch.add(tableNamesIter.next()); - batchCounter++; - } - // get the Table objects for this batch of table names and get iterator - // on it - - try { - if (catName != null) { - batchIter = msc.getTableObjectsByName(catName, dbname, nameBatch).iterator(); - } else { - batchIter = msc.getTableObjectsByName(dbname, nameBatch).iterator(); - } - } catch (TException e) { - throw new RuntimeException(e); - } - - } - - @Override - public void remove() { - throw new IllegalStateException( - "TableIterable is a read-only iterable and remove() is unsupported"); - } - }; - } - - private final IMetaStoreClient msc; - private final String dbname; - private final 
List<String> tableNames; - private final int batchSize; - private final String catName; - - /** - * Primary constructor that fetches all tables in a given msc, given a Hive - * object,a db name and a table name list. - */ - public TableIterable(IMetaStoreClient msc, String dbname, List<String> tableNames, int batchSize) - throws TException { - this.msc = msc; - this.catName = null; - this.dbname = dbname; - this.tableNames = tableNames; - this.batchSize = batchSize; - } - - public TableIterable(IMetaStoreClient msc, String catName, String dbname, List<String> - tableNames, int batchSize) throws TException { - this.msc = msc; - this.catName = catName; - this.dbname = dbname; - this.tableNames = tableNames; - this.batchSize = batchSize; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java deleted file mode 100755 index 294dfb7..0000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ /dev/null @@ -1,759 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.metastore; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.common.TableName; -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.utils.FileUtils; -import org.apache.hadoop.hive.metastore.utils.HdfsUtils; -import org.apache.hadoop.hive.metastore.utils.JavaUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; 
-import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * This class represents a warehouse where data of Hive tables is stored - */ -public class Warehouse { - public static final String DEFAULT_CATALOG_NAME = "hive"; - public static final String DEFAULT_CATALOG_COMMENT = "Default catalog, for Hive"; - public static final String DEFAULT_DATABASE_NAME = "default"; - public static final String DEFAULT_DATABASE_COMMENT = "Default Hive database"; - public static final String DEFAULT_SERIALIZATION_FORMAT = "1"; - public static final String DATABASE_WAREHOUSE_SUFFIX = ".db"; - private static final String CAT_DB_TABLE_SEPARATOR = "."; - - private Path whRoot; - private Path whRootExternal; - private final Configuration conf; - private final String whRootString; - private final String whRootExternalString; - - public static final Logger LOG = LoggerFactory.getLogger("hive.metastore.warehouse"); - - private MetaStoreFS fsHandler = null; - private boolean storageAuthCheck = false; - private ReplChangeManager cm = null; - - public Warehouse(Configuration conf) throws MetaException { - this.conf = conf; - whRootString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE); - if (StringUtils.isBlank(whRootString)) { - throw new MetaException(ConfVars.WAREHOUSE.getVarname() - + " is not set in the config or blank"); - } - whRootExternalString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE_EXTERNAL); - fsHandler = getMetaStoreFsHandler(conf); - cm = ReplChangeManager.getInstance(conf); - storageAuthCheck = MetastoreConf.getBoolVar(conf, ConfVars.AUTHORIZATION_STORAGE_AUTH_CHECKS); - } - - private MetaStoreFS getMetaStoreFsHandler(Configuration conf) - throws MetaException { - String handlerClassStr = MetastoreConf.getVar(conf, ConfVars.FS_HANDLER_CLS); - try { - Class<? extends MetaStoreFS> handlerClass = (Class<? 
extends MetaStoreFS>) Class - .forName(handlerClassStr, true, JavaUtils.getClassLoader()); - MetaStoreFS handler = ReflectionUtils.newInstance(handlerClass, conf); - return handler; - } catch (ClassNotFoundException e) { - throw new MetaException("Error in loading MetaStoreFS handler." - + e.getMessage()); - } - } - - - /** - * Helper functions to convert IOException to MetaException - */ - public static FileSystem getFs(Path f, Configuration conf) throws MetaException { - try { - return f.getFileSystem(conf); - } catch (IOException e) { - MetaStoreUtils.logAndThrowMetaException(e); - } - return null; - } - - public FileSystem getFs(Path f) throws MetaException { - return getFs(f, conf); - } - - - /** - * Hadoop File System reverse lookups paths with raw ip addresses The File - * System URI always contains the canonical DNS name of the Namenode. - * Subsequently, operations on paths with raw ip addresses cause an exception - * since they don't match the file system URI. - * - * This routine solves this problem by replacing the scheme and authority of a - * path with the scheme and authority of the FileSystem that it maps to. - * - * @param path - * Path to be canonicalized - * @return Path with canonical scheme and authority - */ - public static Path getDnsPath(Path path, Configuration conf) throws MetaException { - FileSystem fs = getFs(path, conf); - String uriPath = path.toUri().getPath(); - if (StringUtils.isEmpty(uriPath)) { - uriPath = "/"; - } - return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), uriPath)); - } - - public Path getDnsPath(Path path) throws MetaException { - return getDnsPath(path, conf); - } - - /** - * Resolve the configured warehouse root dir with respect to the configuration - * This involves opening the FileSystem corresponding to the warehouse root - * dir (but that should be ok given that this is only called during DDL - * statements for non-external tables). 
- */ - public Path getWhRoot() throws MetaException { - if (whRoot != null) { - return whRoot; - } - whRoot = getDnsPath(new Path(whRootString)); - return whRoot; - } - - public Path getWhRootExternal() throws MetaException { - if (whRootExternal != null) { - return whRootExternal; - } - if (!hasExternalWarehouseRoot()) { - whRootExternal = getWhRoot(); - } else { - whRootExternal = getDnsPath(new Path(whRootExternalString)); - } - return whRootExternal; - } - - /** - * Build the database path based on catalog name and database name. This should only be used - * when a database is being created or altered. If you just want to find out the path a - * database is already using call {@link #getDatabasePath(Database)}. If the passed in - * database already has a path set that will be used. If not the location will be built using - * catalog's path and the database name. - * @param cat catalog the database is in - * @param db database object - * @return Path representing the directory for the database - * @throws MetaException when the file path cannot be properly determined from the configured - * file system. - */ - public Path determineDatabasePath(Catalog cat, Database db) throws MetaException { - if (db.isSetLocationUri()) { - return getDnsPath(new Path(db.getLocationUri())); - } - if (cat == null || cat.getName().equalsIgnoreCase(DEFAULT_CATALOG_NAME)) { - if (db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { - return getWhRoot(); - } else { - return new Path(getWhRoot(), dbDirFromDbName(db)); - } - } else { - return new Path(getDnsPath(new Path(cat.getLocationUri())), dbDirFromDbName(db)); - } - } - - private String dbDirFromDbName(Database db) throws MetaException { - return db.getName().toLowerCase() + DATABASE_WAREHOUSE_SUFFIX; - } - - /** - * Get the path specified by the database. In the case of the default database the root of the - * warehouse is returned. 
- * @param db database to get the path of - * @return path to the database directory - * @throws MetaException when the file path cannot be properly determined from the configured - * file system. - */ - public Path getDatabasePath(Database db) throws MetaException { - if (db.getCatalogName().equalsIgnoreCase(DEFAULT_CATALOG_NAME) && - db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { - return getWhRoot(); - } - return new Path(db.getLocationUri()); - } - - public Path getDefaultDatabasePath(String dbName) throws MetaException { - // TODO CAT - I am fairly certain that most calls to this are in error. This should only be - // used when the database location is unset, which should never happen except when a - // new database is being created. Once I have confirmation of this change calls of this to - // getDatabasePath(), since it does the right thing. Also, merge this with - // determineDatabasePath() as it duplicates much of the logic. - if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { - return getWhRoot(); - } - return new Path(getWhRoot(), dbName.toLowerCase() + DATABASE_WAREHOUSE_SUFFIX); - } - - public Path getDefaultExternalDatabasePath(String dbName) throws MetaException { - if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { - return getWhRootExternal(); - } - return new Path(getWhRootExternal(), dbName.toLowerCase() + DATABASE_WAREHOUSE_SUFFIX); - } - - private boolean hasExternalWarehouseRoot() { - return !StringUtils.isBlank(whRootExternalString); - } - - /** - * Returns the default location of the table path using the parent database's location - * @param db Database where the table is created - * @param tableName table name - * @return - * @throws MetaException - */ - @Deprecated - public Path getDefaultTablePath(Database db, String tableName) - throws MetaException { - return getDefaultTablePath(db, tableName, false); - } - - public Path getDefaultTablePath(Database db, String tableName, boolean isExternal) throws MetaException { - Path 
dbPath = null; - if (isExternal && hasExternalWarehouseRoot()) { - dbPath = getDefaultExternalDatabasePath(db.getName()); - } else { - dbPath = getDatabasePath(db); - } - return getDnsPath( - new Path(dbPath, MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); - } - - // A few situations where we need the default table path, without a DB object - public Path getDefaultTablePath(String dbName, String tableName, boolean isExternal) throws MetaException { - Path dbPath = null; - if (isExternal && hasExternalWarehouseRoot()) { - dbPath = getDefaultExternalDatabasePath(dbName); - } else { - dbPath = getDefaultDatabasePath(dbName); - } - return getDnsPath( - new Path(dbPath, MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); - } - - public Path getDefaultTablePath(Database db, Table table) throws MetaException { - return getDefaultTablePath(db, table.getTableName(), MetaStoreUtils.isExternalTable(table)); - } - - @Deprecated // Use TableName - public static String getQualifiedName(Table table) { - return TableName.getDbTable(table.getDbName(), table.getTableName()); - } - - @Deprecated // Use TableName - public static String getQualifiedName(String dbName, String tableName) { - return TableName.getDbTable(dbName, tableName); - } - - public static String getQualifiedName(Partition partition) { - return partition.getDbName() + "." + partition.getTableName() + partition.getValues(); - } - - /** - * Get table name in cat.db.table format. - * @param table table object - * @return fully qualified name. 
- */ - public static String getCatalogQualifiedTableName(Table table) { - return TableName.getQualified(table.getCatName(), table.getDbName(), table.getTableName()); - } - - public boolean mkdirs(Path f) throws MetaException { - FileSystem fs; - try { - fs = getFs(f); - return FileUtils.mkdir(fs, f); - } catch (IOException e) { - MetaStoreUtils.logAndThrowMetaException(e); - } - return false; - } - - public boolean renameDir(Path sourcePath, Path destPath, boolean needCmRecycle) throws MetaException { - try { - if (needCmRecycle) { - // Copy the source files to cmroot. As the client will move the source files to another - // location, we should make a copy of the files to cmroot instead of moving it. - cm.recycle(sourcePath, RecycleType.COPY, true); - } - FileSystem srcFs = getFs(sourcePath); - FileSystem destFs = getFs(destPath); - return FileUtils.rename(srcFs, destFs, sourcePath, destPath); - } catch (Exception ex) { - MetaStoreUtils.logAndThrowMetaException(ex); - } - return false; - } - - void addToChangeManagement(Path file) throws MetaException { - try { - cm.recycle(file, RecycleType.COPY, true); - } catch (IOException e) { - throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - } - - public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException { - return deleteDir(f, recursive, false, db); - } - - public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException { - return deleteDir(f, recursive, ifPurge, ReplChangeManager.isSourceOfReplication(db)); - } - - public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException { - if (needCmRecycle) { - try { - cm.recycle(f, RecycleType.MOVE, ifPurge); - } catch (IOException e) { - throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - } - FileSystem fs = getFs(f); - return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf); - } - - 
public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException { - try { - cm.recycle(f, RecycleType.MOVE, ifPurge); - } catch (IOException e) { - throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } - } - - public boolean isEmpty(Path path) throws IOException, MetaException { - ContentSummary contents = getFs(path).getContentSummary(path); - if (contents != null && contents.getFileCount() == 0 && contents.getDirectoryCount() == 1) { - return true; - } - return false; - } - - public boolean isWritable(Path path) throws IOException { - if (!storageAuthCheck) { - // no checks for non-secure hadoop installations - return true; - } - if (path == null) { //what??!! - return false; - } - final FileStatus stat; - final FileSystem fs; - try { - fs = getFs(path); - stat = fs.getFileStatus(path); - HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE); - return true; - } catch (FileNotFoundException fnfe){ - // File named by path doesn't exist; nothing to validate. - return true; - } catch (Exception e) { - // all other exceptions are considered as emanating from - // unauthorized accesses - if (LOG.isDebugEnabled()) { - LOG.debug("Exception when checking if path (" + path + ")", e); - } - return false; - } - } - - private static String escapePathName(String path) { - return FileUtils.escapePathName(path); - } - - private static String unescapePathName(String path) { - return FileUtils.unescapePathName(path); - } - - /** - * Given a partition specification, return the path corresponding to the - * partition spec. By default, the specification does not include dynamic partitions. - * @param spec - * @return string representation of the partition specification. 
- * @throws MetaException - */ - public static String makePartPath(Map<String, String> spec) - throws MetaException { - return makePartName(spec, true); - } - - /** - * Makes a partition name from a specification - * @param spec - * @param addTrailingSeperator if true, adds a trailing separator e.g. 'ds=1/' - * @return partition name - * @throws MetaException - */ - public static String makePartName(Map<String, String> spec, - boolean addTrailingSeperator) - throws MetaException { - StringBuilder suffixBuf = new StringBuilder(); - int i = 0; - for (Entry<String, String> e : spec.entrySet()) { - if (e.getValue() == null || e.getValue().length() == 0) { - throw new MetaException("Partition spec is incorrect. " + spec); - } - if (i>0) { - suffixBuf.append(Path.SEPARATOR); - } - suffixBuf.append(escapePathName(e.getKey())); - suffixBuf.append('='); - suffixBuf.append(escapePathName(e.getValue())); - i++; - } - if (addTrailingSeperator) { - suffixBuf.append(Path.SEPARATOR); - } - return suffixBuf.toString(); - } - /** - * Given a dynamic partition specification, return the path corresponding to the - * static part of partition specification. This is basically a copy of makePartName - * but we get rid of MetaException since it is not serializable. - * @param spec - * @return string representation of the static part of the partition specification. 
- */ - public static String makeDynamicPartName(Map<String, String> spec) { - StringBuilder suffixBuf = new StringBuilder(); - for (Entry<String, String> e : spec.entrySet()) { - if (e.getValue() != null && e.getValue().length() > 0) { - suffixBuf.append(escapePathName(e.getKey())); - suffixBuf.append('='); - suffixBuf.append(escapePathName(e.getValue())); - suffixBuf.append(Path.SEPARATOR); - } else { // stop once we see a dynamic partition - break; - } - } - return suffixBuf.toString(); - } - - static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)"); - - private static final Pattern slash = Pattern.compile("/"); - - /** - * Extracts values from partition name without the column names. - * @param name Partition name. - * @param result The result. Must be pre-sized to the expected number of columns. - */ - public static AbstractList<String> makeValsFromName( - String name, AbstractList<String> result) throws MetaException { - assert name != null; - String[] parts = slash.split(name, 0); - if (result == null) { - result = new ArrayList<>(parts.length); - for (int i = 0; i < parts.length; ++i) { - result.add(null); - } - } else if (parts.length != result.size()) { - throw new MetaException( - "Expected " + result.size() + " components, got " + parts.length + " (" + name + ")"); - } - for (int i = 0; i < parts.length; ++i) { - int eq = parts[i].indexOf('='); - if (eq <= 0) { - throw new MetaException("Unexpected component " + parts[i]); - } - result.set(i, unescapePathName(parts[i].substring(eq + 1))); - } - return result; - } - - public static LinkedHashMap<String, String> makeSpecFromName(String name) - throws MetaException { - if (name == null || name.isEmpty()) { - throw new MetaException("Partition name is invalid. 
" + name); - } - LinkedHashMap<String, String> partSpec = new LinkedHashMap<>(); - makeSpecFromName(partSpec, new Path(name), null); - return partSpec; - } - - public static boolean makeSpecFromName(Map<String, String> partSpec, Path currPath, - Set<String> requiredKeys) { - List<String[]> kvs = new ArrayList<>(); - do { - String component = currPath.getName(); - Matcher m = pat.matcher(component); - if (m.matches()) { - String k = unescapePathName(m.group(1)); - String v = unescapePathName(m.group(2)); - String[] kv = new String[2]; - kv[0] = k; - kv[1] = v; - kvs.add(kv); - } - currPath = currPath.getParent(); - } while (currPath != null && !currPath.getName().isEmpty()); - - // reverse the list since we checked the part from leaf dir to table's base dir - for (int i = kvs.size(); i > 0; i--) { - String key = kvs.get(i - 1)[0]; - if (requiredKeys != null) { - requiredKeys.remove(key); - } - partSpec.put(key, kvs.get(i - 1)[1]); - } - if (requiredKeys == null || requiredKeys.isEmpty()) return true; - LOG.warn("Cannot create partition spec from " + currPath + "; missing keys " + requiredKeys); - return false; - } - - public static Map<String, String> makeEscSpecFromName(String name) throws MetaException { - - if (name == null || name.isEmpty()) { - throw new MetaException("Partition name is invalid. 
" + name); - } - LinkedHashMap<String, String> partSpec = new LinkedHashMap<>(); - - Path currPath = new Path(name); - - List<String[]> kvs = new ArrayList<>(); - do { - String component = currPath.getName(); - Matcher m = pat.matcher(component); - if (m.matches()) { - String k = m.group(1); - String v = m.group(2); - String[] kv = new String[2]; - kv[0] = k; - kv[1] = v; - kvs.add(kv); - } - currPath = currPath.getParent(); - } while (currPath != null && !currPath.getName().isEmpty()); - - // reverse the list since we checked the part from leaf dir to table's base dir - for (int i = kvs.size(); i > 0; i--) { - partSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]); - } - - return partSpec; - } - - /** - * Returns the default partition path of a table within a given database and partition key value - * pairs. It uses the database location and appends it the table name and the partition key,value - * pairs to create the Path for the partition directory - * - * @param db - parent database which is used to get the base location of the partition directory - * @param tableName - table name for the partitions - * @param pm - Partition key value pairs - * @return - * @throws MetaException - */ - public Path getDefaultPartitionPath(Database db, Table table, - Map<String, String> pm) throws MetaException { - return getPartitionPath(getDefaultTablePath(db, table), pm); - } - - /** - * Returns the path object for the given partition key-value pairs and the base location - * - * @param tblPath - the base location for the partitions. Typically the table location - * @param pm - Partition key value pairs - * @return - * @throws MetaException - */ - public Path getPartitionPath(Path tblPath, Map<String, String> pm) - throws MetaException { - return new Path(tblPath, makePartPath(pm)); - } - - /** - * Given a database, a table and the partition key value pairs this method returns the Path object - * corresponding to the partition key value pairs. 
It uses the table location if available else - * uses the database location for constructing the path corresponding to the partition key-value - * pairs - * - * @param db - Parent database of the given table - * @param table - Table for which the partition key-values are given - * @param vals - List of values for the partition keys - * @return Path corresponding to the partition key-value pairs - * @throws MetaException - */ - public Path getPartitionPath(Database db, Table table, List<String> vals) - throws MetaException { - List<FieldSchema> partKeys = table.getPartitionKeys(); - if (partKeys == null || (partKeys.size() != vals.size())) { - throw new MetaException("Invalid number of partition keys found for " + table.getTableName()); - } - Map<String, String> pm = new LinkedHashMap<>(vals.size()); - int i = 0; - for (FieldSchema key : partKeys) { - pm.put(key.getName(), vals.get(i)); - i++; - } - - if (table.getSd().getLocation() != null) { - return getPartitionPath(getDnsPath(new Path(table.getSd().getLocation())), pm); - } else { - return getDefaultPartitionPath(db, table, pm); - } - } - - public boolean isDir(Path f) throws MetaException { - FileSystem fs; - try { - fs = getFs(f); - FileStatus fstatus = fs.getFileStatus(f); - if (!fstatus.isDir()) { - return false; - } - } catch (FileNotFoundException e) { - return false; - } catch (IOException e) { - MetaStoreUtils.logAndThrowMetaException(e); - } - return true; - } - - public static String makePartName(List<FieldSchema> partCols, - List<String> vals) throws MetaException { - return makePartName(partCols, vals, null); - } - - /** - * @param desc - * @return array of FileStatus objects corresponding to the files - * making up the passed storage description - */ - public List<FileStatus> getFileStatusesForSD(StorageDescriptor desc) - throws MetaException { - return getFileStatusesForLocation(desc.getLocation()); - } - - /** - * @param location - * @return array of FileStatus objects corresponding to the files - 
* making up the passed storage description - */ - public List<FileStatus> getFileStatusesForLocation(String location) - throws MetaException { - try { - Path path = new Path(location); - FileSystem fileSys = path.getFileSystem(conf); - return FileUtils.getFileStatusRecurse(path, -1, fileSys); - } catch (IOException ioe) { - MetaStoreUtils.logAndThrowMetaException(ioe); - } - return null; - } - - /** - * @param db database - * @param table table - * @return array of FileStatus objects corresponding to the files making up the passed - * unpartitioned table - */ - public List<FileStatus> getFileStatusesForUnpartitionedTable(Database db, Table table) - throws MetaException { - Path tablePath = getDnsPath(new Path(table.getSd().getLocation())); - try { - FileSystem fileSys = tablePath.getFileSystem(conf); - return FileUtils.getFileStatusRecurse(tablePath, -1, fileSys); - } catch (IOException ioe) { - MetaStoreUtils.logAndThrowMetaException(ioe); - } - return null; - } - - /** - * Makes a valid partition name. - * @param partCols The partition columns - * @param vals The partition values - * @param defaultStr - * The default name given to a partition value if the respective value is empty or null. - * @return An escaped, valid partition name. 
* @throws MetaException
*/
public static String makePartName(List<FieldSchema> partCols,
    List<String> vals, String defaultStr) throws MetaException {
  boolean sizeMismatch = partCols.size() != vals.size();
  if (sizeMismatch || partCols.isEmpty()) {
    // Describe both lists in the error so the mismatch can be diagnosed.
    StringBuilder message = new StringBuilder("Invalid partition key & values; keys [");
    for (FieldSchema column : partCols) {
      message.append(column.getName()).append(", ");
    }
    message.append("], values [");
    for (String value : vals) {
      message.append(value).append(", ");
    }
    message.append("]");
    throw new MetaException(message.toString());
  }

  List<String> names = new ArrayList<>(partCols.size());
  for (FieldSchema column : partCols) {
    names.add(column.getName());
  }
  return FileUtils.makePartName(names, vals, defaultStr);
}

/**
 * Returns the values of a partition name, in the order they appear in the name.
 *
 * @param partName partition name parsed with {@code Warehouse.makeSpecFromName}
 * @return a new list holding the parsed values
 * @throws MetaException if the name cannot be parsed
 */
public static List<String> getPartValuesFromPartName(String partName)
    throws MetaException {
  LinkedHashMap<String, String> parsed = Warehouse.makeSpecFromName(partName);
  return new ArrayList<>(parsed.values());
}

/**
 * Pairs partition columns with values by position.
 *
 * @param partCols partition columns; must have at least as many elements as {@code values}
 * @param values partition values
 * @return an insertion-ordered map from column name to the value at the same index
 */
public static Map<String, String> makeSpecFromValues(List<FieldSchema> partCols,
    List<String> values) {
  Map<String, String> result = new LinkedHashMap<>();
  int count = values.size();
  for (int idx = 0; idx < count; idx++) {
    FieldSchema column = partCols.get(idx);
    result.put(column.getName(), values.get(idx));
  }
  return result;
}
}