HIVE-14700 : clean up file/txn information via a metastore thread (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70299dc4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70299dc4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70299dc4

Branch: refs/heads/hive-14535
Commit: 70299dc48f93433fb53611b05f8a719b841575c5
Parents: 8708398
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Sep 19 15:40:19 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Sep 19 15:40:19 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |  40 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  21 +-
 .../apache/hive/common/util/MockFileSystem.java | 622 +++++++++++++++++++
 .../upgrade/derby/037-HIVE-14637.derby.sql      |   2 +-
 .../upgrade/derby/hive-schema-2.2.0.derby.sql   |   2 +-
 .../upgrade/mssql/022-HIVE-14637.mssql.sql      |   3 +-
 .../upgrade/mssql/hive-schema-2.2.0.mssql.sql   |   3 +-
 .../upgrade/mysql/037-HIVE-14637.mysql.sql      |   3 +-
 .../upgrade/mysql/hive-schema-2.2.0.mysql.sql   |   3 +-
 .../upgrade/oracle/037-HIVE-14637.oracle.sql    |   1 +
 .../upgrade/oracle/hive-schema-2.2.0.oracle.sql |   1 +
 .../postgres/036-HIVE-14637.postgres.sql        |   1 +
 .../postgres/hive-schema-2.2.0.postgres.sql     |   1 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |  26 +-
 .../hadoop/hive/metastore/MetaStoreThread.java  |   1 +
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  10 +
 .../hadoop/hive/metastore/MmCleanerThread.java  | 397 ++++++++++++
 .../hadoop/hive/metastore/ObjectStore.java      | 147 +++--
 .../apache/hadoop/hive/metastore/RawStore.java  |  26 +-
 .../hadoop/hive/metastore/hbase/HBaseStore.java |  29 +-
 .../hive/metastore/model/MTableWrite.java       |  12 +-
 metastore/src/model/package.jdo                 |   3 +
 .../DummyRawStoreControlledCommit.java          |  25 +-
 .../DummyRawStoreForJdoConnection.java          |  25 +-
 .../hadoop/hive/metastore/TestObjectStore.java  | 177 +++++-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   3 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   7 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 +-
 .../hive/ql/txn/compactor/CompactorThread.java  |   1 -
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  12 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 549 +---------------
 32 files changed, 1527 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index b25a72d..b939b43 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 public class ValidWriteIds {
   public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);
 
-  private static final String MM_PREFIX = "mm";
+  public static final String MM_PREFIX = "mm";
 
   private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);
 
@@ -117,22 +117,8 @@ public class ValidWriteIds {
   }
 
   public boolean isValidInput(Path file) {
-    String fileName = file.getName();
-    String[] parts = fileName.split("_", 3);
-    if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
-      LOG.info("Ignoring unknown file for a MM table: " + file
-          + " (" + Arrays.toString(parts) + ")");
-      return false;
-    }
-    long writeId = -1;
-    try {
-      writeId = Long.parseLong(parts[1]);
-    } catch (NumberFormatException ex) {
-      LOG.info("Ignoring unknown file for a MM table: " + file
-          + "; parsing " + parts[1] + " got " + ex.getMessage());
-      return false;
-    }
-    return isValid(writeId);
+    Long writeId = extractWriteId(file);
+    return (writeId != null) && isValid(writeId);
   }
 
   public static String getMmFilePrefix(long mmWriteId) {
@@ -155,4 +141,24 @@ public class ValidWriteIds {
       return isMatch == (name.startsWith(prefix) || name.startsWith(tmpPrefix));
     }
   }
+
+
+  public static Long extractWriteId(Path file) {
+    String fileName = file.getName();
+    String[] parts = fileName.split("_", 3);
+    if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
+      LOG.info("Cannot extract write ID for a MM table: " + file
+          + " (" + Arrays.toString(parts) + ")");
+      return null;
+    }
+    long writeId = -1;
+    try {
+      writeId = Long.parseLong(parts[1]);
+    } catch (NumberFormatException ex) {
+      LOG.info("Cannot extract write ID for a MM table: " + file
+          + "; parsing " + parts[1] + " got " + ex.getMessage());
+      return null;
+    }
+    return writeId;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 301159e..1a85f50 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -297,7 +297,10 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL,
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY,
       HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL,
