HIVE-20936 Allow the Worker thread in the metastore to run outside of it (Jaume Marhuenda via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/926c1e8e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/926c1e8e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/926c1e8e

Branch: refs/heads/master
Commit: 926c1e8e352582a2bb89d458fb45c89c698c56fc
Parents: 49f8f84
Author: Jaume Marhuenda <jmarhue...@hortonworks.com>
Authored: Fri Dec 21 20:32:58 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Fri Dec 21 20:33:35 2018 -0800

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  |     3 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |     2 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |     4 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |    18 +-
 .../hive/ql/txn/compactor/CompactorThread.java  |    71 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |     4 +-
 .../txn/compactor/MetaStoreCompactorThread.java |    86 +
 .../ql/txn/compactor/RemoteCompactorThread.java |    65 +
 .../hadoop/hive/ql/txn/compactor/Worker.java    |    90 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |     6 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |     9 +-
 .../apache/hive/service/server/HiveServer2.java |    18 +
 .../metastore/api/AlterPartitionsRequest.java   |    36 +-
 .../metastore/api/ClearFileMetadataRequest.java |    32 +-
 .../hive/metastore/api/ClientCapabilities.java  |    32 +-
 .../metastore/api/CompactionInfoStruct.java     |  1533 +++
 .../metastore/api/FindSchemasByColsResp.java    |    36 +-
 .../hive/metastore/api/FireEventRequest.java    |    32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |    36 +-
 .../api/GetFileMetadataByExprRequest.java       |    32 +-
 .../api/GetFileMetadataByExprResult.java        |    48 +-
 .../metastore/api/GetFileMetadataRequest.java   |    32 +-
 .../metastore/api/GetFileMetadataResult.java    |    44 +-
 .../metastore/api/GetPartitionsFilterSpec.java  |    32 +-
 .../api/GetPartitionsProjectionSpec.java        |    32 +-
 .../metastore/api/GetPartitionsRequest.java     |    32 +-
 .../metastore/api/GetPartitionsResponse.java    |    36 +-
 .../hive/metastore/api/GetTablesRequest.java    |    32 +-
 .../hive/metastore/api/GetTablesResult.java     |    36 +-
 .../metastore/api/InsertEventRequestData.java   |    96 +-
 .../metastore/api/NotificationEventRequest.java |    32 +-
 .../api/NotificationEventResponse.java          |    36 +-
 .../api/OptionalCompactionInfoStruct.java       |   395 +
 .../metastore/api/PutFileMetadataRequest.java   |    64 +-
 .../metastore/api/RenamePartitionRequest.java   |    32 +-
 .../hive/metastore/api/SchemaVersion.java       |    36 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 11977 +++++++++++++----
 .../hive/metastore/api/WMFullResourcePlan.java  |   144 +-
 .../api/WMGetAllResourcePlanResponse.java       |    36 +-
 .../WMGetTriggersForResourePlanResponse.java    |    36 +-
 .../api/WMValidateResourcePlanResponse.java     |    64 +-
 .../api/WriteNotificationLogRequest.java        |    32 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   |  2947 +++-
 .../src/gen/thrift/gen-php/metastore/Types.php  |  1064 +-
 .../hive_metastore/ThriftHiveMetastore-remote   |    49 +
 .../hive_metastore/ThriftHiveMetastore.py       |  3643 +++--
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |   726 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |    61 +
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |   397 +
 .../hive/metastore/HiveMetaStoreClient.java     |    35 +
 .../hadoop/hive/metastore/IMetaStoreClient.java |    59 +
 .../hive/metastore/conf/MetastoreConf.java      |     4 +
 .../src/main/thrift/hive_metastore.thrift       |    26 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |    44 +-
 .../hadoop/hive/metastore/MetaStoreThread.java  |     3 +-
 .../hive/metastore/txn/CompactionInfo.java      |    73 +-
 .../metastore/utils/MetaStoreServerUtils.java   |     2 -
 .../HiveMetaStoreClientPreCatalog.java          |    35 +
 .../apache/hive/streaming/TestStreaming.java    |     3 +-
 59 files changed, 18604 insertions(+), 6016 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index b290a40..5935220 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -424,7 +423,7 @@ public class TestStreaming {
   /**
    * this is a clone from TestTxnStatement2....
    */
-  public static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws Exception {
     AtomicBoolean stop = new AtomicBoolean(true);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index d3800cd..2b22a62 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1074,7 +1074,7 @@ public class TestCompactor {
   }
 
   private void runMajorCompaction(
-      String dbName, String tblName, String... partNames) throws MetaException {
+      String dbName, String tblName, String... partNames) throws Exception {
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 18253c9..06b0209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -59,7 +59,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 /**
  * A class to clean directories after compactions.  This will run in a separate thread.
  */
-public class Cleaner extends CompactorThread {
+public class Cleaner extends MetaStoreCompactorThread {
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private long cleanerCheckInterval = 0;
@@ -67,7 +67,7 @@ public class Cleaner extends CompactorThread {
   private ReplChangeManager replChangeManager;
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
     replChangeManager = ReplChangeManager.getInstance(conf);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 42ce174..7d5ee4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -96,6 +96,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
 import org.apache.parquet.Strings;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,7 +227,7 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
-           CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
+           CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc) throws IOException {
 
    if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
      throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -268,7 +269,7 @@ public class CompactorMR {
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
          parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1, conf, txnHandler, ci.id, jobName);
+          maxDeltastoHandle, -1, conf, msc, ci.id, jobName);
       }
      //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
@@ -312,7 +313,7 @@ public class CompactorMR {
     }
 
    launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
-      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id, jobName);
+      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName);
 
     su.gatherStats();
   }
@@ -539,7 +540,7 @@ public class CompactorMR {
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
                                    int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
-                                   TxnStore txnHandler, long id, String jobName) throws IOException {
+                                   IMetaStoreClient msc, long id, String jobName) throws IOException {
     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
     if(dirsToSearch == null) {
       dirsToSearch = new StringableList();
@@ -579,7 +580,12 @@ public class CompactorMR {
       RunningJob rj = jc.submitJob(job);
       LOG.info("Submitted compaction job '" + job.getJobName() +
           "' with jobID=" + rj.getID() + " compaction ID=" + id);
-      txnHandler.setHadoopJobId(rj.getID().toString(), id);
+      try {
+        msc.setHadoopJobid(rj.getID().toString(), id);
+      } catch (TException e) {
+        LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString()
+            + " compactionId=" + id, e);
+      }
       rj.waitForCompletion();
       if (!rj.isSuccessful()) {
          throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") +

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index f5b901d..f45140d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -17,23 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.RawStoreProxy;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -42,26 +38,29 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 /**
  * Superclass for all threads in the compactor.
  */
-public abstract class CompactorThread extends Thread implements MetaStoreThread {
+public abstract class CompactorThread extends Thread implements Configurable {
   static final private String CLASS_NAME = CompactorThread.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   protected HiveConf conf;
-  protected TxnStore txnHandler;
   protected RawStore rs;
-  protected int threadId;
   protected AtomicBoolean stop;
   protected AtomicBoolean looped;
 
+  protected int threadId;
+
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+  }
+
   @Override
   public void setConf(Configuration configuration) {
     // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with
@@ -77,24 +76,11 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread
     return conf;
   }
 
-  @Override
-  public void setThreadId(int threadId) {
-    this.threadId = threadId;
-  }
-
-  @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
-    this.stop = stop;
-    this.looped = looped;
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     setPriority(MIN_PRIORITY);
     setDaemon(true); // this means the process will exit without waiting for this thread
-
-    // Get our own instance of the transaction handler
-    txnHandler = TxnUtils.getTxnStore(conf);
-
-    // Get our own connection to the database so we can get table and partition information.
-    rs = RawStoreProxy.getProxy(conf, conf,
-        MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId);
+    this.stop = stop;
+    this.looped = looped;
   }
 
   /**
@@ -103,14 +89,15 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread
    * @return metastore table
    * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found.
    */
-  protected Table resolveTable(CompactionInfo ci) throws MetaException {
-    try {
-      return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
-    } catch (MetaException e) {
-      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + 
e.getMessage());
-      throw e;
-    }
-  }
+  abstract Table resolveTable(CompactionInfo ci) throws MetaException;
+
+  /**
+   * Get list of partitions by name.
+   * @param ci compaction info.
+   * @return list of partitions
+   * @throws MetaException if an error occurs.
+   */
+  abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException;
 
   /**
    * Get the partition being compacted.
@@ -123,8 +110,7 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread
     if (ci.partName != null) {
       List<Partition> parts;
       try {
-        parts = rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
-            Collections.singletonList(ci.partName));
+        parts = getPartitionsByNames(ci);
         if (parts == null || parts.size() == 0) {
           // The partition got dropped before we went looking for it.
           return null;
@@ -220,4 +206,15 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread
   protected String tableName(Table t) {
     return Warehouse.getQualifiedName(t);
   }
+
+  private static AtomicInteger nextThreadId = new AtomicInteger(1000000);
+
+  public static void initializeAndStartThread(CompactorThread thread,
+      Configuration conf) throws Exception {
+    LOG.info("Starting compactor thread of type " + 
thread.getClass().getName());
+    thread.setConf(conf);
+    thread.setThreadId(nextThreadId.incrementAndGet());
+    thread.init(new AtomicBoolean(), new AtomicBoolean());
+    thread.start();
+  }
 }

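The initializeAndStartThread helper added above is the entry point for booting a compactor thread outside the metastore; the HiveServer2.java hunk further down uses it. A minimal standalone sketch, assuming a HiveConf wired to the target warehouse (the class name StartWorkerSketch is illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
    import org.apache.hadoop.hive.ql.txn.compactor.Worker;

    public class StartWorkerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HiveConf(); // assumes hive-site.xml on the classpath
        // Worker now extends RemoteCompactorThread, so it reaches the metastore
        // through IMetaStoreClient rather than an in-process TxnStore/RawStore.
        CompactorThread.initializeAndStartThread(new Worker(), conf);
      }
    }
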
http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index cdcc0e9..a0df82c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * A class to initiate compactions.  This will run in a separate thread.
  * It's critical that there is exactly 1 of these in a given warehouse.
  */
-public class Initiator extends CompactorThread {
+public class Initiator extends MetaStoreCompactorThread {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
@@ -202,7 +202,7 @@ public class Initiator extends CompactorThread {
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
     checkInterval =
        conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
new file mode 100644
index 0000000..1ddc54d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * Compactor thread that runs in the metastore. It uses a {@link TxnStore}
+ * to access the internal database.
+ */
+public class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread {
+
+  protected TxnStore txnHandler;
+  protected RawStore rs;
+  protected int threadId;
+
+  @Override
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+  }
+
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
+    super.init(stop, looped);
+
+    // Get our own instance of the transaction handler
+    txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Get our own connection to the database so we can get table and partition information.
+    rs = RawStoreProxy.getProxy(conf, conf,
+        MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId);
+  }
+
+  @Override Table resolveTable(CompactionInfo ci) throws MetaException {
+    try {
+      return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
+    } catch (MetaException e) {
+      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + 
e.getMessage());
+      throw e;
+    }
+  }
+
+  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
+    try {
+      return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
+          Collections.singletonList(ci.partName));
+    } catch (MetaException e) {
+      LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
+      throw e;
+    } catch (NoSuchObjectException e) {
+      LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
+      throw new MetaException(e.toString());
+    }
+  }
+}

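Initiator and Cleaner (hunks above) switch their base class to this MetaStoreCompactorThread, so they keep the direct, in-process path to the backend database. A sketch of that path, assuming a fully configured HiveConf and using only calls visible in this patch (the worker name is a placeholder):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InProcessPathSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Direct transaction-store access: JDBC to the backend DB, no thrift hop.
        TxnStore txnHandler = TxnUtils.getTxnStore(conf);
        CompactionInfo ci = txnHandler.findNextToCompact("sketch-worker");
        System.out.println(ci == null ? "compaction queue is empty" : ci.getFullTableName());
      }
    }
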
http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
new file mode 100644
index 0000000..9678786
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * Compactor thread that can run outside the metastore. It will
+ * use the metastore thrift API which will default to a remote connection
+ * if the thread is running outside the metastore or to a function call
+ * if it's running within the metastore.
+ */
+public class RemoteCompactorThread extends CompactorThread {
+  protected IMetaStoreClient msc;
+
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
+    super.init(stop, looped);
+    this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+  }
+
+  @Override Table resolveTable(CompactionInfo ci) throws MetaException {
+    try {
+      return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
+    } catch (TException e) {
+      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + 
e.getMessage());
+      throw new MetaException(e.toString());
+    }
+  }
+
+  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
+    try {
+      return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
+          Collections.singletonList(ci.partName));
+    } catch (TException e) {
+      LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
+      throw new MetaException(e.toString());
+    }
+  }
+}

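Whether the calls above cross the wire is decided by the client factory, not by this class: with metastore URIs configured, getHiveMetastoreClient returns a thrift client, and with no URIs it falls back to an embedded, in-JVM metastore. That fallback is standard HiveMetaStoreClient behavior, stated here as background rather than something this patch changes. A sketch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    public class ClientModeSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // With hive.metastore.uris set this is a remote thrift connection;
        // with it unset the same call yields an embedded client.
        IMetaStoreClient msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
        System.out.println("databases: " + msc.getAllDatabases());
        msc.close();
      }
    }
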
http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 49662cd..b645870 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -21,19 +21,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -62,10 +59,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * A class to do compactions.  This will run in a separate thread.  It will spin on the
  * compaction queue and look for new work to do.
  */
-public class Worker extends CompactorThread {
+public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   static final private String CLASS_NAME = Worker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  static final private long SLEEP_TIME = 5000;
+  static final private long SLEEP_TIME = 10000;
   static final private int baseThreadNum = 10002;
 
   private String workerName;
@@ -88,13 +85,18 @@ public class Worker extends CompactorThread {
 // don't go  through Initiator for user initiated compactions)
   @Override
   public void run() {
+    LOG.info("Starting Worker thread");
     do {
       boolean launchedJob = false;
       // Make sure nothing escapes this run method and kills the metastore at large,
       // so wrap it in a big catch Throwable statement.
       CompactionHeartbeater heartbeater = null;
       try {
-        final CompactionInfo ci = txnHandler.findNextToCompact(workerName);
+        if (msc == null) {
+          msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+        }
+        final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo(
+            msc.findNextCompact(workerName));
         LOG.debug("Processing compaction request " + ci);
 
         if (ci == null && !stop.get()) {
@@ -114,11 +116,11 @@ public class Worker extends CompactorThread {
           if (t1 == null) {
             LOG.info("Unable to find table " + ci.getFullTableName() +
                 ", assuming it was dropped and moving on.");
-            txnHandler.markCleaned(ci);
+            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
             continue;
           }
         } catch (MetaException e) {
-          txnHandler.markCleaned(ci);
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
           continue;
         }
        // This chicanery is to get around the fact that the table needs to be final in order to
@@ -132,11 +134,11 @@ public class Worker extends CompactorThread {
           if (p == null && ci.partName != null) {
             LOG.info("Unable to find partition " + ci.getFullPartitionName() +
                 ", assuming it was dropped and moving on.");
-            txnHandler.markCleaned(ci);
+            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
             continue;
           }
         } catch (Exception e) {
-          txnHandler.markCleaned(ci);
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
           continue;
         }
 
@@ -146,45 +148,43 @@ public class Worker extends CompactorThread {
        // Check that the table or partition isn't sorted, as we don't yet support that.
        if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
          LOG.error("Attempt to compact sorted table, which is not yet supported!");
-          txnHandler.markCleaned(ci);
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
           continue;
         }
        String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
         if (ci.runAs == null) {
           ci.runAs = findUserToRunAs(sd.getLocation(), t);
         }
-        OpenTxnRequest otReq = new OpenTxnRequest(1, ci.runAs, hostname());
-        otReq.setAgentInfo(getName());//ThreadName
-        long compactorTxnId = txnHandler.openTxns(otReq).getTxn_ids().get(0);
+        long compactorTxnId = msc.openTxns(ci.runAs, 1).getTxn_ids().get(0);
 
-        heartbeater = new CompactionHeartbeater(txnHandler, compactorTxnId, fullTableName, conf);
+        heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
         heartbeater.start();
 
-        ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
-        GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+        ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
         //with this ValidWriteIdList is capped at whatever HWM validTxnList has
-        rqst.setValidTxnList(validTxnList.writeToString());
         final ValidCompactorWriteIdList tblValidWriteIds =
-                TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+                TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
+                    Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
        LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
         conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
 
         ci.highestWriteId = tblValidWriteIds.getHighWatermark();
        //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
        //it until after any data written by it are physically removed
-        txnHandler.updateCompactorState(ci, compactorTxnId);
+        msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
         final StringBuilder jobName = new StringBuilder(workerName);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
 
         LOG.info("Starting " + ci.type.toString() + " compaction for " + 
ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId));
-        final StatsUpdater su = StatsUpdater.init(ci, 
txnHandler.findColumnsWithStats(ci), conf,
+        final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats(
+            CompactionInfo.compactionInfoToStruct(ci)), conf,
           runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner());
         final CompactorMR mr = new CompactorMR();
         launchedJob = true;
         try {
           if (runJobAsSelf(ci.runAs)) {
-            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
+            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc);
          } else {
            UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
@@ -192,7 +192,7 @@ public class Worker extends CompactorThread {
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
+                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, msc);
                 return null;
               }
             });
@@ -204,16 +204,28 @@ public class Worker extends CompactorThread {
             }
           }
           heartbeater.cancel();
-          txnHandler.markCompacted(ci);
-          txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
+          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+          msc.commitTxn(compactorTxnId);
           if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
             mrJob = mr.getMrJob();
           }
         } catch (Exception e) {
           LOG.error("Caught exception while trying to compact " + ci +
               ".  Marking failed to avoid repeated failures, " + 
StringUtils.stringifyException(e));
-          txnHandler.markFailed(ci);
-          txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
+          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+          msc.abortTxns(Collections.singletonList(compactorTxnId));
+        }
+      } catch (TException | IOException t) {
+        LOG.error("Caught an exception in the main loop of compactor worker " 
+ workerName + ", " +
+            StringUtils.stringifyException(t));
+        if (msc != null) {
+          msc.close();
+        }
+        msc = null;
+        try {
+          Thread.sleep(SLEEP_TIME);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while sleeping to instantiate metastore 
client");
         }
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor worker " 
+ workerName + ", " +
@@ -237,7 +249,7 @@ public class Worker extends CompactorThread {
   }
 
   @Override
-  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
 
     StringBuilder name = new StringBuilder(hostname());
@@ -351,17 +363,16 @@ public class Worker extends CompactorThread {
 
   static final class CompactionHeartbeater extends Thread {
    static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
-    private final TxnStore txnHandler;
     private final AtomicBoolean stop = new AtomicBoolean();
     private final long compactorTxnId;
     private final String tableName;
     private final HiveConf conf;
     private final long interval;
-    public CompactionHeartbeater(TxnStore txnHandler, long compactorTxnId, String tableName, HiveConf conf) {
-      this.txnHandler = txnHandler;
+    public CompactionHeartbeater(long compactorTxnId, String tableName, HiveConf conf) {
       this.tableName = tableName;
       this.compactorTxnId = compactorTxnId;
       this.conf = conf;
+
       this.interval =
          MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
       setDaemon(true);
@@ -371,13 +382,12 @@ public class Worker extends CompactorThread {
     @Override
     public void run() {
       try {
+        // We need to create our own metastore client since the thrift clients
+        // are not thread safe.
+        IMetaStoreClient msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
         LOG.debug("Heartbeating compaction transaction id {} for table: {}", 
compactorTxnId, tableName);
-        HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
-
-        heartbeatRequest.setTxnid(compactorTxnId);
-        heartbeatRequest.setLockid(0);
         while(!stop.get()) {
-          txnHandler.heartbeat(heartbeatRequest);
+          msc.heartbeat(compactorTxnId, 0);
           Thread.sleep(interval);
         }
       } catch (Exception e) {

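The reworked CompactionHeartbeater above builds its own client because, as its new comment notes, thrift clients are not thread safe, so the heartbeat thread cannot share the Worker's msc. A condensed sketch of the loop, with compactorTxnId and interval stubbed with illustrative values:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    public class HeartbeatLoopSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        AtomicBoolean stop = new AtomicBoolean(false); // in the Worker, cancel() flips this
        long compactorTxnId = 42L;  // illustrative; the Worker gets this from openTxns()
        long interval = 150_000L;   // half of metastore.txn.timeout, per the constructor above
        // One client per heartbeater thread; thrift clients are not thread safe.
        IMetaStoreClient msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
        while (!stop.get()) {
          msc.heartbeat(compactorTxnId, 0); // lockid 0: heartbeat the transaction only
          Thread.sleep(interval);
        }
      }
    }
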
http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a3034fb..6f44e9b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1189,13 +1189,13 @@ public class TestTxnCommands2 {
     }
     return compactionsByState;
   }
-  public static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws Exception {
     TxnCommandsBaseForTests.runWorker(hiveConf);
   }
-  public static void runCleaner(HiveConf hiveConf) throws MetaException {
+  public static void runCleaner(HiveConf hiveConf) throws Exception {
     TxnCommandsBaseForTests.runCleaner(hiveConf);
   }
-  public static void runInitiator(HiveConf hiveConf) throws MetaException {
+  public static void runInitiator(HiveConf hiveConf) throws Exception {
     TxnCommandsBaseForTests.runInitiator(hiveConf);
   }
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 287aeae..dc2963d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -157,18 +156,18 @@ public abstract class TxnCommandsBaseForTests {
   protected String makeValuesClause(int[][] rows) {
     return TestTxnCommands2.makeValuesClause(rows);
   }
-  public static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws Exception {
     runCompactorThread(hiveConf, CompactorThreadType.WORKER);
   }
-  public static void runCleaner(HiveConf hiveConf) throws MetaException {
+  public static void runCleaner(HiveConf hiveConf) throws Exception {
     runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
   }
-  public static void runInitiator(HiveConf hiveConf) throws MetaException {
+  public static void runInitiator(HiveConf hiveConf) throws Exception {
     runCompactorThread(hiveConf, CompactorThreadType.INITIATOR);
   }
   private enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
  private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type)
-      throws MetaException {
+      throws Exception {
     AtomicBoolean stop = new AtomicBoolean(true);
     CompactorThread t = null;
     switch (type) {

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 0c55654..f9fb854 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hive.ql.security.authorization.PrivilegeSynchronizer;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
 import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -286,6 +288,12 @@ public class HiveServer2 extends CompositeService {
       throw new ServiceException(e);
     }
 
+    try {
+      maybeStartCompactorThreads(hiveConf);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
     // Setup web UI
     final int webUIPort;
     final String webHost;
@@ -1005,6 +1013,16 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception {
+    if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) {
+      int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
+      for (int i = 0; i < numWorkers; i++) {
+        Worker w = new Worker();
+        CompactorThread.initializeAndStartThread(w, hiveConf);
+      }
+    }
+  }
+
   /**
    * Remove all znodes corresponding to the given version number from ZooKeeper
    *

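maybeStartCompactorThreads above only fires when the HIVE_METASTORE_RUNWORKER_IN setting (the ConfVars entry added to MetastoreConf in this patch) resolves to "hs2". A sketch of selecting that mode programmatically; the exact property-string key is an assumption, which is why the ConfVars enum is used instead:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class RunWorkerInHs2Sketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Route compaction Workers to HiveServer2 instead of the metastore.
        MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
        MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 4);
      }
    }
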
http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java
index d85dda5..f7d9ed2 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java
@@ -877,14 +877,14 @@ import org.slf4j.LoggerFactory;
           case 4: // PARTITIONS
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
-                org.apache.thrift.protocol.TList _list960 = iprot.readListBegin();
-                struct.partitions = new ArrayList<Partition>(_list960.size);
-                Partition _elem961;
-                for (int _i962 = 0; _i962 < _list960.size; ++_i962)
+                org.apache.thrift.protocol.TList _list968 = iprot.readListBegin();
+                struct.partitions = new ArrayList<Partition>(_list968.size);
+                Partition _elem969;
+                for (int _i970 = 0; _i970 < _list968.size; ++_i970)
                 {
-                  _elem961 = new Partition();
-                  _elem961.read(iprot);
-                  struct.partitions.add(_elem961);
+                  _elem969 = new Partition();
+                  _elem969.read(iprot);
+                  struct.partitions.add(_elem969);
                 }
                 iprot.readListEnd();
               }
@@ -952,9 +952,9 @@ import org.slf4j.LoggerFactory;
         oprot.writeFieldBegin(PARTITIONS_FIELD_DESC);
         {
          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.partitions.size()));
-          for (Partition _iter963 : struct.partitions)
+          for (Partition _iter971 : struct.partitions)
           {
-            _iter963.write(oprot);
+            _iter971.write(oprot);
           }
           oprot.writeListEnd();
         }
@@ -1000,9 +1000,9 @@ import org.slf4j.LoggerFactory;
       oprot.writeString(struct.tableName);
       {
         oprot.writeI32(struct.partitions.size());
-        for (Partition _iter964 : struct.partitions)
+        for (Partition _iter972 : struct.partitions)
         {
-          _iter964.write(oprot);
+          _iter972.write(oprot);
         }
       }
       BitSet optionals = new BitSet();
@@ -1041,14 +1041,14 @@ import org.slf4j.LoggerFactory;
       struct.tableName = iprot.readString();
       struct.setTableNameIsSet(true);
       {
-        org.apache.thrift.protocol.TList _list965 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-        struct.partitions = new ArrayList<Partition>(_list965.size);
-        Partition _elem966;
-        for (int _i967 = 0; _i967 < _list965.size; ++_i967)
+        org.apache.thrift.protocol.TList _list973 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+        struct.partitions = new ArrayList<Partition>(_list973.size);
+        Partition _elem974;
+        for (int _i975 = 0; _i975 < _list973.size; ++_i975)
         {
-          _elem966 = new Partition();
-          _elem966.read(iprot);
-          struct.partitions.add(_elem966);
+          _elem974 = new Partition();
+          _elem974.read(iprot);
+          struct.partitions.add(_elem974);
         }
       }
       struct.setPartitionsIsSet(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java
index 3eb55b1..f4e3d6b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java
@@ -351,13 +351,13 @@ import org.slf4j.LoggerFactory;
           case 1: // FILE_IDS
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
-                org.apache.thrift.protocol.TList _list840 = iprot.readListBegin();
-                struct.fileIds = new ArrayList<Long>(_list840.size);
-                long _elem841;
-                for (int _i842 = 0; _i842 < _list840.size; ++_i842)
+                org.apache.thrift.protocol.TList _list848 = iprot.readListBegin();
+                struct.fileIds = new ArrayList<Long>(_list848.size);
+                long _elem849;
+                for (int _i850 = 0; _i850 < _list848.size; ++_i850)
                 {
-                  _elem841 = iprot.readI64();
-                  struct.fileIds.add(_elem841);
+                  _elem849 = iprot.readI64();
+                  struct.fileIds.add(_elem849);
                 }
                 iprot.readListEnd();
               }
@@ -383,9 +383,9 @@ import org.slf4j.LoggerFactory;
         oprot.writeFieldBegin(FILE_IDS_FIELD_DESC);
         {
          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, struct.fileIds.size()));
-          for (long _iter843 : struct.fileIds)
+          for (long _iter851 : struct.fileIds)
           {
-            oprot.writeI64(_iter843);
+            oprot.writeI64(_iter851);
           }
           oprot.writeListEnd();
         }
@@ -410,9 +410,9 @@ import org.slf4j.LoggerFactory;
       TTupleProtocol oprot = (TTupleProtocol) prot;
       {
         oprot.writeI32(struct.fileIds.size());
-        for (long _iter844 : struct.fileIds)
+        for (long _iter852 : struct.fileIds)
         {
-          oprot.writeI64(_iter844);
+          oprot.writeI64(_iter852);
         }
       }
     }
@@ -421,13 +421,13 @@ import org.slf4j.LoggerFactory;
    public void read(org.apache.thrift.protocol.TProtocol prot, ClearFileMetadataRequest struct) throws org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       {
-        org.apache.thrift.protocol.TList _list845 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-        struct.fileIds = new ArrayList<Long>(_list845.size);
-        long _elem846;
-        for (int _i847 = 0; _i847 < _list845.size; ++_i847)
+        org.apache.thrift.protocol.TList _list853 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+        struct.fileIds = new ArrayList<Long>(_list853.size);
+        long _elem854;
+        for (int _i855 = 0; _i855 < _list853.size; ++_i855)
         {
-          _elem846 = iprot.readI64();
-          struct.fileIds.add(_elem846);
+          _elem854 = iprot.readI64();
+          struct.fileIds.add(_elem854);
         }
       }
       struct.setFileIdsIsSet(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java
index 17f8b77..2b39444 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java
@@ -354,13 +354,13 @@ import org.slf4j.LoggerFactory;
           case 1: // VALUES
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
-                org.apache.thrift.protocol.TList _list856 = iprot.readListBegin();
-                struct.values = new ArrayList<ClientCapability>(_list856.size);
-                ClientCapability _elem857;
-                for (int _i858 = 0; _i858 < _list856.size; ++_i858)
+                org.apache.thrift.protocol.TList _list864 = iprot.readListBegin();
+                struct.values = new ArrayList<ClientCapability>(_list864.size);
+                ClientCapability _elem865;
+                for (int _i866 = 0; _i866 < _list864.size; ++_i866)
                 {
-                  _elem857 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32());
-                  struct.values.add(_elem857);
+                  _elem865 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32());
+                  struct.values.add(_elem865);
                 }
                 iprot.readListEnd();
               }
@@ -386,9 +386,9 @@ import org.slf4j.LoggerFactory;
         oprot.writeFieldBegin(VALUES_FIELD_DESC);
         {
          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, struct.values.size()));
-          for (ClientCapability _iter859 : struct.values)
+          for (ClientCapability _iter867 : struct.values)
           {
-            oprot.writeI32(_iter859.getValue());
+            oprot.writeI32(_iter867.getValue());
           }
           oprot.writeListEnd();
         }
@@ -413,9 +413,9 @@ import org.slf4j.LoggerFactory;
       TTupleProtocol oprot = (TTupleProtocol) prot;
       {
         oprot.writeI32(struct.values.size());
-        for (ClientCapability _iter860 : struct.values)
+        for (ClientCapability _iter868 : struct.values)
         {
-          oprot.writeI32(_iter860.getValue());
+          oprot.writeI32(_iter868.getValue());
         }
       }
     }
@@ -424,13 +424,13 @@ import org.slf4j.LoggerFactory;
    public void read(org.apache.thrift.protocol.TProtocol prot, ClientCapabilities struct) throws org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       {
-        org.apache.thrift.protocol.TList _list861 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32());
-        struct.values = new ArrayList<ClientCapability>(_list861.size);
-        ClientCapability _elem862;
-        for (int _i863 = 0; _i863 < _list861.size; ++_i863)
+        org.apache.thrift.protocol.TList _list869 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32());
+        struct.values = new ArrayList<ClientCapability>(_list869.size);
+        ClientCapability _elem870;
+        for (int _i871 = 0; _i871 < _list869.size; ++_i871)
         {
-          _elem862 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32());
-          struct.values.add(_elem862);
+          _elem870 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32());
+          struct.values.add(_elem870);
         }
       }
       struct.setValuesIsSet(true);
