HIVE-14700 : clean up file/txn information via a metastore thread (Sergey Shelukhin)
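At its core, the new MmCleanerThread advances a per-table watermark past every write ID that is either committed or aborted-and-fully-cleaned-up, stopping at the first gap it cannot account for (see advanceWatermark() in the MmCleanerThread.java diff below). A minimal standalone sketch of that loop — hypothetical names, not the committed code:

    import java.util.List;
    import java.util.Set;

    final class WatermarkSketch {
      /**
       * Returns the highest write ID w such that every ID in (watermark, w]
       * is either committed or cleaned up after an abort.
       * @param committedIdsAsc committed write IDs above the watermark, ascending
       * @param cleanedUpIds aborted write IDs whose files were already deleted
       */
      static long advance(long watermark, List<Long> committedIdsAsc, Set<Long> cleanedUpIds) {
        long expected = watermark + 1;
        for (long committed : committedIdsAsc) {
          if (committed < expected) continue;      // already passed this ID
          while (committed > expected) {           // a gap: OK only if cleaned up
            if (!cleanedUpIds.contains(expected)) return expected - 1;
            ++expected;
          }
          ++expected;                              // step over the committed ID itself
        }
        while (cleanedUpIds.contains(expected)) ++expected; // trailing aborted writes
        return expected - 1;
      }
    }

Once the watermark has advanced, everything at or below it is permanently resolved, which is why the patch can then delete the corresponding TBL_WRITES rows (rs.deleteTableWrites(...) after the alterTable call in advanceWatermark()).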
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70299dc4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70299dc4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70299dc4

Branch: refs/heads/hive-14535
Commit: 70299dc48f93433fb53611b05f8a719b841575c5
Parents: 8708398
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Sep 19 15:40:19 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Sep 19 15:40:19 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |  40 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  21 +-
 .../apache/hive/common/util/MockFileSystem.java | 622 +++++++++++++++++++
 .../upgrade/derby/037-HIVE-14637.derby.sql      |   2 +-
 .../upgrade/derby/hive-schema-2.2.0.derby.sql   |   2 +-
 .../upgrade/mssql/022-HIVE-14637.mssql.sql      |   3 +-
 .../upgrade/mssql/hive-schema-2.2.0.mssql.sql   |   3 +-
 .../upgrade/mysql/037-HIVE-14637.mysql.sql      |   3 +-
 .../upgrade/mysql/hive-schema-2.2.0.mysql.sql   |   3 +-
 .../upgrade/oracle/037-HIVE-14637.oracle.sql    |   1 +
 .../upgrade/oracle/hive-schema-2.2.0.oracle.sql |   1 +
 .../postgres/036-HIVE-14637.postgres.sql        |   1 +
 .../postgres/hive-schema-2.2.0.postgres.sql     |   1 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |  26 +-
 .../hadoop/hive/metastore/MetaStoreThread.java  |   1 +
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  10 +
 .../hadoop/hive/metastore/MmCleanerThread.java  | 397 ++++++++++++
 .../hadoop/hive/metastore/ObjectStore.java      | 147 +++--
 .../apache/hadoop/hive/metastore/RawStore.java  |  26 +-
 .../hadoop/hive/metastore/hbase/HBaseStore.java |  29 +-
 .../hive/metastore/model/MTableWrite.java       |  12 +-
 metastore/src/model/package.jdo                 |   3 +
 .../DummyRawStoreControlledCommit.java          |  25 +-
 .../DummyRawStoreForJdoConnection.java          |  25 +-
 .../hadoop/hive/metastore/TestObjectStore.java  | 177 +++++-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   3 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   7 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 +-
 .../hive/ql/txn/compactor/CompactorThread.java  |   1 -
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  12 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 549 +---------------
 32 files changed, 1527 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index b25a72d..b939b43 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 
 public class ValidWriteIds {
   public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);
-  private static final String MM_PREFIX = "mm";
+  public static final String MM_PREFIX = "mm";
 
   private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);
 
@@ -117,22 +117,8 @@ public class ValidWriteIds {
   }
 
   public boolean isValidInput(Path file) {
-    String fileName = file.getName();
-    String[] parts = fileName.split("_", 3);
-    if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
-      LOG.info("Ignoring unknown file for a MM table: " + file
-          + " (" + Arrays.toString(parts) + ")");
-      return false;
-    }
-    long writeId = -1;
-    try {
-      writeId = Long.parseLong(parts[1]);
-    } catch (NumberFormatException ex) {
-      LOG.info("Ignoring unknown file for a MM table: " + file
-          + "; parsing " + parts[1] + " got " + ex.getMessage());
-      return false;
-    }
-    return isValid(writeId);
+    Long writeId = extractWriteId(file);
+    return (writeId != null) && isValid(writeId);
   }
 
   public static String getMmFilePrefix(long mmWriteId) {
@@ -155,4 +141,24 @@ public class ValidWriteIds {
       return isMatch == (name.startsWith(prefix) || name.startsWith(tmpPrefix));
     }
   }
+
+
+  public static Long extractWriteId(Path file) {
+    String fileName = file.getName();
+    String[] parts = fileName.split("_", 3);
+    if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
+      LOG.info("Cannot extract write ID for a MM table: " + file
+          + " (" + Arrays.toString(parts) + ")");
+      return null;
+    }
+    long writeId = -1;
+    try {
+      writeId = Long.parseLong(parts[1]);
+    } catch (NumberFormatException ex) {
+      LOG.info("Cannot extract write ID for a MM table: " + file
+          + "; parsing " + parts[1] + " got " + ex.getMessage());
+      return null;
+    }
+    return writeId;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 301159e..1a85f50 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -297,7 +297,10 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL,
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY,
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL,
-      HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS
+      HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT
   };
 
   /**
@@ -3104,6 +3107,22 @@ public class HiveConf extends Configuration {
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
         "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
 
+    HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL("hive.metastore.mm.thread.scan.interval", "900s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM table housekeeping thread interval in this metastore instance. 0 to disable."),
+
+    HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID times out after this long if a heartbeat is not sent. Currently disabled."), // TODO# heartbeating not implemented
+
+    HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID cannot be outstanding for more than this long."),
+
+    HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD("hive.metastore.mm.aborted.grace.period", "1d",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID will not be removed for that long after it has been aborted;\n" +
+        "this is to work around potential races e.g.
with FS visibility, when deleting files."), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role," + http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/test/org/apache/hive/common/util/MockFileSystem.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hive/common/util/MockFileSystem.java b/common/src/test/org/apache/hive/common/util/MockFileSystem.java new file mode 100644 index 0000000..e65fd33 --- /dev/null +++ b/common/src/test/org/apache/hive/common/util/MockFileSystem.java @@ -0,0 +1,622 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.common.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; + +public class MockFileSystem extends FileSystem { + final List<MockFile> files = new ArrayList<MockFile>(); + final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>(); + Path workingDir = new Path("/"); + // statics for when the mock fs is created via FileSystem.get + private static String blockedUgi = null; + private final static List<MockFile> globalFiles = new ArrayList<MockFile>(); + protected Statistics statistics; + public boolean allowDelete = false; + + public MockFileSystem() { + // empty + } + + @Override + public void initialize(URI uri, Configuration conf) { + setConf(conf); + statistics = getStatistics("mock", getClass()); + } + + public MockFileSystem(Configuration conf, MockFile... 
files) { + setConf(conf); + this.files.addAll(Arrays.asList(files)); + statistics = getStatistics("mock", getClass()); + } + + public static void setBlockedUgi(String s) { + blockedUgi = s; + } + + public void clear() { + files.clear(); + } + + @Override + public URI getUri() { + try { + return new URI("mock:///"); + } catch (URISyntaxException err) { + throw new IllegalArgumentException("huh?", err); + } + } + + // increments file modification time + public void touch(MockFile file) { + if (fileStatusMap.containsKey(file)) { + FileStatus fileStatus = fileStatusMap.get(file); + FileStatus fileStatusNew = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(), + fileStatus.getReplication(), fileStatus.getBlockSize(), + fileStatus.getModificationTime() + 1, fileStatus.getAccessTime(), + fileStatus.getPermission(), fileStatus.getOwner(), fileStatus.getGroup(), + fileStatus.getPath()); + fileStatusMap.put(file, fileStatusNew); + } + } + + @SuppressWarnings("serial") + public static class MockAccessDenied extends IOException { + } + + @Override + public FSDataInputStream open(Path path, int i) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + MockFile file = findFile(path); + if (file != null) return new FSDataInputStream(new MockInputStream(file)); + throw new IOException("File not found: " + path); + } + + public MockFile findFile(Path path) { + for (MockFile file: files) { + if (file.path.equals(path)) { + return file; + } + } + for (MockFile file: globalFiles) { + if (file.path.equals(path)) { + return file; + } + } + return null; + } + + private void checkAccess() throws IOException { + if (blockedUgi == null) return; + if (!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) return; + throw new MockAccessDenied(); + } + + @Override + public FSDataOutputStream create(Path path, FsPermission fsPermission, + boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progressable + ) throws IOException { + statistics.incrementWriteOps(1); + checkAccess(); + MockFile file = findFile(path); + if (file == null) { + file = new MockFile(path.toString(), (int) blockSize, new byte[0]); + files.add(file); + } + return new MockOutputStream(file); + } + + @Override + public FSDataOutputStream append(Path path, int bufferSize, + Progressable progressable + ) throws IOException { + statistics.incrementWriteOps(1); + checkAccess(); + return create(path, FsPermission.getDefault(), true, bufferSize, + (short) 3, 256 * 1024, progressable); + } + + @Override + public boolean rename(Path path, Path path2) throws IOException { + statistics.incrementWriteOps(1); + checkAccess(); + return false; + } + + @Override + public boolean delete(Path path) throws IOException { + statistics.incrementWriteOps(1); + checkAccess(); + return false; + } + + @Override + public boolean delete(Path path, boolean isRecursive) throws IOException { + statistics.incrementWriteOps(1); + checkAccess(); + return allowDelete && isRecursive && deleteMatchingFiles(files, path.toString()); + } + + @Override + public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f) + throws IOException { + return new RemoteIterator<LocatedFileStatus>() { + private Iterator<LocatedFileStatus> iterator = listLocatedFileStatuses(f).iterator(); + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + return iterator.next(); + } + }; + } + + private 
List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + path = path.makeQualified(this); + List<LocatedFileStatus> result = new ArrayList<>(); + String pathname = path.toString(); + String pathnameAsDir = pathname + "/"; + Set<String> dirs = new TreeSet<String>(); + MockFile file = findFile(path); + if (file != null) { + result.add(createLocatedStatus(file)); + return result; + } + findMatchingLocatedFiles(files, pathnameAsDir, dirs, result); + findMatchingLocatedFiles(globalFiles, pathnameAsDir, dirs, result); + // for each directory add it once + for(String dir: dirs) { + result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result; + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + path = path.makeQualified(this); + List<FileStatus> result = new ArrayList<FileStatus>(); + String pathname = path.toString(); + String pathnameAsDir = pathname + "/"; + Set<String> dirs = new TreeSet<String>(); + MockFile file = findFile(path); + if (file != null) { + return new FileStatus[]{createStatus(file)}; + } + findMatchingFiles(files, pathnameAsDir, dirs, result); + findMatchingFiles(globalFiles, pathnameAsDir, dirs, result); + // for each directory add it once + for(String dir: dirs) { + result.add(createDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result.toArray(new FileStatus[result.size()]); + } + + private void findMatchingFiles( + List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<FileStatus> result) { + for (MockFile file: files) { + String filename = file.path.toString(); + if (filename.startsWith(pathnameAsDir)) { + String tail = filename.substring(pathnameAsDir.length()); + int nextSlash = tail.indexOf('/'); + if (nextSlash > 0) { + dirs.add(tail.substring(0, nextSlash)); + } else { + result.add(createStatus(file)); + } + } + } + } + + private boolean deleteMatchingFiles(List<MockFile> files, String path) { + Iterator<MockFile> fileIter = files.iterator(); + boolean result = true; + while (fileIter.hasNext()) { + MockFile file = fileIter.next(); + String filename = file.path.toString(); + if (!filename.startsWith(path)) continue; + if (filename.length() <= path.length() || filename.charAt(path.length()) != '/') continue; + if (file.cannotDelete) { + result = false; + continue; + } + assert !file.isDeleted; + file.isDeleted = true; + fileIter.remove(); + } + return result; + } + + private void findMatchingLocatedFiles( + List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<LocatedFileStatus> result) + throws IOException { + for (MockFile file: files) { + String filename = file.path.toString(); + if (filename.startsWith(pathnameAsDir)) { + String tail = filename.substring(pathnameAsDir.length()); + int nextSlash = tail.indexOf('/'); + if (nextSlash > 0) { + dirs.add(tail.substring(0, nextSlash)); + } else { + result.add(createLocatedStatus(file)); + } + } + } + } + + @Override + public void setWorkingDirectory(Path path) { + workingDir = path; + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public boolean mkdirs(Path path, FsPermission fsPermission) { + statistics.incrementWriteOps(1); + return false; + } + + private FileStatus createStatus(MockFile file) { + if (fileStatusMap.containsKey(file)) { + return fileStatusMap.get(file); + } + FileStatus fileStatus = new FileStatus(file.length, false, 1, 
file.blockSize, 0, 0, + FsPermission.createImmutable((short) 644), "owen", "group", + file.path); + fileStatusMap.put(file, fileStatus); + return fileStatus; + } + + private FileStatus createDirectory(Path dir) { + return new FileStatus(0, true, 0, 0, 0, 0, + FsPermission.createImmutable((short) 755), "owen", "group", dir); + } + + private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException { + FileStatus fileStatus = createStatus(file); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + + private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException { + FileStatus fileStatus = createDirectory(dir); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + path = path.makeQualified(this); + String pathnameAsDir = path.toString() + "/"; + MockFile file = findFile(path); + if (file != null) return createStatus(file); + for (MockFile dir : files) { + if (dir.path.toString().startsWith(pathnameAsDir)) { + return createDirectory(path); + } + } + for (MockFile dir : globalFiles) { + if (dir.path.toString().startsWith(pathnameAsDir)) { + return createDirectory(path); + } + } + throw new FileNotFoundException("File " + path + " does not exist"); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus stat, + long start, long len) throws IOException { + return getFileBlockLocationsImpl(stat, start, len, true); + } + + private BlockLocation[] getFileBlockLocationsImpl(final FileStatus stat, final long start, + final long len, + final boolean updateStats) throws IOException { + if (updateStats) { + statistics.incrementReadOps(1); + } + checkAccess(); + List<BlockLocation> result = new ArrayList<BlockLocation>(); + MockFile file = findFile(stat.getPath()); + if (file != null) { + for(MockBlock block: file.blocks) { + if (getOverlap(block.offset, block.length, start, len) > 0) { + String[] topology = new String[block.hosts.length]; + for(int i=0; i < topology.length; ++i) { + topology[i] = "/rack/ " + block.hosts[i]; + } + result.add(new BlockLocation(block.hosts, block.hosts, + topology, block.offset, block.length)); + } + } + return result.toArray(new BlockLocation[result.size()]); + } + return new BlockLocation[0]; + } + + + /** + * Compute the number of bytes that overlap between the two ranges. 
+ * @param offset1 start of range1 + * @param length1 length of range1 + * @param offset2 start of range2 + * @param length2 length of range2 + * @return the number of bytes in the overlap range + */ + private static long getOverlap(long offset1, long length1, long offset2, long length2) { + // c/p from OrcInputFormat + long end1 = offset1 + length1; + long end2 = offset2 + length2; + if (end2 <= offset1 || end1 <= offset2) { + return 0; + } else { + return Math.min(end1, end2) - Math.max(offset1, offset2); + } + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("mockFs{files:["); + for(int i=0; i < files.size(); ++i) { + if (i != 0) { + buffer.append(", "); + } + buffer.append(files.get(i)); + } + buffer.append("]}"); + return buffer.toString(); + } + + public static void addGlobalFile(MockFile mockFile) { + globalFiles.add(mockFile); + } + + public static void clearGlobalFiles() { + globalFiles.clear(); + } + + + public static class MockBlock { + int offset; + int length; + final String[] hosts; + + public MockBlock(String... hosts) { + this.hosts = hosts; + } + + public void setOffset(int offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = length; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("block{offset: "); + buffer.append(offset); + buffer.append(", length: "); + buffer.append(length); + buffer.append(", hosts: ["); + for(int i=0; i < hosts.length; i++) { + if (i != 0) { + buffer.append(", "); + } + buffer.append(hosts[i]); + } + buffer.append("]}"); + return buffer.toString(); + } + } + + public static class MockFile { + public final Path path; + public int blockSize; + public int length; + public MockBlock[] blocks; + public byte[] content; + public boolean cannotDelete = false; + // This is purely for testing convenience; has no bearing on FS operations such as list. + public boolean isDeleted = false; + + public MockFile(String path, int blockSize, byte[] content, + MockBlock... 
blocks) { + this.path = new Path(path); + this.blockSize = blockSize; + this.blocks = blocks; + this.content = content; + this.length = content.length; + int offset = 0; + for(MockBlock block: blocks) { + block.offset = offset; + block.length = Math.min(length - offset, blockSize); + offset += block.length; + } + } + + @Override + public int hashCode() { + return path.hashCode() + 31 * length; + } + + @Override + public boolean equals(final Object obj) { + if (!(obj instanceof MockFile)) { return false; } + return ((MockFile) obj).path.equals(this.path) && ((MockFile) obj).length == this.length; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("mockFile{path: "); + buffer.append(path.toString()); + buffer.append(", blkSize: "); + buffer.append(blockSize); + buffer.append(", len: "); + buffer.append(length); + buffer.append(", blocks: ["); + for(int i=0; i < blocks.length; i++) { + if (i != 0) { + buffer.append(", "); + } + buffer.append(blocks[i]); + } + buffer.append("]}"); + return buffer.toString(); + } + } + + static class MockInputStream extends FSInputStream { + final MockFile file; + int offset = 0; + + public MockInputStream(MockFile file) throws IOException { + this.file = file; + } + + @Override + public void seek(long offset) throws IOException { + this.offset = (int) offset; + } + + @Override + public long getPos() throws IOException { + return offset; + } + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public int read() throws IOException { + if (offset < file.length) { + return file.content[offset++] & 0xff; + } + return -1; + } + } + + public static class MockPath extends Path { + private final FileSystem fs; + public MockPath(FileSystem fs, String path) { + super(path); + this.fs = fs; + } + @Override + public FileSystem getFileSystem(Configuration conf) { + return fs; + } + } + + public static class MockOutputStream extends FSDataOutputStream { + public final MockFile file; + + public MockOutputStream(MockFile file) throws IOException { + super(new DataOutputBuffer(), null); + this.file = file; + } + + /** + * Set the blocks and their location for the file. + * Must be called after the stream is closed or the block length will be + * wrong. + * @param blocks the list of blocks + */ + public void setBlocks(MockBlock... 
blocks) { + file.blocks = blocks; + int offset = 0; + int i = 0; + while (offset < file.length && i < blocks.length) { + blocks[i].offset = offset; + blocks[i].length = Math.min(file.length - offset, file.blockSize); + offset += blocks[i].length; + i += 1; + } + } + + @Override + public void close() throws IOException { + super.close(); + DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream(); + file.length = buf.getLength(); + file.content = new byte[file.length]; + MockBlock block = new MockBlock("host1"); + block.setLength(file.length); + setBlocks(block); + System.arraycopy(buf.getData(), 0, file.content, 0, file.length); + } + + @Override + public String toString() { + return "Out stream to " + file.toString(); + } + } + + public void addFile(MockFile file) { + files.add(file); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql index 88a48f0..cb6e5f6 100644 --- a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql +++ b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql @@ -1,6 +1,6 @@ ALTER TABLE "TBLS" ADD "MM_WATERMARK_WRITE_ID" BIGINT DEFAULT -1; ALTER TABLE "TBLS" ADD "MM_NEXT_WRITE_ID" BIGINT DEFAULT 0; -CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT); +CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL); ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID"); ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; CREATE UNIQUE INDEX "APP"."UNIQUEWRITE" ON "APP"."TBL_WRITES" ("TBL_ID", "WRITE_ID"); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql index f86ee4a..9da1703 100644 --- a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql @@ -112,7 +112,7 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY CREATE INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID"); -CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT); +CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL); ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID"); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql 
b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql index 5d6f99f..9666d2b 100644 --- a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql +++ b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql @@ -7,7 +7,8 @@ CREATE TABLE TBL_WRITES TBL_ID BIGINT NOT NULL, WRITE_ID BIGINT NOT NULL, STATE CHAR(1) NOT NULL, - LAST_HEARTBEAT BIGINT + CREATED BIGINT NOT NULL, + LAST_HEARTBEAT BIGINT NOT NULL ); ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID); ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_FK1 FOREIGN KEY (TBL_ID) REFERENCES TBLS (TBL_ID) ; http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql index 26b2ab3..31016e2 100644 --- a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql +++ b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql @@ -600,7 +600,8 @@ CREATE TABLE TBL_WRITES TBL_ID BIGINT NOT NULL, WRITE_ID BIGINT NOT NULL, STATE CHAR(1) NOT NULL, - LAST_HEARTBEAT BIGINT + CREATED BIGINT NOT NULL, + LAST_HEARTBEAT BIGINT NOT NULL ); ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql index c024584..9e34db2 100644 --- a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql +++ b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql @@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES` `TBL_ID` BIGINT NOT NULL, `WRITE_ID` BIGINT NOT NULL, `STATE` CHAR(1) NOT NULL, - `LAST_HEARTBEAT` BIGINT, + `CREATED` BIGINT NOT NULL, + `LAST_HEARTBEAT` BIGINT NOT NULL, PRIMARY KEY (`TW_ID`), UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`), CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`) http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql index b295950..3e73008 100644 --- a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql +++ b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql @@ -835,7 +835,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES` `TBL_ID` BIGINT NOT NULL, `WRITE_ID` BIGINT NOT NULL, `STATE` CHAR(1) NOT NULL, - `LAST_HEARTBEAT` BIGINT, + `CREATED` BIGINT NOT NULL, + `LAST_HEARTBEAT` BIGINT NOT NULL, PRIMARY KEY (`TW_ID`), UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`), CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`) http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql index 9f6dbb2..218eefe 100644 --- a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql +++ b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql 
@@ -7,6 +7,7 @@ CREATE TABLE TBL_WRITES TBL_ID NUMBER NOT NULL, WRITE_ID NUMBER NOT NULL, STATE CHAR(1) NOT NULL, + CREATED NUMBER NOT NULL, LAST_HEARTBEAT NUMBER NOT NULL ); ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql index 6972c20..5479712 100644 --- a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql +++ b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql @@ -805,6 +805,7 @@ CREATE TABLE TBL_WRITES TBL_ID NUMBER NOT NULL, WRITE_ID NUMBER NOT NULL, STATE CHAR(1) NOT NULL, + CREATED NUMBER NOT NULL, LAST_HEARTBEAT NUMBER NOT NULL ); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql index f153837..310f51e 100644 --- a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql +++ b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql @@ -8,6 +8,7 @@ CREATE TABLE "TBL_WRITES" "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, + "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL ); ALTER TABLE ONLY "TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID"); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql index de997d3..bc865ed 100644 --- a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql +++ b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql @@ -614,6 +614,7 @@ CREATE TABLE "TBL_WRITES" "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, + "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL ); http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index aa6d1eb..128e06a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6460,7 +6460,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Thread.sleep(random.nextInt(deadlockRetryBackoffMs)); } - // Do a separate txn after we have reserved the number. TODO: If we fail, ignore on read. + // Do a separate txn after we have reserved the number. 
boolean ok = false; ms.openTransaction(); try { @@ -6525,11 +6525,18 @@ public class HiveMetaStore extends ThriftHiveMetastore { startFunction("heartbeat_write_id", " : db=" + dbName + " tbl=" + tblName + " writeId=" + writeId); Exception ex = null; + boolean wasAborted = false; try { boolean ok = false; ms.openTransaction(); try { MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId); + long absTimeout = HiveConf.getTimeVar(getConf(), + ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS); + if (tw.getCreated() + absTimeout < System.currentTimeMillis()) { + tw.setState(String.valueOf(MM_WRITE_ABORTED)); + wasAborted = true; + } tw.setLastHeartbeat(System.currentTimeMillis()); ms.updateTableWrite(tw); ok = true; @@ -6542,6 +6549,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } finally { endFunction("heartbeat_write_id", ex == null, ex, tblName); } + if (wasAborted) throw new MetaException("The write was aborted due to absolute timeout"); return new HeartbeatWriteIdResult(); } @@ -6576,10 +6584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { long watermarkId = tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1; if (nextId > (watermarkId + 1)) { // There may be some intermediate failed or active writes; get the valid ones. - List<Long> ids = ms.getWriteIds( + List<Long> ids = ms.getTableWriteIds( dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED); // TODO: we could optimize here and send the smaller of the lists, and also use ranges - if (ids != null) { + if (!ids.isEmpty()) { Iterator<Long> iter = ids.iterator(); long oldWatermarkId = watermarkId; while (iter.hasNext()) { @@ -7057,6 +7065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { startCompactorInitiator(conf); startCompactorWorkers(conf); startCompactorCleaner(conf); + startMmHousekeepingThread(conf); startHouseKeeperService(conf); } catch (Throwable e) { LOG.error("Failure when starting the compactor, compactions may not happen, " + @@ -7096,6 +7105,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + private static void startMmHousekeepingThread(HiveConf conf) throws Exception { + long intervalMs = HiveConf.getTimeVar(conf, + ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL, TimeUnit.MILLISECONDS); + if (intervalMs > 0) { + MetaStoreThread thread = new MmCleanerThread(intervalMs); + initializeAndStartThread(thread, conf); + } + } + + private static MetaStoreThread instantiateThread(String classname) throws Exception { Class<?> c = Class.forName(classname); Object o = c.newInstance(); @@ -7118,6 +7137,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { thread.init(new AtomicBoolean(), new AtomicBoolean()); thread.start(); } + private static void startHouseKeeperService(HiveConf conf) throws Exception { if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) { return; http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index a0c8d3b..d4d94ff 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -51,6 +51,7 @@ public interface MetaStoreThread { * thread should then assure that the 
loop has been gone completely through at * least once. */ + // TODO: move these test parameters to more specific places... there's no need to have them here void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 41385f7..c2ce259 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1884,4 +1884,14 @@ public class MetaStoreUtils { } csNew.setStatsObj(list); } + + public static boolean isMmTable(Table table) { + return isMmTable(table.getParameters()); + } + + public static boolean isMmTable(Map<String, String> params) { + // TODO: perhaps it should be a 3rd value for 'transactional'? + String value = params.get(hive_metastoreConstants.TABLE_IS_MM); + return value != null && value.equalsIgnoreCase("true"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java new file mode 100644 index 0000000..6a7f588 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIds; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.RawStore.FullTableName; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.model.MTableWrite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; + +public class MmCleanerThread extends Thread implements MetaStoreThread { + private final static Logger LOG = LoggerFactory.getLogger(MmCleanerThread.class); + private HiveConf conf; + private int threadId; + private AtomicBoolean stop; + private long intervalMs; + private long heartbeatTimeoutMs, absTimeoutMs, abortedGraceMs; + /** Time override for tests. Only used for MM timestamp logic, not for the thread timing. */ + private Supplier<Long> timeOverride = null; + + public MmCleanerThread(long intervalMs) { + this.intervalMs = intervalMs; + } + + @VisibleForTesting + void overrideTime(Supplier<Long> timeOverride) { + this.timeOverride = timeOverride; + } + + private long getTimeMs() { + return timeOverride == null ? System.currentTimeMillis() : timeOverride.get(); + } + + @Override + public void setHiveConf(HiveConf conf) { + this.conf = conf; + heartbeatTimeoutMs = HiveConf.getTimeVar( + conf, ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); + absTimeoutMs = HiveConf.getTimeVar( + conf, ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS); + abortedGraceMs = HiveConf.getTimeVar( + conf, ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD, TimeUnit.MILLISECONDS); + if (heartbeatTimeoutMs > absTimeoutMs) { + throw new RuntimeException("Heartbeat timeout " + heartbeatTimeoutMs + + " cannot be larger than the absolute timeout " + absTimeoutMs); + } + } + + @Override + public void setThreadId(int threadId) { + this.threadId = threadId; + } + + @Override + public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { + this.stop = stop; + setPriority(MIN_PRIORITY); + setDaemon(true); + } + + @Override + public void run() { + // Only get RS here, when we are already on the thread. 
+ RawStore rs = getRs(); + while (true) { + if (checkStop()) return; + long endTimeNs = System.nanoTime() + intervalMs * 1000000L; + + runOneIteration(rs); + + if (checkStop()) return; + long waitTimeMs = (endTimeNs - System.nanoTime()) / 1000000L; + if (waitTimeMs <= 0) continue; + try { + Thread.sleep(waitTimeMs); + } catch (InterruptedException e) { + LOG.error("Thread was interrupted and will now exit"); + return; + } + } + } + + private RawStore getRs() { + try { + return RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId); + } catch (MetaException e) { + LOG.error("Failed to get RawStore; the thread will now die", e); + throw new RuntimeException(e); + } + } + + private boolean checkStop() { + if (!stop.get()) return false; + LOG.info("Stopping due to an external request"); + return true; + } + + @VisibleForTesting + void runOneIteration(RawStore rs) { + // We only get the names here; we want to get and process each table in a separate DB txn. + List<FullTableName> mmTables = null; + try { + mmTables = rs.getAllMmTablesForCleanup(); + } catch (MetaException e) { + LOG.error("Failed to get tables", e); + return; + } + for (FullTableName tableName : mmTables) { + try { + processOneTable(tableName, rs); + } catch (MetaException e) { + LOG.error("Failed to process " + tableName, e); + } + } + } + + private void processOneTable(FullTableName table, RawStore rs) throws MetaException { + // 1. Time out writes that have been running for a while. + // a) Heartbeat timeouts (not enabled right now as heartbeat is not implemented). + // b) Absolute timeouts. + // c) Gaps that have the next ID and the derived absolute timeout. This is a small special + // case that can happen if we increment next ID but fail to insert the write ID record, + // which we do in separate txns to avoid making the conflict-prone increment txn longer. + LOG.info("Processing table " + table); + Table t = rs.getTable(table.dbName, table.tblName); + HashSet<Long> removeWriteIds = new HashSet<>(), cleanupOnlyWriteIds = new HashSet<>(); + getWritesThatReadyForCleanUp(t, table, rs, removeWriteIds, cleanupOnlyWriteIds); + + // 2. Delete the aborted writes' files from the FS. + deleteAbortedWriteIdFiles(table, rs, t, removeWriteIds); + deleteAbortedWriteIdFiles(table, rs, t, cleanupOnlyWriteIds); + // removeWriteIds-s now only contains the writes that were fully cleaned up after. + + // 3. Advance the watermark. + advanceWatermark(table, rs, removeWriteIds); + } + + private void getWritesThatReadyForCleanUp(Table t, FullTableName table, RawStore rs, + HashSet<Long> removeWriteIds, HashSet<Long> cleanupOnlyWriteIds) throws MetaException { + // We will generally ignore errors here. First, we expect some conflicts; second, we will get + // the final view of things after we do (or try, at any rate) all the updates. + long watermarkId = t.isSetMmWatermarkWriteId() ? t.getMmWatermarkWriteId() : -1, + nextWriteId = t.isSetMmNextWriteId() ? 
t.getMmNextWriteId() : 0; + long now = getTimeMs(), earliestOkHeartbeatMs = now - heartbeatTimeoutMs, + earliestOkCreateMs = now - absTimeoutMs, latestAbortedMs = now - abortedGraceMs; + + List<MTableWrite> writes = rs.getTableWrites( + table.dbName, table.tblName, watermarkId, nextWriteId); + ListIterator<MTableWrite> iter = writes.listIterator(writes.size()); + long expectedId = -1, nextCreated = -1; + // We will go in reverse order and add aborted writes for the gaps that have a following + // write ID that would imply that the previous one (created earlier) would have already + // expired, had it been open and not updated. + while (iter.hasPrevious()) { + MTableWrite write = iter.previous(); + addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, write.getWriteId(), + nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now); + expectedId = write.getWriteId() - 1; + nextCreated = write.getCreated(); + char state = write.getState().charAt(0); + if (state == HiveMetaStore.MM_WRITE_ABORTED) { + if (write.getLastHeartbeat() < latestAbortedMs) { + removeWriteIds.add(write.getWriteId()); + } else { + cleanupOnlyWriteIds.add(write.getWriteId()); + } + } else if (state == HiveMetaStore.MM_WRITE_OPEN && write.getCreated() < earliestOkCreateMs) { + // TODO: also check for heartbeat here. + if (expireTimedOutWriteId(rs, table.dbName, table.tblName, write.getWriteId(), + now, earliestOkCreateMs, earliestOkHeartbeatMs, cleanupOnlyWriteIds)) { + cleanupOnlyWriteIds.add(write.getWriteId()); + } + } + } + addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, watermarkId, + nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now); + } + + private void advanceWatermark( + FullTableName table, RawStore rs, HashSet<Long> cleanedUpWriteIds) { + if (!rs.openTransaction()) { + LOG.error("Cannot open transaction"); + return; + } + boolean success = false; + try { + Table t = rs.getTable(table.dbName, table.tblName); + if (t == null) { + return; + } + long watermarkId = t.getMmWatermarkWriteId(); + List<Long> writeIds = rs.getTableWriteIds(table.dbName, table.tblName, watermarkId, + t.getMmNextWriteId(), HiveMetaStore.MM_WRITE_COMMITTED); + long expectedId = watermarkId + 1; + boolean hasGap = false; + Iterator<Long> idIter = writeIds.iterator(); + while (idIter.hasNext()) { + long next = idIter.next(); + if (next < expectedId) continue; + while (next > expectedId) { + if (!cleanedUpWriteIds.contains(expectedId)) { + hasGap = true; + break; + } + ++expectedId; + } + if (hasGap) break; + ++expectedId; + } + // Make sure we also advance over the trailing aborted ones. + if (!hasGap) { + while (cleanedUpWriteIds.contains(expectedId)) { + ++expectedId; + } + } + long newWatermarkId = expectedId - 1; + if (newWatermarkId > watermarkId) { + t.setMmWatermarkWriteId(newWatermarkId); + rs.alterTable(table.dbName, table.tblName, t); + rs.deleteTableWrites(table.dbName, table.tblName, -1, expectedId); + } + success = true; + } catch (Exception ex) { + // TODO: should we try a couple times on conflicts? Aborted writes cannot be unaborted. 
+ LOG.error("Failed to advance watermark", ex); + rs.rollbackTransaction(); + } + if (success) { + tryCommit(rs); + } + } + + private void deleteAbortedWriteIdFiles( + FullTableName table, RawStore rs, Table t, HashSet<Long> cleanUpWriteIds) { + if (cleanUpWriteIds.isEmpty()) return; + if (t.getPartitionKeysSize() > 0) { + for (String location : rs.getAllPartitionLocations(table.dbName, table.tblName)) { + deleteAbortedWriteIdFiles(location, cleanUpWriteIds); + } + } else { + deleteAbortedWriteIdFiles(t.getSd().getLocation(), cleanUpWriteIds); + } + } + + private void deleteAbortedWriteIdFiles(String location, HashSet<Long> abortedWriteIds) { + LOG.info("Looking for " + abortedWriteIds.size() + " aborted write output in " + location); + Path path = new Path(location); + FileSystem fs; + FileStatus[] files; + try { + fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + LOG.warn(path + " does not exist; assuming that the cleanup is not needed."); + return; + } + // TODO# do we need to account for any subdirectories here? decide after special-case jiras + files = fs.listStatus(path); + } catch (Exception ex) { + LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes"); + abortedWriteIds.clear(); + return; + } + for (FileStatus file : files) { + Path childPath = file.getPath(); + if (!file.isDirectory()) { + LOG.warn("Skipping a non-directory file " + childPath); + continue; + } + Long writeId = ValidWriteIds.extractWriteId(childPath); + if (writeId == null) { + LOG.warn("Skipping an unknown directory " + childPath); + continue; + } + if (!abortedWriteIds.contains(writeId.longValue())) continue; + try { + if (!fs.delete(childPath, true)) throw new IOException("delete returned false"); + } catch (Exception ex) { + LOG.error("Couldn't delete " + childPath + "; not cleaning up " + writeId, ex); + abortedWriteIds.remove(writeId.longValue()); + } + } + } + + private boolean expireTimedOutWriteId(RawStore rs, String dbName, + String tblName, long writeId, long now, long earliestOkCreatedMs, + long earliestOkHeartbeatMs, HashSet<Long> cleanupOnlyWriteIds) { + if (!rs.openTransaction()) { + return false; + } + try { + MTableWrite tw = rs.getTableWrite(dbName, tblName, writeId); + if (tw == null) { + // The write have been updated since the time when we thought it has expired. + tryCommit(rs); + return true; + } + char state = tw.getState().charAt(0); + if (state != HiveMetaStore.MM_WRITE_OPEN + || (tw.getCreated() > earliestOkCreatedMs + && tw.getLastHeartbeat() > earliestOkHeartbeatMs)) { + tryCommit(rs); + return true; // The write has been updated since the time when we thought it has expired. + } + tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_ABORTED)); + tw.setLastHeartbeat(now); + rs.updateTableWrite(tw); + } catch (Exception ex) { + LOG.error("Failed to update an expired table write", ex); + rs.rollbackTransaction(); + return false; + } + boolean result = tryCommit(rs); + if (result) { + cleanupOnlyWriteIds.add(writeId); + } + return result; + } + + private boolean tryCommit(RawStore rs) { + try { + return rs.commitTransaction(); + } catch (Exception ex) { + LOG.error("Failed to commit transaction", ex); + return false; + } + } + + private boolean addTimedOutMissingWriteIds(RawStore rs, String dbName, String tblName, + long foundPrevId, long nextCreated, long expectedId, long earliestOkHeartbeatMs, + HashSet<Long> cleanupOnlyWriteIds, long now) throws MetaException { + // Assume all missing ones are created at the same time as the next present write ID. 
+ // We also assume missing writes never had any heartbeats. + if (nextCreated >= earliestOkHeartbeatMs || expectedId < 0) return true; + Table t = null; + List<Long> localCleanupOnlyWriteIds = new ArrayList<>(); + while (foundPrevId < expectedId) { + if (t == null && !rs.openTransaction()) { + LOG.error("Cannot open transaction; skipping"); + return false; + } + try { + if (t == null) { + t = rs.getTable(dbName, tblName); + } + // We don't need to double check if the write exists; the unique index will cause an error. + rs.createTableWrite(t, expectedId, HiveMetaStore.MM_WRITE_ABORTED, now); + } catch (Exception ex) { + // TODO: don't log conflict exceptions?.. although we barely ever expect them. + LOG.error("Failed to create a missing table write", ex); + rs.rollbackTransaction(); + return false; + } + localCleanupOnlyWriteIds.add(expectedId); + --expectedId; + } + boolean result = (t == null || tryCommit(rs)); + if (result) { + cleanupOnlyWriteIds.addAll(localCleanupOnlyWriteIds); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index fb3b1ad..32e4daf 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -711,7 +711,7 @@ public class ObjectStore implements RawStore, Configurable { } public Database getDatabaseInternal(String name) throws MetaException, NoSuchObjectException { - return new GetDbHelper(name, null, true, true) { + return new GetDbHelper(name, true, true) { @Override protected Database getSqlResult(GetHelper<Database> ctx) throws MetaException { return directSql.getDatabase(dbName); @@ -1183,14 +1183,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieveAll(result); success = true; } finally { - if (success) { - commitTransaction(); - } else { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + closeTransaction(success, query); } return result; } @@ -2951,15 +2944,13 @@ public class ObjectStore implements RawStore, Configurable { public abstract class GetDbHelper extends GetHelper<Database> { /** * GetHelper for returning db info using directSql/JDO. - * Since this is a db-level call, tblName is ignored, and null is passed irrespective of what is passed in. * @param dbName The Database Name - * @param tblName Placeholder param to match signature, always ignored. * @param allowSql Whether or not we allow DirectSQL to perform this query. * @param allowJdo Whether or not we allow ORM to perform this query. 
* @throws MetaException */ public GetDbHelper( - String dbName, String tblName, boolean allowSql, boolean allowJdo) throws MetaException { + String dbName,boolean allowSql, boolean allowJdo) throws MetaException { super(dbName,null,allowSql,allowJdo); } @@ -8713,7 +8704,7 @@ public class ObjectStore implements RawStore, Configurable { openTransaction(); try { MTable mtbl = getMTable(tbl.getDbName(), tbl.getTableName()); - MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), heartbeat); + MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), heartbeat, heartbeat); pm.makePersistent(tw); success = true; } finally { @@ -8746,8 +8737,8 @@ public class ObjectStore implements RawStore, Configurable { String dbName, String tblName, long writeId) throws MetaException { boolean success = false; Query query = null; + openTransaction(); try { - openTransaction(); query = pm.newQuery(MTableWrite.class, "table.tableName == t1 && table.database.name == t2 && writeId == t3"); query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3"); @@ -8762,45 +8753,129 @@ public class ObjectStore implements RawStore, Configurable { } return writes.get(0); } finally { - if (success) { - commitTransaction(); - } else { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + closeTransaction(success, query); } } @Override - public List<Long> getWriteIds(String dbName, String tblName, + public List<Long> getTableWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException { boolean success = false; Query query = null; + openTransaction(); try { - openTransaction(); + boolean hasState = (state != '\0'); query = pm.newQuery("select writeId from org.apache.hadoop.hive.metastore.model.MTableWrite" - + " where table.tableName == t1 && table.database.name == t2 && writeId >= t3" - + " && writeId < t4 && state == t5"); + + " where table.tableName == t1 && table.database.name == t2 && writeId > t3" + + " && writeId < t4" + (hasState ? " && state == t5" : "")); query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3, " - + "java.lang.Long t4, java.lang.String t5"); + + "java.lang.Long t4" + (hasState ? ", java.lang.String t5" : "")); query.setResult("writeId"); query.setOrdering("writeId asc"); @SuppressWarnings("unchecked") - List<Long> writes = (List<Long>) query.executeWithArray( - tblName, dbName, watermarkId, nextWriteId, String.valueOf(state)); + List<Long> writes = (List<Long>) (hasState + ? query.executeWithArray(tblName, dbName, watermarkId, nextWriteId, String.valueOf(state)) + : query.executeWithArray(tblName, dbName, watermarkId, nextWriteId)); + success = true; + return (writes == null) ? 
new ArrayList<Long>() : new ArrayList<>(writes); + } finally { + closeTransaction(success, query); + } + } + + @Override + public List<MTableWrite> getTableWrites( + String dbName, String tblName, long from, long to) throws MetaException { + boolean success = false; + Query query = null; + openTransaction(); + try { + query = pm.newQuery(MTableWrite.class, + "table.tableName == t1 && table.database.name == t2 && writeId > t3 && writeId < t4"); + query.declareParameters( + "java.lang.String t1, java.lang.String t2, java.lang.Long t3, java.lang.Long t4"); + query.setOrdering("writeId asc"); + @SuppressWarnings("unchecked") + List<MTableWrite> writes = + (List<MTableWrite>) query.executeWithArray(tblName, dbName, from, to); success = true; return (writes == null || writes.isEmpty()) ? null : new ArrayList<>(writes); } finally { - if (success) { - commitTransaction(); - } else { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); + closeTransaction(success, query); + } + } + + + @Override + public void deleteTableWrites( + String dbName, String tblName, long from, long to) throws MetaException { + boolean success = false; + Query query = null; + openTransaction(); + try { + query = pm.newQuery(MTableWrite.class, + "table.tableName == t1 && table.database.name == t2 && writeId > t3 && writeId < t4"); + query.declareParameters( + "java.lang.String t1, java.lang.String t2, java.lang.Long t3, java.lang.Long t4"); + query.deletePersistentAll(tblName, dbName, from, to); + success = true; + } finally { + closeTransaction(success, query); + } + } + + @Override + public List<FullTableName> getAllMmTablesForCleanup() throws MetaException { + boolean success = false; + Query query = null; + openTransaction(); + try { + // If the table had no MM writes, there's nothing to clean up + query = pm.newQuery(MTable.class, "mmNextWriteId > 0"); + @SuppressWarnings("unchecked") + List<MTable> tables = (List<MTable>) query.execute(); + pm.retrieveAll(tables); + ArrayList<FullTableName> result = new ArrayList<>(tables.size()); + for (MTable table : tables) { + if (MetaStoreUtils.isMmTable(table.getParameters())) { + result.add(new FullTableName(table.getDatabase().getName(), table.getTableName())); + } } + success = true; + return result; + } finally { + closeTransaction(success, query); + } + } + + @Override + public Collection<String> getAllPartitionLocations(String dbName, String tblName) { + boolean success = false; + Query query = null; + openTransaction(); + try { + String q = "select sd.location from org.apache.hadoop.hive.metastore.model.MPartition" + + " where table.tableName == t1 && table.database.name == t2"; + query = pm.newQuery(q); + query.declareParameters("java.lang.String t1, java.lang.String t2"); + @SuppressWarnings("unchecked") + List<String> locations = (List<String>) query.execute(tblName, dbName); + success = true; + return new ArrayList<>(locations); + } finally { + closeTransaction(success, query); + } + } + + private void closeTransaction(boolean success, Query query) { + if (success) { + commitTransaction(); + } else { + rollbackTransaction(); + } + if (query != null) { + query.closeAll(); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 170c07d..76ead25 100644
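Before moving on to the RawStore interface: the repeated commit-or-rollback epilogue in the ObjectStore hunks above is now factored into the single closeTransaction(success, query) helper shown at the end of the file. A minimal sketch of the calling idiom, with the filter string elided (names are as in the patch; this is an illustration, not an additional hunk):

    // Sketch of the try/finally pattern the patch standardizes on in ObjectStore.
    boolean success = false;
    Query query = null;               // javax.jdo.Query
    openTransaction();
    try {
      query = pm.newQuery(MTableWrite.class, "..." /* filter elided */);
      // ... execute and consume the query ...
      success = true;
    } finally {
      // Commits when success is true, rolls back otherwise, then closes the query.
      closeTransaction(success, query);
    }

Note the hunks above also move openTransaction() out of the try block, presumably so that a failure to open the transaction does not trigger a rollback of a transaction that never started.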
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -23,6 +23,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -697,5 +698,28 @@ public interface RawStore extends Configurable { void createTableWrite(Table tbl, long writeId, char state, long heartbeat); - List<Long> getWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException; + List<Long> getTableWriteIds(String dbName, String tblName, long watermarkId, long nextWriteId, char state) throws MetaException; + + + public static final class FullTableName { + public final String dbName, tblName; + + public FullTableName(String dbName, String tblName) { + this.dbName = dbName; + this.tblName = tblName; + } + + @Override + public String toString() { + return dbName + "." + tblName; + } + } + + List<FullTableName> getAllMmTablesForCleanup() throws MetaException; + + public List<MTableWrite> getTableWrites(String dbName, String tblName, long from, long to) throws MetaException; + + Collection<String> getAllPartitionLocations(String dbName, String tblName); + + void deleteTableWrites(String dbName, String tblName, long from, long to) throws MetaException; } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 829f0ae..ddc5a62 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -2759,9 +2759,36 @@ public class HBaseStore implements RawStore { @Override - public List<Long> getWriteIds( + public List<Long> getTableWriteIds( String dbName, String tblName, long watermarkId, long nextWriteId, char state) { // TODO: Auto-generated method stub throw new UnsupportedOperationException(); } + + @Override + public List<FullTableName> getAllMmTablesForCleanup() throws MetaException { + // TODO: Auto-generated method stub + throw new UnsupportedOperationException(); + } + + @Override + public List<MTableWrite> getTableWrites(String dbName, String tblName, + long from, long to) throws MetaException { + // TODO: Auto-generated method stub + throw new UnsupportedOperationException(); + } + + @Override + public Collection<String> getAllPartitionLocations(String dbName, + String tblName) { + // TODO: Auto-generated method stub + throw new UnsupportedOperationException(); + } + + @Override + public void deleteTableWrites(String dbName, String tblName, long from, + long to) throws MetaException { + // TODO: Auto-generated method stub + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java index a7e5f3e..b7f398a 100644 --- 
a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java +++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java @@ -23,14 +23,16 @@ public class MTableWrite { private long writeId; private String state; private long lastHeartbeat; + private long created; public MTableWrite() {} - public MTableWrite(MTable table, long writeId, String state, long lastHeartbeat) { + public MTableWrite(MTable table, long writeId, String state, long lastHeartbeat, long created) { this.table = table; this.writeId = writeId; this.state = state; this.lastHeartbeat = lastHeartbeat; + this.created = created; } public MTable getTable() { @@ -49,6 +51,10 @@ public class MTableWrite { return lastHeartbeat; } + public long getCreated() { + return created; + } + public void setTable(MTable table) { this.table = table; } @@ -64,4 +70,8 @@ public class MTableWrite { public void setLastHeartbeat(long lastHeartbeat) { this.lastHeartbeat = lastHeartbeat; } + + public void setCreated(long created) { + this.created = created; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/package.jdo ---------------------------------------------------------------------- diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo index bd71056..ce101dd 100644 --- a/metastore/src/model/package.jdo +++ b/metastore/src/model/package.jdo @@ -1082,6 +1082,9 @@ <field name="state"> <column name="STATE" length="1" jdbc-type="CHAR" allows-null="false"/> </field> + <field name="created"> + <column name="CREATED" jdbc-type="BIGINT" allows-null="false"/> + </field> <field name="lastHeartbeat"> <column name="LAST_HEARTBEAT" jdbc-type="BIGINT" allows-null="false"/> </field> http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 98c543f..acbbf4e 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -886,8 +887,30 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { } @Override - public List<Long> getWriteIds( + public List<Long> getTableWriteIds( String dbName, String tblName, long watermarkId, long nextWriteId, char state) { return null; } + + @Override + public List<FullTableName> getAllMmTablesForCleanup() throws MetaException { + return null; + } + + @Override + public List<MTableWrite> getTableWrites(String dbName, String tblName, + long from, long to) throws MetaException { + return null; + } + + @Override + public Collection<String> getAllPartitionLocations(String dbName, + String tblName) { + return null; + } + + @Override + public void deleteTableWrites(String dbName, String tblName, long from, + long to) throws MetaException { + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java 
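Taken together, the new RawStore methods (getAllMmTablesForCleanup, getTableWrites, getAllPartitionLocations, deleteTableWrites) plus the new MTableWrite.created column give the cleaner thread its API surface. A hypothetical sketch of a single cleanup pass over that API follows; the real logic lives in MmCleanerThread.java (heartbeat timeouts, grace periods and watermark advancement are elided, and computeNewWatermark is an invented placeholder, not a method in the patch):

    // Hypothetical illustration only -- not the actual MmCleanerThread loop.
    // Assumes imports of java.util.List, RawStore, MTableWrite and MetaException.
    void cleanupPass(RawStore rs, long watermark, long nextWriteId) throws MetaException {
      for (RawStore.FullTableName t : rs.getAllMmTablesForCleanup()) {
        // Range bounds are exclusive on both ends (writeId > from && writeId < to).
        List<MTableWrite> writes = rs.getTableWrites(t.dbName, t.tblName, watermark, nextWriteId);
        if (writes == null) continue;  // ObjectStore returns null when the range is empty
        // ... abort expired writes, then delete their mm_<writeId> directories under the
        // table location and under each rs.getAllPartitionLocations(t.dbName, t.tblName) ...
        long newWatermark = computeNewWatermark(writes);  // placeholder for the real computation
        rs.deleteTableWrites(t.dbName, t.tblName, watermark, newWatermark);
      }
    }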
---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 8e54b16..787c1f0 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -898,10 +899,32 @@ public class DummyRawStoreForJdoConnection implements RawStore { } @Override - public List<Long> getWriteIds( + public List<Long> getTableWriteIds( String dbName, String tblName, long watermarkId, long nextWriteId, char state) { return null; } + + @Override + public List<FullTableName> getAllMmTablesForCleanup() throws MetaException { + return null; + } + + @Override + public List<MTableWrite> getTableWrites(String dbName, String tblName, + long from, long to) throws MetaException { + return null; + } + + @Override + public Collection<String> getAllPartitionLocations(String dbName, + String tblName) { + return null; + } + + @Override + public void deleteTableWrites(String dbName, String tblName, long from, + long to) throws MetaException { + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java index 0497159..a8d3495 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -17,16 +17,20 @@ */ package org.apache.hadoop.hive.metastore; +import static org.junit.Assert.*; + import java.util.Arrays; import java.util.HashMap; import java.util.List; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -42,9 +46,13 @@ import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.model.MTableWrite; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hive.common.util.MockFileSystem; +import org.apache.hive.common.util.MockFileSystem.MockFile; import org.junit.After; import org.junit.Assert; import 
org.junit.Before; @@ -53,6 +61,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Supplier; + public class TestObjectStore { private ObjectStore objectStore = null; @@ -67,6 +77,15 @@ public class TestObjectStore { private static final String ROLE2 = "testobjectstorerole2"; private static final Logger LOG = LoggerFactory.getLogger(TestObjectStore.class.getName()); + private static final class LongSupplier implements Supplier<Long> { + public long value = 0; + + @Override + public Long get() { + return value; + } + } + public static class MockPartitionExpressionProxy implements PartitionExpressionProxy { @Override public String convertExprToFilter(byte[] expr) throws MetaException { @@ -142,7 +161,7 @@ public void testTableOps() throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException { Database db1 = new Database(DB1, "description", "locationurl", null); objectStore.createDatabase(db1); - StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null); + StorageDescriptor sd = createFakeSd("location"); HashMap<String,String> params = new HashMap<String,String>(); params.put("EXTERNAL", "false"); Table tbl1 = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, null, params, "viewOriginalText", "viewExpandedText", "MANAGED_TABLE"); @@ -164,6 +183,156 @@ objectStore.dropDatabase(DB1); } + + + /** + * Tests the MM cleaner thread. + */ + @Test + public void testMmCleaner() throws Exception { + HiveConf conf = new HiveConf(); + conf.set(ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT.varname, "3ms"); + conf.set(ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT.varname, "20ms"); + conf.set(ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD.varname, "5ms"); + conf.set("fs.mock.impl", MockFileSystem.class.getName()); + + MockFileSystem mfs = (MockFileSystem)(new Path("mock:///").getFileSystem(conf)); + mfs.clear(); + mfs.allowDelete = true; + // Don't add the files just yet... + MockFile[] files = new MockFile[9]; + for (int i = 0; i < files.length; ++i) { + files[i] = new MockFile("mock:/foo/mm_" + i + "/1", 0, new byte[0]); + } + + LongSupplier time = new LongSupplier(); + + MmCleanerThread mct = new MmCleanerThread(0); + mct.setHiveConf(conf); + mct.overrideTime(time); + + Database db1 = new Database(DB1, "description", "locationurl", null); + objectStore.createDatabase(db1); + StorageDescriptor sd = createFakeSd("mock:/foo"); + HashMap<String,String> params = new HashMap<String,String>(); + params.put("EXTERNAL", "false"); + params.put(hive_metastoreConstants.TABLE_IS_MM, "true"); + Table tbl = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, + null, params, null, null, "MANAGED_TABLE"); + objectStore.createTable(tbl); + + // Add write #0 so the watermark won't advance; skip write #1, add #2 at 0, skip #3 + createCompleteTableWrite(mfs, files, 0, time, tbl, HiveMetaStore.MM_WRITE_OPEN); + mfs.addFile(files[1]); + createCompleteTableWrite(mfs, files, 2, time, tbl, HiveMetaStore.MM_WRITE_OPEN); + mfs.addFile(files[3]); + tbl.setMmNextWriteId(4); + objectStore.alterTable(DB1, TABLE1, tbl); + + mct.runOneIteration(objectStore); + List<Long> writes = getAbortedWrites(); + assertEquals(0, writes.size()); // Missing write is not aborted before timeout. + time.value = 4; // Advance time.
+ mct.runOneIteration(objectStore); + writes = getAbortedWrites(); + assertEquals(1, writes.size()); // Missing write is aborted after timeout. + assertEquals(1L, writes.get(0).longValue()); + checkDeletedSet(files, 1); + // However, write #3 was not aborted as we cannot determine when it will time out. + createCompleteTableWrite(mfs, files, 4, time, tbl, HiveMetaStore.MM_WRITE_OPEN); + time.value = 8; + // It will now be aborted, since we have a following write. + mct.runOneIteration(objectStore); + writes = getAbortedWrites(); + assertEquals(2, writes.size()); + assertTrue(writes.contains(Long.valueOf(3))); + checkDeletedSet(files, 1, 3); + + // Commit #0 and #2 and confirm that the watermark advances. + // It will only advance over #1, since #3 was aborted at 8 and grace period has not passed. + time.value = 10; + MTableWrite tw = objectStore.getTableWrite(DB1, TABLE1, 0); + tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED)); + objectStore.updateTableWrite(tw); + tw = objectStore.getTableWrite(DB1, TABLE1, 2); + tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED)); + objectStore.updateTableWrite(tw); + mct.runOneIteration(objectStore); + writes = getAbortedWrites(); + assertEquals(1, writes.size()); + assertEquals(3L, writes.get(0).longValue()); + tbl = objectStore.getTable(DB1, TABLE1); + assertEquals(2L, tbl.getMmWatermarkWriteId()); + + // Now advance the time and see that watermark also advances over #3. + time.value = 16; + mct.runOneIteration(objectStore); + writes = getAbortedWrites(); + assertEquals(0, writes.size()); + tbl = objectStore.getTable(DB1, TABLE1); + assertEquals(3L, tbl.getMmWatermarkWriteId()); + + // Check that the open write gets aborted after some time; then the watermark advances. + time.value = 25; + mct.runOneIteration(objectStore); + writes = getAbortedWrites(); + assertEquals(1, writes.size()); + assertEquals(4L, writes.get(0).longValue()); + time.value = 31; + mct.runOneIteration(objectStore); + tbl = objectStore.getTable(DB1, TABLE1); + assertEquals(4L, tbl.getMmWatermarkWriteId()); + checkDeletedSet(files, 1, 3, 4); // Files 1 and 3 should still be deleted. + + // Finally check that we cannot advance the watermark if cleanup fails for some file. + createCompleteTableWrite(mfs, files, 5, time, tbl, HiveMetaStore.MM_WRITE_ABORTED); + createCompleteTableWrite(mfs, files, 6, time, tbl, HiveMetaStore.MM_WRITE_ABORTED); + createCompleteTableWrite(mfs, files, 7, time, tbl, HiveMetaStore.MM_WRITE_COMMITTED); + createCompleteTableWrite(mfs, files, 8, time, tbl, HiveMetaStore.MM_WRITE_ABORTED); + time.value = 37; // Skip the grace period. + files[6].cannotDelete = true; + mct.runOneIteration(objectStore); + checkDeletedSet(files, 1, 3, 4, 5, 8); // 5 and 8 are newly deleted; 6 could not be deleted and 7 is committed. + tbl = objectStore.getTable(DB1, TABLE1); + assertEquals(5L, tbl.getMmWatermarkWriteId()); // Watermark only goes up to 5. + files[6].cannotDelete = false; + mct.runOneIteration(objectStore); + checkDeletedSet(files, 1, 3, 4, 5, 6, 8); + tbl = objectStore.getTable(DB1, TABLE1); + assertEquals(8L, tbl.getMmWatermarkWriteId()); // Now it advances all the way.
+ + objectStore.dropTable(DB1, TABLE1); + objectStore.dropDatabase(DB1); + } + + private void createCompleteTableWrite(MockFileSystem mfs, MockFile[] files, + int id, LongSupplier time, Table tbl, char state) throws MetaException, InvalidObjectException { + objectStore.createTableWrite(tbl, id, state, time.value); + mfs.addFile(files[id]); + tbl.setMmNextWriteId(id + 1); + objectStore.alterTable(DB1, TABLE1, tbl); + } + + private void checkDeletedSet(MockFile[] files, int... deleted) { + for (int id : deleted) { + assertTrue("File " + id + " not deleted", files[id].isDeleted); + } + int count = 0; + for (MockFile file : files) { + if (file.isDeleted) ++count; + } + assertEquals(deleted.length, count); // Make sure nothing else is deleted. + } + + private List<Long> getAbortedWrites() throws MetaException { + return objectStore.getTableWriteIds(DB1, TABLE1, -1, 10, HiveMetaStore.MM_WRITE_ABORTED); + } + + private StorageDescriptor createFakeSd(String location) { + return new StorageDescriptor(null, location, null, null, false, 0, + new SerDeInfo("SerDeName", "serializationLib", null), null, null, null); + } + /** * Tests partition operations @@ -172,7 +341,7 @@ public class TestObjectStore { public void testPartitionOps() throws MetaException, InvalidObjectException, NoSuchObjectException, InvalidInputException { Database db1 = new Database(DB1, "description", "locationurl", null); objectStore.createDatabase(db1); - StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, null); + StorageDescriptor sd = createFakeSd("location"); HashMap<String,String> tableParams = new HashMap<String,String>(); tableParams.put("EXTERNAL", "false"); FieldSchema partitionKey1 = new FieldSchema("Country", serdeConstants.STRING_TYPE_NAME, ""); @@ -265,7 +434,7 @@ public class TestObjectStore { MetricsFactory.init(conf); CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance(); - objectStore.new GetDbHelper("foo", null, true, true) { + objectStore.new GetDbHelper("foo", true, true) { @Override protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException { return null; @@ -282,7 +451,7 @@ public class TestObjectStore { MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER, MetricsConstant.DIRECTSQL_ERRORS, ""); - objectStore.new GetDbHelper("foo", null, true, true) { + objectStore.new GetDbHelper("foo", true, true) { @Override protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) throws MetaException { throw new RuntimeException();
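One structural point about testMmCleaner worth calling out: it never sleeps. The LongSupplier defined near the top of the test class backs a fake clock, and the test advances it explicitly between deterministic single passes of the cleaner. Condensed from the test above (all names appear in the diff):

    // Isolates the controllable-clock pattern used throughout testMmCleaner.
    LongSupplier time = new LongSupplier();   // Supplier<Long> with a public, mutable 'value'
    MmCleanerThread mct = new MmCleanerThread(0);
    mct.setHiveConf(conf);
    mct.overrideTime(time);                   // the cleaner reads "now" from the supplier

    time.value = 4;                           // jump past the 3ms heartbeat timeout
    mct.runOneIteration(objectStore);         // synchronous pass; no background thread involved

This is also why the timeouts in the HiveConf are set to a few milliseconds: they are compared against the supplier's value rather than the wall clock, so the test stays fast and deterministic.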