-      HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS
+      HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT,
+      HiveConf.ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT
       };
 
   /**
@@ -3104,6 +3107,22 @@ public class HiveConf extends Configuration {
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
         "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
 
+    HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL("hive.metastore.mm.thread.scan.interval", "900s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM table housekeeping thread interval in this metastore instance. 0 to disable."),
+
+    HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID times out after this long if a heartbeat is not send. Currently disabled."), // TODO# heartbeating not implemented
+
+    HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID cannot be outstanding for more than this long."),
+
+    HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD("hive.metastore.mm.aborted.grace.period", "1d",
+        new TimeValidator(TimeUnit.SECONDS),
+        "MM write ID will not be removed up for that long after it has been aborted;\n" +
+        "this is to work around potential races e.g. with FS visibility, when deleting files."),
 
     HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
         "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role," +

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/common/src/test/org/apache/hive/common/util/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/MockFileSystem.java b/common/src/test/org/apache/hive/common/util/MockFileSystem.java
new file mode 100644
index 0000000..e65fd33
--- /dev/null
+++ b/common/src/test/org/apache/hive/common/util/MockFileSystem.java
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+
+public class MockFileSystem extends FileSystem {
+  final List<MockFile> files = new ArrayList<MockFile>();
+  final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>();
+  Path workingDir = new Path("/");
+  // statics for when the mock fs is created via FileSystem.get
+  private static String blockedUgi = null;
+  private final static List<MockFile> globalFiles = new ArrayList<MockFile>();
+  protected Statistics statistics;
+  public boolean allowDelete = false;
+
+  public MockFileSystem() {
+    // empty
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) {
+    setConf(conf);
+    statistics = getStatistics("mock", getClass());
+  }
+
+  public MockFileSystem(Configuration conf, MockFile... files) {
+    setConf(conf);
+    this.files.addAll(Arrays.asList(files));
+    statistics = getStatistics("mock", getClass());
+  }
+
+  public static void setBlockedUgi(String s) {
+    blockedUgi = s;
+  }
+
+  public void clear() {
+    files.clear();
+  }
+
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("mock:///");
+    } catch (URISyntaxException err) {
+      throw new IllegalArgumentException("huh?", err);
+    }
+  }
+
+  // increments file modification time
+  public void touch(MockFile file) {
+    if (fileStatusMap.containsKey(file)) {
+      FileStatus fileStatus = fileStatusMap.get(file);
+      FileStatus fileStatusNew = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
+          fileStatus.getReplication(), fileStatus.getBlockSize(),
+          fileStatus.getModificationTime() + 1, fileStatus.getAccessTime(),
+          fileStatus.getPermission(), fileStatus.getOwner(), fileStatus.getGroup(),
+          fileStatus.getPath());
+      fileStatusMap.put(file, fileStatusNew);
+    }
+  }
+
+  @SuppressWarnings("serial")
+  public static class MockAccessDenied extends IOException {
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int i) throws IOException {
+    statistics.incrementReadOps(1);
+    checkAccess();
+    MockFile file = findFile(path);
+    if (file != null) return new FSDataInputStream(new MockInputStream(file));
+    throw new IOException("File not found: " + path);
+  }
+
+  public MockFile findFile(Path path) {
+    for (MockFile file: files) {
+      if (file.path.equals(path)) {
+        return file;
+      }
+    }
+    for (MockFile file: globalFiles) {
+      if (file.path.equals(path)) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private void checkAccess() throws IOException {
+    if (blockedUgi == null) return;
+    if (!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) return;
+    throw new MockAccessDenied();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                   boolean overwrite, int bufferSize,
+                                   short replication, long blockSize,
+                                   Progressable progressable
+                                   ) throws IOException {
+    statistics.incrementWriteOps(1);
+    checkAccess();
+    MockFile file = findFile(path);
+    if (file == null) {
+      file = new MockFile(path.toString(), (int) blockSize, new byte[0]);
+      files.add(file);
+    }
+    return new MockOutputStream(file);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int bufferSize,
+                                   Progressable progressable
+                                   ) throws IOException {
+    statistics.incrementWriteOps(1);
+    checkAccess();
+    return create(path, FsPermission.getDefault(), true, bufferSize,
+        (short) 3, 256 * 1024, progressable);
+  }
+
+  @Override
+  public boolean rename(Path path, Path path2) throws IOException {
+    statistics.incrementWriteOps(1);
+    checkAccess();
+    return false;
+  }
+
+  @Override
+  public boolean delete(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    checkAccess();
+    return false;
+  }
+
+  @Override
+  public boolean delete(Path path, boolean isRecursive) throws IOException {
+    statistics.incrementWriteOps(1);
+    checkAccess();
+    return allowDelete && isRecursive && deleteMatchingFiles(files, path.toString());
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
+      throws IOException {
+    return new RemoteIterator<LocatedFileStatus>() {
+      private Iterator<LocatedFileStatus> iterator = listLocatedFileStatuses(f).iterator();
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        return iterator.next();
+      }
+    };
+  }
+
+  private List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws IOException {
+    statistics.incrementReadOps(1);
+    checkAccess();
+    path = path.makeQualified(this);
+    List<LocatedFileStatus> result = new ArrayList<>();
+    String pathname = path.toString();
+    String pathnameAsDir = pathname + "/";
+    Set<String> dirs = new TreeSet<String>();
+    MockFile file = findFile(path);
+    if (file != null) {
+      result.add(createLocatedStatus(file));
+      return result;
+    }
+    findMatchingLocatedFiles(files, pathnameAsDir, dirs, result);
+    findMatchingLocatedFiles(globalFiles, pathnameAsDir, dirs, result);
+    // for each directory add it once
+    for(String dir: dirs) {
+      result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir)));
+    }
+    return result;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    statistics.incrementReadOps(1);
+    checkAccess();
+    path = path.makeQualified(this);
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    String pathname = path.toString();
+    String pathnameAsDir = pathname + "/";
+    Set<String> dirs = new TreeSet<String>();
+    MockFile file = findFile(path);
+    if (file != null) {
+      return new FileStatus[]{createStatus(file)};
+    }
+    findMatchingFiles(files, pathnameAsDir, dirs, result);
+    findMatchingFiles(globalFiles, pathnameAsDir, dirs, result);
+    // for each directory add it once
+    for(String dir: dirs) {
+      result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
+    }
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+  private void findMatchingFiles(
+      List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<FileStatus> result) {
+    for (MockFile file: files) {
+      String filename = file.path.toString();
+      if (filename.startsWith(pathnameAsDir)) {
+        String tail = filename.substring(pathnameAsDir.length());
+        int nextSlash = tail.indexOf('/');
+        if (nextSlash > 0) {
+          dirs.add(tail.substring(0, nextSlash));
+        } else {
+          result.add(createStatus(file));
+        }
+      }
+    }
+  }
+
+  private boolean deleteMatchingFiles(List<MockFile> files, String path) {
+    Iterator<MockFile> fileIter = files.iterator();
+    boolean result = true;
+    while (fileIter.hasNext()) {
+      MockFile file = fileIter.next();
+      String filename = file.path.toString();
+      if (!filename.startsWith(path)) continue;
+      if (filename.length() <= path.length() || filename.charAt(path.length()) != '/') continue;
+      if (file.cannotDelete) {
+        result = false;
+        continue;
+      }
+      assert !file.isDeleted;
+      file.isDeleted = true;
+      fileIter.remove();
+    }
+    return result;
+  }
+
+  private void findMatchingLocatedFiles(
+      List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<LocatedFileStatus> result)
+      throws IOException {
+    for (MockFile file: files) {
+      String filename = file.path.toString();
+      if (filename.startsWith(pathnameAsDir)) {
+        String tail = filename.substring(pathnameAsDir.length());
+        int nextSlash = tail.indexOf('/');
+        if (nextSlash > 0) {
+          dirs.add(tail.substring(0, nextSlash));
+        } else {
+          result.add(createLocatedStatus(file));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void setWorkingDirectory(Path path) {
+    workingDir = path;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) {
+    statistics.incrementWriteOps(1);
+    return false;
+  }
+
+  private FileStatus createStatus(MockFile file) {
+    if (fileStatusMap.containsKey(file)) {
+      return fileStatusMap.get(file);
+    }
+    FileStatus fileStatus = new FileStatus(file.length, false, 1, file.blockSize, 0, 0,
+        FsPermission.createImmutable((short) 644), "owen", "group",
+        file.path);
+    fileStatusMap.put(file, fileStatus);
+    return fileStatus;
+  }
+
+  private FileStatus createDirectory(Path dir) {
+    return new FileStatus(0, true, 0, 0, 0, 0,
+        FsPermission.createImmutable((short) 755), "owen", "group", dir);
+  }
+
+  private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
+    FileStatus fileStatus = createStatus(file);
+    return new LocatedFileStatus(fileStatus,
+        getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+  }
+
+  private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
+    FileStatus fileStatus = createDirectory(dir);
+    return new LocatedFileStatus(fileStatus,
+        getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    statistics.incrementReadOps(1);
+    checkAccess();
+    path = path.makeQualified(this);
+    String pathnameAsDir = path.toString() + "/";
+    MockFile file = findFile(path);
+    if (file != null) return createStatus(file);
+    for (MockFile dir : files) {
+      if (dir.path.toString().startsWith(pathnameAsDir)) {
+        return createDirectory(path);
+      }
+    }
+    for (MockFile dir : globalFiles) {
+      if (dir.path.toString().startsWith(pathnameAsDir)) {
+        return createDirectory(path);
+      }
+    }
+    throw new FileNotFoundException("File " + path + " does not exist");
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus stat,
+                                               long start, long len) throws IOException {
+    return getFileBlockLocationsImpl(stat, start, len, true);
+  }
+
+  private BlockLocation[] getFileBlockLocationsImpl(final FileStatus stat, final long start,
+      final long len,
+      final boolean updateStats) throws IOException {
+    if (updateStats) {
+      statistics.incrementReadOps(1);
+    }
+    checkAccess();
+    List<BlockLocation> result = new ArrayList<BlockLocation>();
+    MockFile file = findFile(stat.getPath());
+    if (file != null) {
+      for(MockBlock block: file.blocks) {
+        if (getOverlap(block.offset, block.length, start, len) > 0) {
+          String[] topology = new String[block.hosts.length];
+          for(int i=0; i < topology.length; ++i) {
+            topology[i] = "/rack/ " + block.hosts[i];
+          }
+          result.add(new BlockLocation(block.hosts, block.hosts,
+              topology, block.offset, block.length));
+        }
+      }
+      return result.toArray(new BlockLocation[result.size()]);
+    }
+    return new BlockLocation[0];
+  }
+  
+
+  /**
+   * Compute the number of bytes that overlap between the two ranges.
+   * @param offset1 start of range1
+   * @param length1 length of range1
+   * @param offset2 start of range2
+   * @param length2 length of range2
+   * @return the number of bytes in the overlap range
+   */
+  private static long getOverlap(long offset1, long length1, long offset2, long length2) {
+    // c/p from OrcInputFormat
+    long end1 = offset1 + length1;
+    long end2 = offset2 + length2;
+    if (end2 <= offset1 || end1 <= offset2) {
+      return 0;
+    } else {
+      return Math.min(end1, end2) - Math.max(offset1, offset2);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("mockFs{files:[");
+    for(int i=0; i < files.size(); ++i) {
+      if (i != 0) {
+        buffer.append(", ");
+      }
+      buffer.append(files.get(i));
+    }
+    buffer.append("]}");
+    return buffer.toString();
+  }
+
+  public static void addGlobalFile(MockFile mockFile) {
+    globalFiles.add(mockFile);
+  }
+
+  public static void clearGlobalFiles() {
+    globalFiles.clear();
+  }
+
+
+  public static class MockBlock {
+    int offset;
+    int length;
+    final String[] hosts;
+
+    public MockBlock(String... hosts) {
+      this.hosts = hosts;
+    }
+
+    public void setOffset(int offset) {
+      this.offset = offset;
+    }
+
+    public void setLength(int length) {
+      this.length = length;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("block{offset: ");
+      buffer.append(offset);
+      buffer.append(", length: ");
+      buffer.append(length);
+      buffer.append(", hosts: [");
+      for(int i=0; i < hosts.length; i++) {
+        if (i != 0) {
+          buffer.append(", ");
+        }
+        buffer.append(hosts[i]);
+      }
+      buffer.append("]}");
+      return buffer.toString();
+    }
+  }
+
+  public static class MockFile {
+    public final Path path;
+    public int blockSize;
+    public int length;
+    public MockBlock[] blocks;
+    public byte[] content;
+    public boolean cannotDelete = false;
+    // This is purely for testing convenience; has no bearing on FS operations such as list.
+    public boolean isDeleted = false;
+
+    public MockFile(String path, int blockSize, byte[] content,
+                    MockBlock... blocks) {
+      this.path = new Path(path);
+      this.blockSize = blockSize;
+      this.blocks = blocks;
+      this.content = content;
+      this.length = content.length;
+      int offset = 0;
+      for(MockBlock block: blocks) {
+        block.offset = offset;
+        block.length = Math.min(length - offset, blockSize);
+        offset += block.length;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return path.hashCode() + 31 * length;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (!(obj instanceof MockFile)) { return false; }
+      return ((MockFile) obj).path.equals(this.path) && ((MockFile) obj).length == this.length;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("mockFile{path: ");
+      buffer.append(path.toString());
+      buffer.append(", blkSize: ");
+      buffer.append(blockSize);
+      buffer.append(", len: ");
+      buffer.append(length);
+      buffer.append(", blocks: [");
+      for(int i=0; i < blocks.length; i++) {
+        if (i != 0) {
+          buffer.append(", ");
+        }
+        buffer.append(blocks[i]);
+      }
+      buffer.append("]}");
+      return buffer.toString();
+    }
+  }
+
+  static class MockInputStream extends FSInputStream {
+    final MockFile file;
+    int offset = 0;
+
+    public MockInputStream(MockFile file) throws IOException {
+      this.file = file;
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+      this.offset = (int) offset;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset;
+    }
+
+    @Override
+    public boolean seekToNewSource(long l) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (offset < file.length) {
+        return file.content[offset++] & 0xff;
+      }
+      return -1;
+    }
+  }
+
+  public static class MockPath extends Path {
+    private final FileSystem fs;
+    public MockPath(FileSystem fs, String path) {
+      super(path);
+      this.fs = fs;
+    }
+    @Override
+    public FileSystem getFileSystem(Configuration conf) {
+      return fs;
+    }
+  }
+
+  public static class MockOutputStream extends FSDataOutputStream {
+    public final MockFile file;
+
+    public MockOutputStream(MockFile file) throws IOException {
+      super(new DataOutputBuffer(), null);
+      this.file = file;
+    }
+
+    /**
+     * Set the blocks and their location for the file.
+     * Must be called after the stream is closed or the block length will be
+     * wrong.
+     * @param blocks the list of blocks
+     */
+    public void setBlocks(MockBlock... blocks) {
+      file.blocks = blocks;
+      int offset = 0;
+      int i = 0;
+      while (offset < file.length && i < blocks.length) {
+        blocks[i].offset = offset;
+        blocks[i].length = Math.min(file.length - offset, file.blockSize);
+        offset += blocks[i].length;
+        i += 1;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream();
+      file.length = buf.getLength();
+      file.content = new byte[file.length];
+      MockBlock block = new MockBlock("host1");
+      block.setLength(file.length);
+      setBlocks(block);
+      System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
+    }
+
+    @Override
+    public String toString() {
+      return "Out stream to " + file.toString();
+    }
+  }
+
+  public void addFile(MockFile file) {
+    files.add(file);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
index 88a48f0..cb6e5f6 100644
--- a/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
+++ b/metastore/scripts/upgrade/derby/037-HIVE-14637.derby.sql
@@ -1,6 +1,6 @@
 ALTER TABLE "TBLS" ADD "MM_WATERMARK_WRITE_ID" BIGINT DEFAULT -1;
 ALTER TABLE "TBLS" ADD "MM_NEXT_WRITE_ID" BIGINT DEFAULT 0;
-CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT);
+CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL);
 ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID");
 ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
 CREATE UNIQUE INDEX "APP"."UNIQUEWRITE" ON "APP"."TBL_WRITES" ("TBL_ID", "WRITE_ID");

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
index f86ee4a..9da1703 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-2.2.0.derby.sql
@@ -112,7 +112,7 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY
 
 CREATE INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID");
 
-CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "LAST_HEARTBEAT" BIGINT);
+CREATE TABLE "APP"."TBL_WRITES" ("TW_ID" BIGINT NOT NULL, "TBL_ID" BIGINT NOT NULL, "WRITE_ID" BIGINT NOT NULL, "STATE" CHAR(1) NOT NULL, "CREATED" BIGINT NOT NULL, "LAST_HEARTBEAT" BIGINT NOT NULL);
 
 ALTER TABLE "APP"."TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY ("TW_ID");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
index 5d6f99f..9666d2b 100644
--- a/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/022-HIVE-14637.mssql.sql
@@ -7,7 +7,8 @@ CREATE TABLE TBL_WRITES
   TBL_ID BIGINT NOT NULL,
   WRITE_ID BIGINT NOT NULL,
   STATE CHAR(1) NOT NULL,
-  LAST_HEARTBEAT BIGINT
+  CREATED BIGINT NOT NULL,
+  LAST_HEARTBEAT BIGINT NOT NULL
 );
 ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);
 ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_FK1 FOREIGN KEY (TBL_ID) REFERENCES TBLS (TBL_ID) ;

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
index 26b2ab3..31016e2 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-2.2.0.mssql.sql
@@ -600,7 +600,8 @@ CREATE TABLE TBL_WRITES
   TBL_ID BIGINT NOT NULL,
   WRITE_ID BIGINT NOT NULL,
   STATE CHAR(1) NOT NULL,
-  LAST_HEARTBEAT BIGINT
+  CREATED BIGINT NOT NULL,
+  LAST_HEARTBEAT BIGINT NOT NULL
 );
 
 ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
index c024584..9e34db2 100644
--- a/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/037-HIVE-14637.mysql.sql
@@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES`
   `TBL_ID` BIGINT NOT NULL,
   `WRITE_ID` BIGINT NOT NULL,
   `STATE` CHAR(1) NOT NULL,
-  `LAST_HEARTBEAT` BIGINT,
+  `CREATED` BIGINT NOT NULL,
+  `LAST_HEARTBEAT` BIGINT NOT NULL,
   PRIMARY KEY (`TW_ID`),
   UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`),
   CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` 
(`TBL_ID`)

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql 
b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
index b295950..3e73008 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-2.2.0.mysql.sql
@@ -835,7 +835,8 @@ CREATE TABLE IF NOT EXISTS `TBL_WRITES`
   `TBL_ID` BIGINT NOT NULL,
   `WRITE_ID` BIGINT NOT NULL,
   `STATE` CHAR(1) NOT NULL,
-  `LAST_HEARTBEAT` BIGINT,
+  `CREATED` BIGINT NOT NULL,
+  `LAST_HEARTBEAT` BIGINT NOT NULL,
   PRIMARY KEY (`TW_ID`),
   UNIQUE KEY `UNIQUEWRITE` (`TBL_ID`,`WRITE_ID`),
   CONSTRAINT `TBL_WRITES_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` 
(`TBL_ID`)

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql 
b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
index 9f6dbb2..218eefe 100644
--- a/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/037-HIVE-14637.oracle.sql
@@ -7,6 +7,7 @@ CREATE TABLE TBL_WRITES
   TBL_ID NUMBER NOT NULL,
   WRITE_ID NUMBER NOT NULL,
   STATE CHAR(1) NOT NULL,
+  CREATED NUMBER NOT NULL,
   LAST_HEARTBEAT NUMBER NOT NULL
 );
 ALTER TABLE TBL_WRITES ADD CONSTRAINT TBL_WRITES_PK PRIMARY KEY (TW_ID);

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql 
b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
index 6972c20..5479712 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-2.2.0.oracle.sql
@@ -805,6 +805,7 @@ CREATE TABLE TBL_WRITES
   TBL_ID NUMBER NOT NULL,
   WRITE_ID NUMBER NOT NULL,
   STATE CHAR(1) NOT NULL,
+  CREATED NUMBER NOT NULL,
   LAST_HEARTBEAT NUMBER NOT NULL
 );
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql 
b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
index f153837..310f51e 100644
--- a/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/036-HIVE-14637.postgres.sql
@@ -8,6 +8,7 @@ CREATE TABLE "TBL_WRITES"
   "TBL_ID" BIGINT NOT NULL,
   "WRITE_ID" BIGINT NOT NULL,
   "STATE" CHAR(1) NOT NULL,
+  "CREATED" BIGINT NOT NULL,
   "LAST_HEARTBEAT" BIGINT NOT NULL
 );
 ALTER TABLE ONLY "TBL_WRITES" ADD CONSTRAINT "TBL_WRITES_PK" PRIMARY KEY 
("TW_ID");

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql 
b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
index de997d3..bc865ed 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-2.2.0.postgres.sql
@@ -614,6 +614,7 @@ CREATE TABLE "TBL_WRITES"
   "TBL_ID" BIGINT NOT NULL,
   "WRITE_ID" BIGINT NOT NULL,
   "STATE" CHAR(1) NOT NULL,
+  "CREATED" BIGINT NOT NULL,
   "LAST_HEARTBEAT" BIGINT NOT NULL
 );
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index aa6d1eb..128e06a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6460,7 +6460,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           Thread.sleep(random.nextInt(deadlockRetryBackoffMs));
         }
 
-        // Do a separate txn after we have reserved the number. TODO: If we 
fail, ignore on read.
+        // Do a separate txn after we have reserved the number.
         boolean ok = false;
         ms.openTransaction();
         try {
@@ -6525,11 +6525,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       startFunction("heartbeat_write_id", " : db="
           + dbName + " tbl=" + tblName + " writeId=" + writeId);
       Exception ex = null;
+      boolean wasAborted = false;
       try {
         boolean ok = false;
         ms.openTransaction();
         try {
           MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
+          long absTimeout = HiveConf.getTimeVar(getConf(),
+              ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+          if (tw.getCreated() + absTimeout < System.currentTimeMillis()) {
+            tw.setState(String.valueOf(MM_WRITE_ABORTED));
+            wasAborted = true;
+          }
           tw.setLastHeartbeat(System.currentTimeMillis());
           ms.updateTableWrite(tw);
           ok = true;
@@ -6542,6 +6549,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       } finally {
         endFunction("heartbeat_write_id", ex == null, ex, tblName);
       }
+      if (wasAborted) throw new MetaException("The write was aborted due to 
absolute timeout");
       return new HeartbeatWriteIdResult();
     }
 
@@ -6576,10 +6584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           long watermarkId = tbl.isSetMmWatermarkWriteId() ? 
tbl.getMmWatermarkWriteId() : -1;
           if (nextId > (watermarkId + 1)) {
             // There may be some intermediate failed or active writes; get the 
valid ones.
-            List<Long> ids = ms.getWriteIds(
+            List<Long> ids = ms.getTableWriteIds(
                 dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED);
             // TODO: we could optimize here and send the smaller of the lists, 
and also use ranges
-            if (ids != null) {
+            if (!ids.isEmpty()) {
               Iterator<Long> iter = ids.iterator();
               long oldWatermarkId = watermarkId;
               while (iter.hasNext()) {
@@ -7057,6 +7065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           startCompactorInitiator(conf);
           startCompactorWorkers(conf);
           startCompactorCleaner(conf);
+          startMmHousekeepingThread(conf);
           startHouseKeeperService(conf);
         } catch (Throwable e) {
           LOG.error("Failure when starting the compactor, compactions may not 
happen, " +
@@ -7096,6 +7105,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
   }
 
+  /**
+   * Starts the MM cleaner thread if it is enabled by configuration.
+   * Reads HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL in milliseconds; a value that is
+   * zero or negative means no thread is created.
+   * @param conf The configuration to read the interval from and to pass to the thread.
+   */
+  private static void startMmHousekeepingThread(HiveConf conf) throws 
Exception {
+    long intervalMs = HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL, 
TimeUnit.MILLISECONDS);
+    if (intervalMs > 0) {
+      MetaStoreThread thread = new MmCleanerThread(intervalMs);
+      initializeAndStartThread(thread, conf);
+    }
+  }
+
+
   private static MetaStoreThread instantiateThread(String classname) throws 
Exception {
     Class<?> c = Class.forName(classname);
     Object o = c.newInstance();
@@ -7118,6 +7137,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     thread.init(new AtomicBoolean(), new AtomicBoolean());
     thread.start();
   }
+
   private static void startHouseKeeperService(HiveConf conf) throws Exception {
     if(!HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
       return;

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
index a0c8d3b..d4d94ff 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
@@ -51,6 +51,7 @@ public interface MetaStoreThread {
    *               thread should then assure that the loop has been gone 
completely through at
    *               least once.
    */
+  // TODO: move these test parameters to more specific places... there's no 
need to have them here
   void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 41385f7..c2ce259 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1884,4 +1884,14 @@ public class MetaStoreUtils {
     }
     csNew.setStatsObj(list);
   }
+
+  /**
+   * Checks whether the table is an MM table, based on its table parameters.
+   * @param table The table to check.
+   * @return The result of {@link #isMmTable(Map)} applied to the table parameters.
+   */
+  public static boolean isMmTable(Table table) {
+    return isMmTable(table.getParameters());
+  }
+
+  /**
+   * Checks whether a set of table parameters describes an MM table.
+   * @param params The table parameters; may be null (e.g. a table without parameters),
+   *               in which case the table is not considered an MM table.
+   * @return True iff the TABLE_IS_MM parameter is present and equals "true", ignoring case.
+   */
+  public static boolean isMmTable(Map<String, String> params) {
+    // TODO: perhaps it should be a 3rd value for 'transactional'?
+    if (params == null) {
+      return false;
+    }
+    return "true".equalsIgnoreCase(params.get(hive_metastoreConstants.TABLE_IS_MM));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
new file mode 100644
index 0000000..6a7f588
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIds;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.RawStore.FullTableName;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+
+public class MmCleanerThread extends Thread implements MetaStoreThread {
+  private final static Logger LOG = 
LoggerFactory.getLogger(MmCleanerThread.class);
+  private HiveConf conf;
+  private int threadId;
+  private AtomicBoolean stop;
+  private long intervalMs;
+  private long heartbeatTimeoutMs, absTimeoutMs, abortedGraceMs;
+  /** Time override for tests. Only used for MM timestamp logic, not for the 
thread timing. */
+  private Supplier<Long> timeOverride = null;
+
+  /**
+   * @param intervalMs The target duration, in milliseconds, of one scan-plus-sleep cycle
+   *                   of {@link #run()}.
+   */
+  public MmCleanerThread(long intervalMs) {
+    this.intervalMs = intervalMs;
+  }
+
+  /** Replaces the clock used by {@link #getTimeMs()}; passing null restores the system clock. */
+  @VisibleForTesting
+  void overrideTime(Supplier<Long> timeOverride) {
+    this.timeOverride = timeOverride;
+  }
+
+  /** @return The current time in ms, from the test override if one is set, else the system clock. */
+  private long getTimeMs() {
+    return timeOverride == null ? System.currentTimeMillis() : 
timeOverride.get();
+  }
+
+  /**
+   * Stores the configuration and caches the MM heartbeat timeout, absolute timeout and
+   * aborted-write grace period, all converted to milliseconds.
+   * @throws RuntimeException if the heartbeat timeout is larger than the absolute timeout.
+   */
+  @Override
+  public void setHiveConf(HiveConf conf) {
+    this.conf = conf;
+    heartbeatTimeoutMs = HiveConf.getTimeVar(
+        conf, ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT, 
TimeUnit.MILLISECONDS);
+    absTimeoutMs = HiveConf.getTimeVar(
+        conf, ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+    abortedGraceMs = HiveConf.getTimeVar(
+        conf, ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD, 
TimeUnit.MILLISECONDS);
+    // Sanity check; the two timeouts are compared as configured.
+    if (heartbeatTimeoutMs > absTimeoutMs) {
+      throw new RuntimeException("Heartbeat timeout " + heartbeatTimeoutMs
+          + " cannot be larger than the absolute timeout " + absTimeoutMs);
+    }
+  }
+
+  /** Stores the thread ID; it is later passed to RawStoreProxy.getProxy in {@link #getRs()}. */
+  @Override
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+  }
+
+  /**
+   * Prepares the thread to be started: stores the stop flag and makes this a
+   * minimum-priority daemon thread.
+   * @param stop Flag polled by the main loop; when it becomes true the thread exits.
+   * @param looped Not used by this thread.
+   */
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws 
MetaException {
+    this.stop = stop;
+    setPriority(MIN_PRIORITY);
+    setDaemon(true);
+  }
+
+  /**
+   * Main loop of the cleaner. Runs one cleanup iteration, then sleeps for whatever is left
+   * of the configured interval (no sleep if the iteration took longer than the interval).
+   * Exits when the stop flag is set or when the thread is interrupted while sleeping.
+   */
+  @Override
+  public void run() {
+    // Only get RS here, when we are already on the thread.
+    RawStore rs = getRs();
+    while (true) {
+      if (checkStop()) return;
+      long endTimeNs = System.nanoTime() + intervalMs * 1000000L;
+
+      runOneIteration(rs);
+
+      if (checkStop()) return;
+      long waitTimeMs = (endTimeNs - System.nanoTime()) / 1000000L;
+      if (waitTimeMs <= 0) continue;
+      try {
+        Thread.sleep(waitTimeMs);
+      } catch (InterruptedException e) {
+        LOG.error("Thread was interrupted and will now exit");
+        // Restore the interrupt status so that it is not lost for whoever inspects the thread.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Creates the RawStore proxy for this thread, using the implementation class configured
+   * in METASTORE_RAW_STORE_IMPL.
+   * @throws RuntimeException wrapping the MetaException if the proxy cannot be created.
+   */
+  private RawStore getRs() {
+    try {
+      return RawStoreProxy.getProxy(conf, conf,
+          conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
+    } catch (MetaException e) {
+      LOG.error("Failed to get RawStore; the thread will now die", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** @return True (after logging) if the stop flag passed to init has been set. */
+  private boolean checkStop() {
+    if (!stop.get()) return false;
+    LOG.info("Stopping due to an external request");
+    return true;
+  }
+
+  /**
+   * Runs a single cleanup pass over all the MM tables returned by the RawStore.
+   * Failures are logged and never propagated: a failure to list the tables ends the pass,
+   * and a failure on one table does not prevent the remaining tables from being processed.
+   * Unexpected runtime exceptions are handled the same way as MetaException, so that a single
+   * bad table cannot permanently kill the cleaner thread.
+   * @param rs The RawStore to use for all the metastore DB operations.
+   */
+  @VisibleForTesting
+  void runOneIteration(RawStore rs) {
+    // We only get the names here; we want to get and process each table in a separate DB txn.
+    List<FullTableName> mmTables = null;
+    try {
+      mmTables = rs.getAllMmTablesForCleanup();
+    } catch (Exception e) {
+      LOG.error("Failed to get tables", e);
+      return;
+    }
+    if (mmTables == null) return;
+    for (FullTableName tableName : mmTables) {
+      try {
+        processOneTable(tableName, rs);
+      } catch (Exception e) {
+        LOG.error("Failed to process " + tableName, e);
+      }
+    }
+  }
+
+  /**
+   * Performs the full cleanup sequence for one table: times out stale writes, deletes the
+   * files of the aborted writes, and advances the watermark past the fully cleaned-up writes.
+   * Does nothing if the table no longer exists.
+   * @param table The name of the table to process.
+   * @param rs The RawStore to use.
+   */
+  private void processOneTable(FullTableName table, RawStore rs) throws MetaException {
+    // 1. Time out writes that have been running for a while.
+    //    a) Heartbeat timeouts (not enabled right now as heartbeat is not implemented).
+    //    b) Absolute timeouts.
+    //    c) Gaps that have the next ID and the derived absolute timeout. This is a small special
+    //       case that can happen if we increment next ID but fail to insert the write ID record,
+    //       which we do in separate txns to avoid making the conflict-prone increment txn longer.
+    LOG.info("Processing table " + table);
+    Table t = rs.getTable(table.dbName, table.tblName);
+    if (t == null) {
+      // The table list was obtained in a separate txn; the table may have been dropped since.
+      LOG.info("Table " + table + " no longer exists; skipping");
+      return;
+    }
+    HashSet<Long> removeWriteIds = new HashSet<>(), cleanupOnlyWriteIds = new HashSet<>();
+    getWritesThatReadyForCleanUp(t, table, rs, removeWriteIds, cleanupOnlyWriteIds);
+
+    // 2. Delete the aborted writes' files from the FS.
+    deleteAbortedWriteIdFiles(table, rs, t, removeWriteIds);
+    deleteAbortedWriteIdFiles(table, rs, t, cleanupOnlyWriteIds);
+    // removeWriteIds now only contains the writes whose files were fully cleaned up;
+    // the IDs for which the cleanup failed have been removed from the set.
+
+    // 3. Advance the watermark.
+    advanceWatermark(table, rs, removeWriteIds);
+  }
+
+  /**
+   * Classifies the write IDs between the table's watermark and its next write ID.
+   * ABORTED writes whose last heartbeat is older than the aborted grace period are added to
+   * removeWriteIds; other ABORTED writes are added to cleanupOnlyWriteIds. OPEN writes created
+   * before the absolute timeout cutoff are passed to expireTimedOutWriteId. Gaps between the
+   * write IDs that were found are passed to addTimedOutMissingWriteIds.
+   * NOTE(review): the reverse iteration presumably relies on getTableWrites returning the
+   * writes ordered by ascending write ID - verify against the RawStore implementation.
+   * @param t The table object; only used to read the watermark and the next write ID.
+   * @param table The name of the table.
+   * @param rs The RawStore to use.
+   * @param removeWriteIds Output; aborted writes that are past the grace period.
+   * @param cleanupOnlyWriteIds Output; writes whose files should be deleted on this pass.
+   */
+  private void getWritesThatReadyForCleanUp(Table t, FullTableName table, 
RawStore rs,
+      HashSet<Long> removeWriteIds, HashSet<Long> cleanupOnlyWriteIds) throws 
MetaException {
+    // We will generally ignore errors here. First, we expect some conflicts; second, we will get
+    // the final view of things after we do (or try, at any rate) all the updates.
+    long watermarkId = t.isSetMmWatermarkWriteId() ? t.getMmWatermarkWriteId() 
: -1,
+        nextWriteId = t.isSetMmNextWriteId() ? t.getMmNextWriteId() : 0;
+    long now = getTimeMs(), earliestOkHeartbeatMs = now - heartbeatTimeoutMs,
+        earliestOkCreateMs = now - absTimeoutMs, latestAbortedMs = now - 
abortedGraceMs;
+
+    List<MTableWrite> writes = rs.getTableWrites(
+        table.dbName, table.tblName, watermarkId, nextWriteId);
+    ListIterator<MTableWrite> iter = writes.listIterator(writes.size());
+    // expectedId is the ID we expect to see next when going backwards; -1 means "none yet".
+    // nextCreated is the creation time of the write that follows the current position.
+    long expectedId = -1, nextCreated = -1;
+    // We will go in reverse order and add aborted writes for the gaps that have a following
+    // write ID that would imply that the previous one (created earlier) would have already
+    // expired, had it been open and not updated.
+    while (iter.hasPrevious()) {
+      MTableWrite write = iter.previous();
+      addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, 
write.getWriteId(),
+          nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, 
now);
+      expectedId = write.getWriteId() - 1;
+      nextCreated = write.getCreated();
+      char state = write.getState().charAt(0);
+      if (state == HiveMetaStore.MM_WRITE_ABORTED) {
+        if (write.getLastHeartbeat() < latestAbortedMs) {
+          removeWriteIds.add(write.getWriteId());
+        } else {
+          cleanupOnlyWriteIds.add(write.getWriteId());
+        }
+      } else if (state == HiveMetaStore.MM_WRITE_OPEN && write.getCreated() < 
earliestOkCreateMs) {
+        // TODO: also check for heartbeat here.
+        if (expireTimedOutWriteId(rs, table.dbName, table.tblName, 
write.getWriteId(),
+            now, earliestOkCreateMs, earliestOkHeartbeatMs, 
cleanupOnlyWriteIds)) {
+          cleanupOnlyWriteIds.add(write.getWriteId());
+        }
+      }
+    }
+    // Finally, handle the gap between the watermark and the earliest write that was found.
+    addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, watermarkId,
+        nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, 
now);
+  }
+
+  /**
+   * Advances the table watermark over the contiguous prefix of write IDs that are either
+   * committed or aborted and fully cleaned up, and deletes the write records that end up
+   * below the new watermark. Everything is done in one metastore transaction; any failure
+   * is logged and rolled back.
+   * @param table The name of the table.
+   * @param rs The RawStore to use.
+   * @param cleanedUpWriteIds Aborted write IDs whose files have been fully deleted.
+   */
+  private void advanceWatermark(
+      FullTableName table, RawStore rs, HashSet<Long> cleanedUpWriteIds) {
+    if (!rs.openTransaction()) {
+      LOG.error("Cannot open transaction");
+      return;
+    }
+    boolean success = false;
+    try {
+      Table t = rs.getTable(table.dbName, table.tblName);
+      // If the table is gone there is nothing to advance; we still fall through to close the
+      // transaction opened above rather than leaking it via an early return.
+      if (t != null) {
+        // Use the same defaults for unset fields as the rest of the MM code (-1 and 0).
+        long watermarkId = t.isSetMmWatermarkWriteId() ? t.getMmWatermarkWriteId() : -1;
+        long nextWriteId = t.isSetMmNextWriteId() ? t.getMmNextWriteId() : 0;
+        List<Long> writeIds = rs.getTableWriteIds(table.dbName, table.tblName, watermarkId,
+            nextWriteId, HiveMetaStore.MM_WRITE_COMMITTED);
+        long expectedId = findFirstUnresolvedWriteId(watermarkId, writeIds, cleanedUpWriteIds);
+        long newWatermarkId = expectedId - 1;
+        if (newWatermarkId > watermarkId) {
+          t.setMmWatermarkWriteId(newWatermarkId);
+          rs.alterTable(table.dbName, table.tblName, t);
+          rs.deleteTableWrites(table.dbName, table.tblName, -1, expectedId);
+        }
+      }
+      success = true;
+    } catch (Exception ex) {
+      // TODO: should we try a couple times on conflicts? Aborted writes cannot be unaborted.
+      LOG.error("Failed to advance watermark", ex);
+      rs.rollbackTransaction();
+    }
+    if (success) {
+      tryCommit(rs);
+    }
+  }
+
+  /**
+   * Finds the first write ID above the watermark that is neither committed nor cleaned up.
+   * @param watermarkId The current watermark.
+   * @param committedIds Committed write IDs above the watermark, in ascending order.
+   * @param cleanedUpWriteIds Aborted write IDs whose files have been fully deleted.
+   * @return The lowest ID that the watermark cannot be advanced over.
+   */
+  private static long findFirstUnresolvedWriteId(
+      long watermarkId, List<Long> committedIds, HashSet<Long> cleanedUpWriteIds) {
+    long expectedId = watermarkId + 1;
+    for (long committedId : committedIds) {
+      if (committedId < expectedId) continue;
+      // Every ID skipped between two committed ones must be a cleaned-up aborted write.
+      while (expectedId < committedId) {
+        if (!cleanedUpWriteIds.contains(expectedId)) {
+          return expectedId;
+        }
+        ++expectedId;
+      }
+      ++expectedId;
+    }
+    // Make sure we also advance over the trailing aborted ones.
+    while (cleanedUpWriteIds.contains(expectedId)) {
+      ++expectedId;
+    }
+    return expectedId;
+  }
+
+  /**
+   * Deletes the directories of the given write IDs from every location of the table: all the
+   * partition locations if the table has partition keys, otherwise the table location.
+   * Write IDs that could not be cleaned up are removed from cleanUpWriteIds by the
+   * per-location overload, so on return the set holds only the IDs with no failures.
+   * @param table The name of the table.
+   * @param rs The RawStore used to list the partition locations.
+   * @param t The table object.
+   * @param cleanUpWriteIds In/out; the write IDs to clean up.
+   */
+  private void deleteAbortedWriteIdFiles(
+      FullTableName table, RawStore rs, Table t, HashSet<Long> 
cleanUpWriteIds) {
+    if (cleanUpWriteIds.isEmpty()) return;
+    if (t.getPartitionKeysSize() > 0) {
+      for (String location : rs.getAllPartitionLocations(table.dbName, 
table.tblName)) {
+        deleteAbortedWriteIdFiles(location, cleanUpWriteIds);
+      }
+    } else {
+      deleteAbortedWriteIdFiles(t.getSd().getLocation(), cleanUpWriteIds);
+    }
+  }
+
+  /**
+   * Deletes the directories under one location that belong to the given write IDs.
+   * If the location cannot be listed, the set is cleared because the cleanup cannot be ensured
+   * for any write; if one directory cannot be deleted, only its write ID is removed.
+   * @param location The table or partition location to scan (direct children only).
+   * @param abortedWriteIds In/out; the write IDs to delete. IDs whose cleanup failed are removed.
+   */
+  private void deleteAbortedWriteIdFiles(String location, HashSet<Long> abortedWriteIds) {
+    LOG.info("Looking for " + abortedWriteIds.size() + " aborted write output in " + location);
+    Path path = new Path(location);
+    FileSystem fs;
+    FileStatus[] files;
+    try {
+      fs = path.getFileSystem(conf);
+      if (!fs.exists(path)) {
+        LOG.warn(path + " does not exist; assuming that the cleanup is not needed.");
+        return;
+      }
+      // TODO# do we need to account for any subdirectories here? decide after special-case jiras
+      files = fs.listStatus(path);
+    } catch (Exception ex) {
+      // Log the cause as well; without it the failure cannot be diagnosed.
+      LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes", ex);
+      abortedWriteIds.clear();
+      return;
+    }
+    for (FileStatus file : files) {
+      Path childPath = file.getPath();
+      if (!file.isDirectory()) {
+        LOG.warn("Skipping a non-directory file " + childPath);
+        continue;
+      }
+      Long writeId = ValidWriteIds.extractWriteId(childPath);
+      if (writeId == null) {
+        LOG.warn("Skipping an unknown directory " + childPath);
+        continue;
+      }
+      if (!abortedWriteIds.contains(writeId.longValue())) continue;
+      try {
+        if (!fs.delete(childPath, true)) throw new IOException("delete returned false");
+      } catch (Exception ex) {
+        LOG.error("Couldn't delete " + childPath + "; not cleaning up " + writeId, ex);
+        abortedWriteIds.remove(writeId.longValue());
+      }
+    }
+  }
+
+  /**
+   * Re-checks, in its own transaction, a write that looked timed out and marks it ABORTED
+   * if it is still open and still expired.
+   * The caller schedules the files of the write for deletion when this returns true, so true
+   * is returned only when the write is known to be aborted. If the record has disappeared,
+   * has been committed, or has been refreshed in the meantime, false is returned so that the
+   * files of a potentially valid write are never deleted.
+   * @param rs The RawStore to use.
+   * @param dbName The database name.
+   * @param tblName The table name.
+   * @param writeId The write ID to expire.
+   * @param now The current time; stored as the last heartbeat of the aborted write.
+   * @param earliestOkCreatedMs Writes created after this time have not hit the absolute timeout.
+   * @param earliestOkHeartbeatMs Writes with a heartbeat after this time have not timed out.
+   * @param cleanupOnlyWriteIds Output; the write ID is added here if this call aborts it.
+   * @return True iff the write is in ABORTED state when this method is done with it.
+   */
+  private boolean expireTimedOutWriteId(RawStore rs, String dbName,
+      String tblName, long writeId, long now, long earliestOkCreatedMs,
+      long earliestOkHeartbeatMs, HashSet<Long> cleanupOnlyWriteIds) {
+    if (!rs.openTransaction()) {
+      return false;
+    }
+    try {
+      MTableWrite tw = rs.getTableWrite(dbName, tblName, writeId);
+      if (tw == null) {
+        // The write record has been removed since the time when we thought it has expired;
+        // we no longer know its outcome, so do not request any cleanup.
+        tryCommit(rs);
+        return false;
+      }
+      char state = tw.getState().charAt(0);
+      if (state != HiveMetaStore.MM_WRITE_OPEN) {
+        // Someone else has finished the write. Cleanup is only safe if it was aborted.
+        tryCommit(rs);
+        return state == HiveMetaStore.MM_WRITE_ABORTED;
+      }
+      if (tw.getCreated() > earliestOkCreatedMs
+          && tw.getLastHeartbeat() > earliestOkHeartbeatMs) {
+        // The write has been updated since the time when we thought it has expired.
+        tryCommit(rs);
+        return false;
+      }
+      tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_ABORTED));
+      tw.setLastHeartbeat(now);
+      rs.updateTableWrite(tw);
+    } catch (Exception ex) {
+      LOG.error("Failed to update an expired table write", ex);
+      rs.rollbackTransaction();
+      return false;
+    }
+    boolean result = tryCommit(rs);
+    if (result) {
+      cleanupOnlyWriteIds.add(writeId);
+    }
+    return result;
+  }
+
+  /**
+   * Commits the current RawStore transaction without propagating failures.
+   * @return The result of commitTransaction, or false if it threw (the exception is logged).
+   */
+  private boolean tryCommit(RawStore rs) {
+    try {
+      return rs.commitTransaction();
+    } catch (Exception ex) {
+      LOG.error("Failed to commit transaction", ex);
+      return false;
+    }
+  }
+
+  /**
+   * Creates ABORTED write records for the missing write IDs in the range
+   * (foundPrevId, expectedId], going from expectedId downwards, all in one transaction.
+   * Nothing is done if there is no following write (expectedId is negative) or if the
+   * following write was created too recently for the gap to be considered timed out.
+   * @param rs The RawStore to use.
+   * @param dbName The database name.
+   * @param tblName The table name.
+   * @param foundPrevId The write ID found below the gap (exclusive lower bound).
+   * @param nextCreated The creation time of the write found above the gap.
+   * @param expectedId The highest missing write ID (inclusive upper bound).
+   * @param earliestOkHeartbeatMs The cutoff that nextCreated is compared against.
+   * @param cleanupOnlyWriteIds Output; the created IDs are added only if the commit succeeds.
+   * @param now The timestamp to store in the created write records.
+   * @return False if the transaction could not be opened, a record could not be created
+   *         (the transaction is rolled back), or the commit failed; true otherwise.
+   */
+  private boolean addTimedOutMissingWriteIds(RawStore rs, String dbName, 
String tblName,
+      long foundPrevId, long nextCreated, long expectedId, long 
earliestOkHeartbeatMs,
+      HashSet<Long> cleanupOnlyWriteIds, long now) throws MetaException {
+    // Assume all missing ones are created at the same time as the next present write ID.
+    // We also assume missing writes never had any heartbeats.
+    if (nextCreated >= earliestOkHeartbeatMs || expectedId < 0) return true;
+    // t doubles as the "transaction is open" marker; it is set on the first loop iteration.
+    Table t = null;
+    List<Long> localCleanupOnlyWriteIds = new ArrayList<>();
+    while (foundPrevId < expectedId) {
+      if (t == null && !rs.openTransaction()) {
+        LOG.error("Cannot open transaction; skipping");
+        return false;
+      }
+      try {
+        if (t == null) {
+          t = rs.getTable(dbName, tblName);
+        }
+        // We don't need to double check if the write exists; the unique index will cause an error.
+        rs.createTableWrite(t, expectedId, HiveMetaStore.MM_WRITE_ABORTED, 
now);
+      } catch (Exception ex) {
+        // TODO: don't log conflict exceptions?.. although we barely ever expect them.
+        LOG.error("Failed to create a missing table write", ex);
+        rs.rollbackTransaction();
+        return false;
+      }
+      localCleanupOnlyWriteIds.add(expectedId);
+      --expectedId;
+    }
+    // If the loop never ran, no transaction was opened and there is nothing to commit.
+    boolean result = (t == null || tryCommit(rs));
+    if (result) {
+      cleanupOnlyWriteIds.addAll(localCleanupOnlyWriteIds);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index fb3b1ad..32e4daf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -711,7 +711,7 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   public Database getDatabaseInternal(String name) throws MetaException, 
NoSuchObjectException {
-    return new GetDbHelper(name, null, true, true) {
+    return new GetDbHelper(name, true, true) {
       @Override
       protected Database getSqlResult(GetHelper<Database> ctx) throws 
MetaException {
         return directSql.getDatabase(dbName);
@@ -1183,14 +1183,7 @@ public class ObjectStore implements RawStore, 
Configurable {
       pm.retrieveAll(result);
       success = true;
     } finally {
-      if (success) {
-        commitTransaction();
-      } else {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      closeTransaction(success, query);
     }
     return result;
   }
@@ -2951,15 +2944,13 @@ public class ObjectStore implements RawStore, 
Configurable {
   public abstract class GetDbHelper extends GetHelper<Database> {
     /**
      * GetHelper for returning db info using directSql/JDO.
-     * Since this is a db-level call, tblName is ignored, and null is passed 
irrespective of what is passed in.
      * @param dbName The Database Name
-     * @param tblName Placeholder param to match signature, always ignored.
      * @param allowSql Whether or not we allow DirectSQL to perform this query.
      * @param allowJdo Whether or not we allow ORM to perform this query.
      * @throws MetaException
      */
     public GetDbHelper(
-        String dbName, String tblName, boolean allowSql, boolean allowJdo) 
throws MetaException {
+        String dbName,boolean allowSql, boolean allowJdo) throws MetaException 
{
       super(dbName,null,allowSql,allowJdo);
     }
 
@@ -8713,7 +8704,7 @@ public class ObjectStore implements RawStore, 
Configurable {
     openTransaction();
     try {
       MTable mtbl = getMTable(tbl.getDbName(), tbl.getTableName());
-      MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), 
heartbeat);
+      MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), 
heartbeat, heartbeat);
       pm.makePersistent(tw);
       success = true;
     } finally {
@@ -8746,8 +8737,8 @@ public class ObjectStore implements RawStore, 
Configurable {
       String dbName, String tblName, long writeId) throws MetaException {
     boolean success = false;
     Query query = null;
+    openTransaction();
     try {
-      openTransaction();
       query = pm.newQuery(MTableWrite.class,
               "table.tableName == t1 && table.database.name == t2 && writeId 
== t3");
       query.declareParameters("java.lang.String t1, java.lang.String t2, 
java.lang.Long t3");
@@ -8762,45 +8753,129 @@ public class ObjectStore implements RawStore, 
Configurable {
       }
       return writes.get(0);
     } finally {
-      if (success) {
-        commitTransaction();
-      } else {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
+      closeTransaction(success, query);
     }
   }
 
   @Override
-  public List<Long> getWriteIds(String dbName, String tblName,
+  public List<Long> getTableWriteIds(String dbName, String tblName,
       long watermarkId, long nextWriteId, char state) throws MetaException {
     boolean success = false;
     Query query = null;
+    openTransaction();
     try {
-      openTransaction();
+      boolean hasState = (state != '\0');
       query = pm.newQuery("select writeId from 
org.apache.hadoop.hive.metastore.model.MTableWrite"
-          + " where table.tableName == t1 && table.database.name == t2 && 
writeId >= t3"
-          + " && writeId < t4 && state == t5");
+          + " where table.tableName == t1 && table.database.name == t2 && 
writeId > t3"
+          + " && writeId < t4" + (hasState ? " && state == t5" : ""));
       query.declareParameters("java.lang.String t1, java.lang.String t2, 
java.lang.Long t3, "
-          + "java.lang.Long t4, java.lang.String t5");
+          + "java.lang.Long t4" + (hasState ? ", java.lang.String t5" : ""));
       query.setResult("writeId");
       query.setOrdering("writeId asc");
       @SuppressWarnings("unchecked")
-      List<Long> writes = (List<Long>) query.executeWithArray(
-          tblName, dbName, watermarkId, nextWriteId, String.valueOf(state));
+      List<Long> writes = (List<Long>) (hasState
+          ? query.executeWithArray(tblName, dbName, watermarkId, nextWriteId, 
String.valueOf(state))
+          : query.executeWithArray(tblName, dbName, watermarkId, nextWriteId));
+      success = true;
+      return (writes == null) ? new ArrayList<Long>() : new 
ArrayList<>(writes);
+    } finally {
+      closeTransaction(success, query);
+    }
+  }
+
+  @Override
+  public List<MTableWrite> getTableWrites(
+      String dbName, String tblName, long from, long to) throws MetaException {
+    boolean success = false;
+    Query query = null;
+    openTransaction();
+    try {
+      query = pm.newQuery(MTableWrite.class,
+          "table.tableName == t1 && table.database.name == t2 && writeId > t3 
&& writeId < t4");
+      query.declareParameters(
+          "java.lang.String t1, java.lang.String t2, java.lang.Long t3, 
java.lang.Long t4");
+      query.setOrdering("writeId asc");
+      @SuppressWarnings("unchecked")
+      List<MTableWrite> writes =
+        (List<MTableWrite>) query.executeWithArray(tblName, dbName, from, to);
       success = true;
       return (writes == null || writes.isEmpty()) ? null : new 
ArrayList<>(writes);
     } finally {
-      if (success) {
-        commitTransaction();
-      } else {
-        rollbackTransaction();
-      }
-      if (query != null) {
-        query.closeAll();
+      closeTransaction(success, query);
+    }
+  }
+
+
+  @Override
+  public void deleteTableWrites(
+      String dbName, String tblName, long from, long to) throws MetaException {
+    boolean success = false;
+    Query query = null;
+    openTransaction();
+    try {
+      query = pm.newQuery(MTableWrite.class,
+          "table.tableName == t1 && table.database.name == t2 && writeId > t3 
&& writeId < t4");
+      query.declareParameters(
+          "java.lang.String t1, java.lang.String t2, java.lang.Long t3, 
java.lang.Long t4");
+      query.deletePersistentAll(tblName, dbName, from, to);
+      success = true;
+    } finally {
+      closeTransaction(success, query);
+    }
+  }
+
+  @Override
+  public List<FullTableName > getAllMmTablesForCleanup() throws MetaException {
+    boolean success = false;
+    Query query = null;
+    openTransaction();
+    try {
+      // If the table had no MM writes, there's nothing to clean up
+      query = pm.newQuery(MTable.class, "mmNextWriteId > 0");
+      @SuppressWarnings("unchecked")
+      List<MTable> tables = (List<MTable>) query.execute();
+      pm.retrieveAll(tables);
+      ArrayList<FullTableName> result = new ArrayList<>(tables.size());
+      for (MTable table : tables) {
+        if (MetaStoreUtils.isMmTable(table.getParameters())) {
+          result.add(new FullTableName(table.getDatabase().getName(), 
table.getTableName()));
+        }
       }
+      success = true;
+      return result;
+    } finally {
+      closeTransaction(success, query);
+    }
+  }
+
+  @Override
+  public Collection<String> getAllPartitionLocations(String dbName, String 
tblName) {
+    boolean success = false;
+    Query query = null;
+    openTransaction();
+    try {
+      String q = "select sd.location from 
org.apache.hadoop.hive.metastore.model.MPartition"
+          + " where table.tableName == t1 && table.database.name == t2";
+      query = pm.newQuery();
+      query.declareParameters("java.lang.String t1, java.lang.String t2");
+      @SuppressWarnings("unchecked")
+      List<String> tables = (List<String>) query.execute();
+      pm.retrieveAll(tables);
+      success = true;
+      return new ArrayList<>(tables);
+    } finally {
+      closeTransaction(success, query);
+    }
+  }
+
+  /**
+   * Finishes the transaction opened by the caller and releases the query.
+   * Commits when {@code success} is true and rolls back otherwise; the
+   * query is closed even if the commit or rollback throws.
+   * @param success Whether the caller's work completed without error.
+   * @param query The query to close; may be null if none was created.
+   */
+  private void closeTransaction(boolean success, Query query) {
+    try {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+    } finally {
+      // Always release query resources, even when commit/rollback fails.
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 170c07d..76ead25 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -23,6 +23,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -697,5 +698,28 @@ public interface RawStore extends Configurable {
 
   void createTableWrite(Table tbl, long writeId, char state, long heartbeat);
 
-  List<Long> getWriteIds(String dbName, String tblName, long watermarkId, long 
nextWriteId, char state) throws MetaException;
+  List<Long> getTableWriteIds(String dbName, String tblName, long watermarkId, 
long nextWriteId, char state) throws MetaException;
+
+
+  public static final class FullTableName {
+    public final String dbName, tblName;
+
+    public FullTableName(String dbName, String tblName) {
+      this.dbName = dbName;
+      this.tblName = tblName;
+    }
+
+    @Override
+    public String toString() {
+      return dbName + "." + tblName;
+    }
+  }
+
+  List<FullTableName> getAllMmTablesForCleanup() throws MetaException;
+
+  public List<MTableWrite> getTableWrites(String dbName, String tblName, long 
from, long to) throws MetaException;
+
+  Collection<String> getAllPartitionLocations(String dbName, String tblName);
+
+  void deleteTableWrites(String dbName, String tblName, long from, long to) 
throws MetaException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 829f0ae..ddc5a62 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2759,9 +2759,36 @@ public class HBaseStore implements RawStore {
  
 
   @Override
-  public List<Long> getWriteIds(
+  public List<Long> getTableWriteIds(
       String dbName, String tblName, long watermarkId, long nextWriteId, char 
state) {
     // TODO: Auto-generated method stub
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<MTableWrite> getTableWrites(String dbName, String tblName,
+      long from, long to) throws MetaException {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<String> getAllPartitionLocations(String dbName,
+      String tblName) {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTableWrites(String dbName, String tblName, long from,
+      long to) throws MetaException {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java 
b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
index a7e5f3e..b7f398a 100644
--- 
a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
+++ 
b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
@@ -23,14 +23,16 @@ public class MTableWrite {
   private long writeId;
   private String state;
   private long lastHeartbeat;
+  private long created;
 
   public MTableWrite() {}
 
-  public MTableWrite(MTable table, long writeId, String state, long 
lastHeartbeat) {
+  public MTableWrite(MTable table, long writeId, String state, long 
lastHeartbeat, long created) {
     this.table = table;
     this.writeId = writeId;
     this.state = state;
     this.lastHeartbeat = lastHeartbeat;
+    this.created = created;
   }
 
   public MTable getTable() {
@@ -49,6 +51,10 @@ public class MTableWrite {
     return lastHeartbeat;
   }
 
+  public long getCreated() {
+    return created;
+  }
+
   public void setTable(MTable table) {
     this.table = table;
   }
@@ -64,4 +70,8 @@ public class MTableWrite {
   public void setLastHeartbeat(long lastHeartbeat) {
     this.lastHeartbeat = lastHeartbeat;
   }
+
+  public void setCreated(long created) {
+    this.created = created;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index bd71056..ce101dd 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -1082,6 +1082,9 @@
       <field name="state">
         <column name="STATE" length="1" jdbc-type="CHAR" allows-null="false"/>
       </field>
+      <field name="created">
+        <column name="CREATED" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
       <field name="lastHeartbeat">
         <column name="LAST_HEARTBEAT" jdbc-type="BIGINT" allows-null="false"/>
       </field>

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 98c543f..acbbf4e 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -886,8 +887,30 @@ public class DummyRawStoreControlledCommit implements 
RawStore, Configurable {
   }
 
   @Override
-  public List<Long> getWriteIds(
+  public List<Long> getTableWriteIds(
       String dbName, String tblName, long watermarkId, long nextWriteId, char 
state) {
     return null;
   }
+
+  @Override
+  public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+    return null;
+  }
+
+  @Override
+  public List<MTableWrite> getTableWrites(String dbName, String tblName,
+      long from, long to) throws MetaException {
+    return null;
+  }
+
+  @Override
+  public Collection<String> getAllPartitionLocations(String dbName,
+      String tblName) {
+    return null;
+  }
+
+  @Override
+  public void deleteTableWrites(String dbName, String tblName, long from,
+      long to) throws MetaException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 8e54b16..787c1f0 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -898,10 +899,32 @@ public class DummyRawStoreForJdoConnection implements 
RawStore {
   }
 
   @Override
-  public List<Long> getWriteIds(
+  public List<Long> getTableWriteIds(
       String dbName, String tblName, long watermarkId, long nextWriteId, char 
state) {
     return null;
   }
+
+  @Override
+  public List<FullTableName> getAllMmTablesForCleanup() throws MetaException {
+    return null;
+  }
+
+  @Override
+  public List<MTableWrite> getTableWrites(String dbName, String tblName,
+      long from, long to) throws MetaException {
+    return null;
+  }
+
+  @Override
+  public Collection<String> getAllPartitionLocations(String dbName,
+      String tblName) {
+    return null;
+  }
+
+  @Override
+  public void deleteTableWrites(String dbName, String tblName, long from,
+      long to) throws MetaException {
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70299dc4/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java 
b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 0497159..a8d3495 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -17,16 +17,20 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import static org.junit.Assert.*;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
 import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -42,9 +46,13 @@ import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hive.common.util.MockFileSystem;
+import org.apache.hive.common.util.MockFileSystem.MockFile;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -53,6 +61,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Supplier;
+
 public class TestObjectStore {
   private ObjectStore objectStore = null;
 
@@ -67,6 +77,15 @@ public class TestObjectStore {
   private static final String ROLE2 = "testobjectstorerole2";
   private static final Logger LOG = 
LoggerFactory.getLogger(TestObjectStore.class.getName());
 
+  private static final class LongSupplier implements Supplier<Long> {
+    public long value = 0;
+
+    @Override
+    public Long get() {
+      return value;
+    }
+  }
+
   public static class MockPartitionExpressionProxy implements 
PartitionExpressionProxy {
     @Override
     public String convertExprToFilter(byte[] expr) throws MetaException {
@@ -142,7 +161,7 @@ public class TestObjectStore {
   public void testTableOps() throws MetaException, InvalidObjectException, 
NoSuchObjectException, InvalidInputException {
     Database db1 = new Database(DB1, "description", "locationurl", null);
     objectStore.createDatabase(db1);
-    StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, 
false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, 
null);
+    StorageDescriptor sd = createFakeSd("location");
     HashMap<String,String> params = new HashMap<String,String>();
     params.put("EXTERNAL", "false");
     Table tbl1 = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd, null, params, 
"viewOriginalText", "viewExpandedText", "MANAGED_TABLE");
@@ -164,6 +183,156 @@ public class TestObjectStore {
 
     objectStore.dropDatabase(DB1);
   }
+  
+
+  /**
+   * Tests MmCleanerThread iterations against a mock file system: aborting
+   * missing and timed-out writes, deleting their files, and advancing the
+   * table's MM watermark.
+   */
+  @Test
+  public void testMmCleaner() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set(ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT.varname, "3ms");
+    conf.set(ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT.varname, "20ms");
+    conf.set(ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD.varname, "5ms");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+
+    MockFileSystem mfs = (MockFileSystem)(new 
Path("mock:///").getFileSystem(conf));
+    mfs.clear();
+    mfs.allowDelete = true;
+    // Don't add the files just yet...
+    MockFile[] files = new MockFile[9];
+    for (int i = 0; i < files.length; ++i) {
+      files[i] = new MockFile("mock:/foo/mm_" + i + "/1", 0, new byte[0]);
+    }
+
+    LongSupplier time = new LongSupplier();
+
+    MmCleanerThread mct = new MmCleanerThread(0);
+    mct.setHiveConf(conf);
+    mct.overrideTime(time);
+
+    Database db1 = new Database(DB1, "description", "locationurl", null);
+    objectStore.createDatabase(db1);
+    StorageDescriptor sd = createFakeSd("mock:/foo");
+    HashMap<String,String> params = new HashMap<String,String>();
+    params.put("EXTERNAL", "false");
+    params.put(hive_metastoreConstants.TABLE_IS_MM, "true");
+    Table tbl = new Table(TABLE1, DB1, "owner", 1, 2, 3, sd,
+        null, params, null, null, "MANAGED_TABLE");
+    objectStore.createTable(tbl);
+
+    // Add write #0 so the watermark wouldn't advance; skip write #1, add #2 
at 0, skip #3
+    createCompleteTableWrite(mfs, files, 0, time, tbl, 
HiveMetaStore.MM_WRITE_OPEN);
+    mfs.addFile(files[1]);
+    createCompleteTableWrite(mfs, files, 2, time, tbl, 
HiveMetaStore.MM_WRITE_OPEN);
+    mfs.addFile(files[3]);
+    tbl.setMmNextWriteId(4);
+    objectStore.alterTable(DB1, TABLE1, tbl);
+
+    mct.runOneIteration(objectStore);
+    List<Long> writes = getAbortedWrites();
+    assertEquals(0, writes.size()); // Missing write is not aborted before 
timeout.
+    time.value = 4; // Advance time.
+    mct.runOneIteration(objectStore);
+    writes = getAbortedWrites();
+    assertEquals(1, writes.size()); // Missing write is aborted after timeout.
+    assertEquals(1L, writes.get(0).longValue());
+    checkDeletedSet(files, 1);
+    // However, write #3 was not aborted as we cannot determine when it will 
time out.
+    createCompleteTableWrite(mfs, files, 4, time, tbl, 
HiveMetaStore.MM_WRITE_OPEN);
+    time.value = 8;
+    // It will now be aborted, since we have a following write.
+    mct.runOneIteration(objectStore);
+    writes = getAbortedWrites();
+    assertEquals(2, writes.size());
+    assertTrue(writes.contains(Long.valueOf(3)));
+    checkDeletedSet(files, 1, 3);
+
+    // Commit #0 and #2 and confirm that the watermark advances.
+    // It will only advance over #1, since #3 was aborted at 8 and grace 
period has not passed.
+    time.value = 10;
+    MTableWrite tw = objectStore.getTableWrite(DB1, TABLE1, 0);
+    tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED));
+    objectStore.updateTableWrite(tw);
+    tw = objectStore.getTableWrite(DB1, TABLE1, 2);
+    tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_COMMITTED));
+    objectStore.updateTableWrite(tw);
+    mct.runOneIteration(objectStore);
+    writes = getAbortedWrites();
+    assertEquals(1, writes.size());
+    assertEquals(3L, writes.get(0).longValue());
+    tbl = objectStore.getTable(DB1, TABLE1);
+    assertEquals(2L, tbl.getMmWatermarkWriteId());
+
+    // Now advance the time and see that watermark also advances over #3.
+    time.value = 16;
+    mct.runOneIteration(objectStore);
+    writes = getAbortedWrites();
+    assertEquals(0, writes.size());
+    tbl = objectStore.getTable(DB1, TABLE1);
+    assertEquals(3L, tbl.getMmWatermarkWriteId());
+
+    // Check that the open write gets aborted after some time; then the 
watermark advances.
+    time.value = 25;
+    mct.runOneIteration(objectStore);
+    writes = getAbortedWrites();
+    assertEquals(1, writes.size());
+    assertEquals(4L, writes.get(0).longValue());
+    time.value = 31;
+    mct.runOneIteration(objectStore);
+    tbl = objectStore.getTable(DB1, TABLE1);
+    assertEquals(4L, tbl.getMmWatermarkWriteId());
+    checkDeletedSet(files, 1, 3, 4); // The other two should still be deleted.
+
+    // Finally check that we cannot advance watermark if cleanup fails for 
some file.
+    createCompleteTableWrite(mfs, files, 5, time, tbl, 
HiveMetaStore.MM_WRITE_ABORTED);
+    createCompleteTableWrite(mfs, files, 6, time, tbl, 
HiveMetaStore.MM_WRITE_ABORTED);
+    createCompleteTableWrite(mfs, files, 7, time, tbl, 
HiveMetaStore.MM_WRITE_COMMITTED);
+    createCompleteTableWrite(mfs, files, 8, time, tbl, 
HiveMetaStore.MM_WRITE_ABORTED);
+    time.value = 37; // Skip the grace period.
+    files[6].cannotDelete = true;
+    mct.runOneIteration(objectStore);
+    checkDeletedSet(files, 1, 3, 4, 5, 8); // The other three should still be 
deleted.
+    tbl = objectStore.getTable(DB1, TABLE1);
+    assertEquals(5L, tbl.getMmWatermarkWriteId()); // Watermark only goes up 
to 5.
+    files[6].cannotDelete = false;
+    mct.runOneIteration(objectStore);
+    checkDeletedSet(files, 1, 3, 4, 5, 6, 8);
+    tbl = objectStore.getTable(DB1, TABLE1);
+    assertEquals(8L, tbl.getMmWatermarkWriteId()); // Now it advances all the 
way.
+
+    objectStore.dropTable(DB1, TABLE1);
+    objectStore.dropDatabase(DB1);
+  }
+
+  private void createCompleteTableWrite(MockFileSystem mfs, MockFile[] files,
+      int id, LongSupplier time, Table tbl, char state) throws MetaException, 
InvalidObjectException {
+    objectStore.createTableWrite(tbl, id, state, time.value);
+    mfs.addFile(files[id]);
+    tbl.setMmNextWriteId(id + 1);
+    objectStore.alterTable(DB1, TABLE1, tbl);
+  }
+
+  private void checkDeletedSet(MockFile[] files, int... deleted) {
+    for (int id : deleted) {
+      assertTrue("File " + id + " not deleted", files[id].isDeleted);
+    }
+    int count = 0;
+    for (MockFile file : files) {
+      if (file.isDeleted) ++count;
+    }
+    assertEquals(deleted.length, count); // Make sure nothing else is deleted.
+  }
+
+  private List<Long> getAbortedWrites() throws MetaException {
+    return objectStore.getTableWriteIds(DB1, TABLE1, -1, 10, 
HiveMetaStore.MM_WRITE_ABORTED);
+  }
+
+  private StorageDescriptor createFakeSd(String location) {
+    return new StorageDescriptor(null, location, null, null, false, 0,
+        new SerDeInfo("SerDeName", "serializationLib", null), null, null, 
null);
+  }
+
 
   /**
    * Tests partition operations
@@ -172,7 +341,7 @@ public class TestObjectStore {
   public void testPartitionOps() throws MetaException, InvalidObjectException, 
NoSuchObjectException, InvalidInputException {
     Database db1 = new Database(DB1, "description", "locationurl", null);
     objectStore.createDatabase(db1);
-    StorageDescriptor sd = new StorageDescriptor(null, "location", null, null, 
false, 0, new SerDeInfo("SerDeName", "serializationLib", null), null, null, 
null);
+    StorageDescriptor sd = createFakeSd("location");
     HashMap<String,String> tableParams = new HashMap<String,String>();
     tableParams.put("EXTERNAL", "false");
     FieldSchema partitionKey1 = new FieldSchema("Country", 
serdeConstants.STRING_TYPE_NAME, "");
@@ -265,7 +434,7 @@ public class TestObjectStore {
     MetricsFactory.init(conf);
     CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
 
-    objectStore.new GetDbHelper("foo", null, true, true) {
+    objectStore.new GetDbHelper("foo", true, true) {
       @Override
       protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) 
throws MetaException {
         return null;
@@ -282,7 +451,7 @@ public class TestObjectStore {
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER,
         MetricsConstant.DIRECTSQL_ERRORS, "");
 
-    objectStore.new GetDbHelper("foo", null, true, true) {
+    objectStore.new GetDbHelper("foo", true, true) {
       @Override
       protected Database getSqlResult(ObjectStore.GetHelper<Database> ctx) 
throws MetaException {
         throw new RuntimeException();

Reply via email to