HIVE-20936 Allow the Worker thread in the metastore to run outside of it (Jaume Marhuenda via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/926c1e8e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/926c1e8e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/926c1e8e Branch: refs/heads/master Commit: 926c1e8e352582a2bb89d458fb45c89c698c56fc Parents: 49f8f84 Author: Jaume Marhuenda <jmarhue...@hortonworks.com> Authored: Fri Dec 21 20:32:58 2018 -0800 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Fri Dec 21 20:33:35 2018 -0800 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/TestStreaming.java | 3 +- .../hive/ql/txn/compactor/TestCompactor.java | 2 +- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 4 +- .../hive/ql/txn/compactor/CompactorMR.java | 18 +- .../hive/ql/txn/compactor/CompactorThread.java | 71 +- .../hadoop/hive/ql/txn/compactor/Initiator.java | 4 +- .../txn/compactor/MetaStoreCompactorThread.java | 86 + .../ql/txn/compactor/RemoteCompactorThread.java | 65 + .../hadoop/hive/ql/txn/compactor/Worker.java | 90 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 6 +- .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 9 +- .../apache/hive/service/server/HiveServer2.java | 18 + .../metastore/api/AlterPartitionsRequest.java | 36 +- .../metastore/api/ClearFileMetadataRequest.java | 32 +- .../hive/metastore/api/ClientCapabilities.java | 32 +- .../metastore/api/CompactionInfoStruct.java | 1533 +++ .../metastore/api/FindSchemasByColsResp.java | 36 +- .../hive/metastore/api/FireEventRequest.java | 32 +- .../metastore/api/GetAllFunctionsResponse.java | 36 +- .../api/GetFileMetadataByExprRequest.java | 32 +- .../api/GetFileMetadataByExprResult.java | 48 +- .../metastore/api/GetFileMetadataRequest.java | 32 +- .../metastore/api/GetFileMetadataResult.java | 44 +- .../metastore/api/GetPartitionsFilterSpec.java | 32 +- .../api/GetPartitionsProjectionSpec.java | 32 +- .../metastore/api/GetPartitionsRequest.java | 32 +- .../metastore/api/GetPartitionsResponse.java | 36 +- .../hive/metastore/api/GetTablesRequest.java | 32 +- .../hive/metastore/api/GetTablesResult.java | 36 +- .../metastore/api/InsertEventRequestData.java | 96 +- .../metastore/api/NotificationEventRequest.java | 32 +- .../api/NotificationEventResponse.java | 36 +- .../api/OptionalCompactionInfoStruct.java | 395 + .../metastore/api/PutFileMetadataRequest.java | 64 +- .../metastore/api/RenamePartitionRequest.java | 32 +- .../hive/metastore/api/SchemaVersion.java | 36 +- .../hive/metastore/api/ThriftHiveMetastore.java | 11977 +++++++++++++---- .../hive/metastore/api/WMFullResourcePlan.java | 144 +- .../api/WMGetAllResourcePlanResponse.java | 36 +- .../WMGetTriggersForResourePlanResponse.java | 36 +- .../api/WMValidateResourcePlanResponse.java | 64 +- .../api/WriteNotificationLogRequest.java | 32 +- .../gen-php/metastore/ThriftHiveMetastore.php | 2947 +++- .../src/gen/thrift/gen-php/metastore/Types.php | 1064 +- .../hive_metastore/ThriftHiveMetastore-remote | 49 + .../hive_metastore/ThriftHiveMetastore.py | 3643 +++-- .../gen/thrift/gen-py/hive_metastore/ttypes.py | 726 +- .../gen/thrift/gen-rb/hive_metastore_types.rb | 61 + .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 397 + .../hive/metastore/HiveMetaStoreClient.java | 35 + .../hadoop/hive/metastore/IMetaStoreClient.java | 59 + .../hive/metastore/conf/MetastoreConf.java | 4 + .../src/main/thrift/hive_metastore.thrift | 26 + .../hadoop/hive/metastore/HiveMetaStore.java | 44 +- .../hadoop/hive/metastore/MetaStoreThread.java | 3 +- .../hive/metastore/txn/CompactionInfo.java | 73 +- .../metastore/utils/MetaStoreServerUtils.java | 2 - .../HiveMetaStoreClientPreCatalog.java | 35 + .../apache/hive/streaming/TestStreaming.java | 3 +- 59 files changed, 18604 insertions(+), 6016 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index b290a40..5935220 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; @@ -424,7 +423,7 @@ public class TestStreaming { /** * this is a clone from TestTxnStatement2.... */ - public static void runWorker(HiveConf hiveConf) throws MetaException { + public static void runWorker(HiveConf hiveConf) throws Exception { AtomicBoolean stop = new AtomicBoolean(true); Worker t = new Worker(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index d3800cd..2b22a62 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1074,7 +1074,7 @@ public class TestCompactor { } private void runMajorCompaction( - String dbName, String tblName, String... partNames) throws MetaException { + String dbName, String tblName, String... partNames) throws Exception { TxnStore txnHandler = TxnUtils.getTxnStore(conf); Worker t = new Worker(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 18253c9..06b0209 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -59,7 +59,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa /** * A class to clean directories after compactions. This will run in a separate thread. */ -public class Cleaner extends CompactorThread { +public class Cleaner extends MetaStoreCompactorThread { static final private String CLASS_NAME = Cleaner.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private long cleanerCheckInterval = 0; @@ -67,7 +67,7 @@ public class Cleaner extends CompactorThread { private ReplChangeManager replChangeManager; @Override - public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); replChangeManager = ReplChangeManager.getInstance(conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 42ce174..7d5ee4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -58,7 +59,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -96,6 +96,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; import org.apache.parquet.Strings; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,7 +227,7 @@ public class CompactorMR { * @throws java.io.IOException if the job fails */ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, - CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException { + CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc) throws IOException { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); @@ -268,7 +269,7 @@ public class CompactorMR { launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), - maxDeltastoHandle, -1, conf, txnHandler, ci.id, jobName); + maxDeltastoHandle, -1, conf, msc, ci.id, jobName); } //now recompute state since we've done minor compactions and have different 'best' set of deltas dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds); @@ -312,7 +313,7 @@ public class CompactorMR { } launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), - dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id, jobName); + dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName); su.gatherStats(); } @@ -539,7 +540,7 @@ public class CompactorMR { StringableList dirsToSearch, List<AcidUtils.ParsedDelta> parsedDeltas, int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf, - TxnStore txnHandler, long id, String jobName) throws IOException { + IMetaStoreClient msc, long id, String jobName) throws IOException { job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); if(dirsToSearch == null) { dirsToSearch = new StringableList(); @@ -579,7 +580,12 @@ public class CompactorMR { RunningJob rj = jc.submitJob(job); LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id); - txnHandler.setHadoopJobId(rj.getID().toString(), id); + try { + msc.setHadoopJobid(rj.getID().toString(), id); + } catch (TException e) { + LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString() + + " compactionId=" + id, e); + } rj.waitForCompletion(); if (!rj.isSuccessful()) { throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index f5b901d..f45140d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -17,23 +17,19 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.RawStore; -import org.apache.hadoop.hive.metastore.RawStoreProxy; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -42,26 +38,29 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** * Superclass for all threads in the compactor. */ -public abstract class CompactorThread extends Thread implements MetaStoreThread { +public abstract class CompactorThread extends Thread implements Configurable { static final private String CLASS_NAME = CompactorThread.class.getName(); - static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); protected HiveConf conf; - protected TxnStore txnHandler; protected RawStore rs; - protected int threadId; protected AtomicBoolean stop; protected AtomicBoolean looped; + protected int threadId; + + public void setThreadId(int threadId) { + this.threadId = threadId; + } + @Override public void setConf(Configuration configuration) { // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with @@ -77,24 +76,11 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread return conf; } - @Override - public void setThreadId(int threadId) { - this.threadId = threadId; - } - - @Override - public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { - this.stop = stop; - this.looped = looped; + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { setPriority(MIN_PRIORITY); setDaemon(true); // this means the process will exit without waiting for this thread - - // Get our own instance of the transaction handler - txnHandler = TxnUtils.getTxnStore(conf); - - // Get our own connection to the database so we can get table and partition information. - rs = RawStoreProxy.getProxy(conf, conf, - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); + this.stop = stop; + this.looped = looped; } /** @@ -103,14 +89,15 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread * @return metastore table * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found. */ - protected Table resolveTable(CompactionInfo ci) throws MetaException { - try { - return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); - } catch (MetaException e) { - LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); - throw e; - } - } + abstract Table resolveTable(CompactionInfo ci) throws MetaException; + + /** + * Get list of partitions by name. + * @param ci compaction info. + * @return list of partitions + * @throws MetaException if an error occurs. + */ + abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException; /** * Get the partition being compacted. @@ -123,8 +110,7 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread if (ci.partName != null) { List<Partition> parts; try { - parts = rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, - Collections.singletonList(ci.partName)); + parts = getPartitionsByNames(ci); if (parts == null || parts.size() == 0) { // The partition got dropped before we went looking for it. return null; @@ -220,4 +206,15 @@ public abstract class CompactorThread extends Thread implements MetaStoreThread protected String tableName(Table t) { return Warehouse.getQualifiedName(t); } + + private static AtomicInteger nextThreadId = new AtomicInteger(1000000); + + public static void initializeAndStartThread(CompactorThread thread, + Configuration conf) throws Exception { + LOG.info("Starting compactor thread of type " + thread.getClass().getName()); + thread.setConf(conf); + thread.setThreadId(nextThreadId.incrementAndGet()); + thread.init(new AtomicBoolean(), new AtomicBoolean()); + thread.start(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index cdcc0e9..a0df82c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * A class to initiate compactions. This will run in a separate thread. * It's critical that there exactly 1 of these in a given warehouse. */ -public class Initiator extends CompactorThread { +public class Initiator extends MetaStoreCompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); @@ -202,7 +202,7 @@ public class Initiator extends CompactorThread { } @Override - public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java new file mode 100644 index 0000000..1ddc54d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.metastore.MetaStoreThread; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.RawStoreProxy; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; + +/** + * Compactor threads that runs in the metastore. It uses a {@link TxnStore} + * to access the internal database. + */ +public class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread { + + protected TxnStore txnHandler; + protected RawStore rs; + protected int threadId; + + @Override + public void setThreadId(int threadId) { + this.threadId = threadId; + } + + @Override + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { + super.init(stop, looped); + + // Get our own instance of the transaction handler + txnHandler = TxnUtils.getTxnStore(conf); + + // Get our own connection to the database so we can get table and partition information. + rs = RawStoreProxy.getProxy(conf, conf, + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); + } + + @Override Table resolveTable(CompactionInfo ci) throws MetaException { + try { + return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + } catch (MetaException e) { + LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); + throw e; + } + } + + @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException { + try { + return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, + Collections.singletonList(ci.partName)); + } catch (MetaException e) { + LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); + throw e; + } catch (NoSuchObjectException e) { + LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); + throw new MetaException(e.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java new file mode 100644 index 0000000..9678786 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.thrift.TException; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; + +/** + * Compactor thread that can run outside the metastore. It will + * use the metastore thrift API which will default to a remote connection + * if the thread is running outside the metastore or to a function call + * if it's running within the metastore. + */ +public class RemoteCompactorThread extends CompactorThread { + protected IMetaStoreClient msc; + + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { + super.init(stop, looped); + this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); + } + + @Override Table resolveTable(CompactionInfo ci) throws MetaException { + try { + return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + } catch (TException e) { + LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); + throw new MetaException(e.toString()); + } + } + + @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException { + try { + return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, + Collections.singletonList(ci.partName)); + } catch (TException e) { + LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); + throw new MetaException(e.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 49662cd..b645870 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -21,19 +21,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; -import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -62,10 +59,10 @@ import java.util.concurrent.atomic.AtomicBoolean; * A class to do compactions. This will run in a separate thread. It will spin on the * compaction queue and look for new work to do. */ -public class Worker extends CompactorThread { +public class Worker extends RemoteCompactorThread implements MetaStoreThread { static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - static final private long SLEEP_TIME = 5000; + static final private long SLEEP_TIME = 10000; static final private int baseThreadNum = 10002; private String workerName; @@ -88,13 +85,18 @@ public class Worker extends CompactorThread { // don't go through Initiator for user initiated compactions) @Override public void run() { + LOG.info("Starting Worker thread"); do { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. CompactionHeartbeater heartbeater = null; try { - final CompactionInfo ci = txnHandler.findNextToCompact(workerName); + if (msc == null) { + msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); + } + final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo( + msc.findNextCompact(workerName)); LOG.debug("Processing compaction request " + ci); if (ci == null && !stop.get()) { @@ -114,11 +116,11 @@ public class Worker extends CompactorThread { if (t1 == null) { LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped and moving on."); - txnHandler.markCleaned(ci); + msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (MetaException e) { - txnHandler.markCleaned(ci); + msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } // This chicanery is to get around the fact that the table needs to be final in order to @@ -132,11 +134,11 @@ public class Worker extends CompactorThread { if (p == null && ci.partName != null) { LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped and moving on."); - txnHandler.markCleaned(ci); + msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (Exception e) { - txnHandler.markCleaned(ci); + msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -146,45 +148,43 @@ public class Worker extends CompactorThread { // Check that the table or partition isn't sorted, as we don't yet support that. if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { LOG.error("Attempt to compact sorted table, which is not yet supported!"); - txnHandler.markCleaned(ci); + msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); if (ci.runAs == null) { ci.runAs = findUserToRunAs(sd.getLocation(), t); } - OpenTxnRequest otReq = new OpenTxnRequest(1, ci.runAs, hostname()); - otReq.setAgentInfo(getName());//ThreadName - long compactorTxnId = txnHandler.openTxns(otReq).getTxn_ids().get(0); + long compactorTxnId = msc.openTxns(ci.runAs, 1).getTxn_ids().get(0); - heartbeater = new CompactionHeartbeater(txnHandler, compactorTxnId, fullTableName, conf); + heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); - ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId); - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId); //with this ValidWriteIdList is capped at whatever HWM validTxnList has - rqst.setValidTxnList(validTxnList.writeToString()); final ValidCompactorWriteIdList tblValidWriteIds = - TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); + TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( + Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed - txnHandler.updateCompactorState(ci, compactorTxnId); + msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); final StringBuilder jobName = new StringBuilder(workerName); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); - final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf, + final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( + CompactionInfo.compactionInfoToStruct(ci)), conf, runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); launchedJob = true; try { if (runJobAsSelf(ci.runAs)) { - mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); @@ -192,7 +192,7 @@ public class Worker extends CompactorThread { ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, msc); return null; } }); @@ -204,16 +204,28 @@ public class Worker extends CompactorThread { } } heartbeater.cancel(); - txnHandler.markCompacted(ci); - txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId)); + msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + msc.commitTxn(compactorTxnId); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } } catch (Exception e) { LOG.error("Caught exception while trying to compact " + ci + ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); - txnHandler.markFailed(ci); - txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId)); + msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); + msc.abortTxns(Collections.singletonList(compactorTxnId)); + } + } catch (TException | IOException t) { + LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + + StringUtils.stringifyException(t)); + if (msc != null) { + msc.close(); + } + msc = null; + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + LOG.error("Interrupted while sleeping to instantiate metastore client"); } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + @@ -237,7 +249,7 @@ public class Worker extends CompactorThread { } @Override - public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { + public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); StringBuilder name = new StringBuilder(hostname()); @@ -351,17 +363,16 @@ public class Worker extends CompactorThread { static final class CompactionHeartbeater extends Thread { static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class); - private final TxnStore txnHandler; private final AtomicBoolean stop = new AtomicBoolean(); private final long compactorTxnId; private final String tableName; private final HiveConf conf; private final long interval; - public CompactionHeartbeater(TxnStore txnHandler, long compactorTxnId, String tableName, HiveConf conf) { - this.txnHandler = txnHandler; + public CompactionHeartbeater(long compactorTxnId, String tableName, HiveConf conf) { this.tableName = tableName; this.compactorTxnId = compactorTxnId; this.conf = conf; + this.interval = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2; setDaemon(true); @@ -371,13 +382,12 @@ public class Worker extends CompactorThread { @Override public void run() { try { + // We need to create our own metastore client since the thrifts clients + // are not thread safe. + IMetaStoreClient msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactorTxnId, tableName); - HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); - - heartbeatRequest.setTxnid(compactorTxnId); - heartbeatRequest.setLockid(0); while(!stop.get()) { - txnHandler.heartbeat(heartbeatRequest); + msc.heartbeat(compactorTxnId, 0); Thread.sleep(interval); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index a3034fb..6f44e9b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1189,13 +1189,13 @@ public class TestTxnCommands2 { } return compactionsByState; } - public static void runWorker(HiveConf hiveConf) throws MetaException { + public static void runWorker(HiveConf hiveConf) throws Exception { TxnCommandsBaseForTests.runWorker(hiveConf); } - public static void runCleaner(HiveConf hiveConf) throws MetaException { + public static void runCleaner(HiveConf hiveConf) throws Exception { TxnCommandsBaseForTests.runCleaner(hiveConf); } - public static void runInitiator(HiveConf hiveConf) throws MetaException { + public static void runInitiator(HiveConf hiveConf) throws Exception { TxnCommandsBaseForTests.runInitiator(hiveConf); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 287aeae..dc2963d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -157,18 +156,18 @@ public abstract class TxnCommandsBaseForTests { protected String makeValuesClause(int[][] rows) { return TestTxnCommands2.makeValuesClause(rows); } - public static void runWorker(HiveConf hiveConf) throws MetaException { + public static void runWorker(HiveConf hiveConf) throws Exception { runCompactorThread(hiveConf, CompactorThreadType.WORKER); } - public static void runCleaner(HiveConf hiveConf) throws MetaException { + public static void runCleaner(HiveConf hiveConf) throws Exception { runCompactorThread(hiveConf, CompactorThreadType.CLEANER); } - public static void runInitiator(HiveConf hiveConf) throws MetaException { + public static void runInitiator(HiveConf hiveConf) throws Exception { runCompactorThread(hiveConf, CompactorThreadType.INITIATOR); } private enum CompactorThreadType {INITIATOR, WORKER, CLEANER} private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type) - throws MetaException { + throws Exception { AtomicBoolean stop = new AtomicBoolean(true); CompactorThread t = null; switch (type) { http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 0c55654..f9fb854 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hive.ql.security.authorization.PrivilegeSynchronizer; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer; import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorThread; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -286,6 +288,12 @@ public class HiveServer2 extends CompositeService { throw new ServiceException(e); } + try { + maybeStartCompactorThreads(hiveConf); + } catch (Exception e) { + throw new RuntimeException(e); + } + // Setup web UI final int webUIPort; final String webHost; @@ -1005,6 +1013,16 @@ public class HiveServer2 extends CompositeService { } } + private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception { + if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) { + int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); + for (int i = 0; i < numWorkers; i++) { + Worker w = new Worker(); + CompactorThread.initializeAndStartThread(w, hiveConf); + } + } + } + /** * Remove all znodes corresponding to the given version number from ZooKeeper * http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java index d85dda5..f7d9ed2 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AlterPartitionsRequest.java @@ -877,14 +877,14 @@ import org.slf4j.LoggerFactory; case 4: // PARTITIONS if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list960 = iprot.readListBegin(); - struct.partitions = new ArrayList<Partition>(_list960.size); - Partition _elem961; - for (int _i962 = 0; _i962 < _list960.size; ++_i962) + org.apache.thrift.protocol.TList _list968 = iprot.readListBegin(); + struct.partitions = new ArrayList<Partition>(_list968.size); + Partition _elem969; + for (int _i970 = 0; _i970 < _list968.size; ++_i970) { - _elem961 = new Partition(); - _elem961.read(iprot); - struct.partitions.add(_elem961); + _elem969 = new Partition(); + _elem969.read(iprot); + struct.partitions.add(_elem969); } iprot.readListEnd(); } @@ -952,9 +952,9 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(PARTITIONS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.partitions.size())); - for (Partition _iter963 : struct.partitions) + for (Partition _iter971 : struct.partitions) { - _iter963.write(oprot); + _iter971.write(oprot); } oprot.writeListEnd(); } @@ -1000,9 +1000,9 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.tableName); { oprot.writeI32(struct.partitions.size()); - for (Partition _iter964 : struct.partitions) + for (Partition _iter972 : struct.partitions) { - _iter964.write(oprot); + _iter972.write(oprot); } } BitSet optionals = new BitSet(); @@ -1041,14 +1041,14 @@ import org.slf4j.LoggerFactory; struct.tableName = iprot.readString(); struct.setTableNameIsSet(true); { - org.apache.thrift.protocol.TList _list965 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.partitions = new ArrayList<Partition>(_list965.size); - Partition _elem966; - for (int _i967 = 0; _i967 < _list965.size; ++_i967) + org.apache.thrift.protocol.TList _list973 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.partitions = new ArrayList<Partition>(_list973.size); + Partition _elem974; + for (int _i975 = 0; _i975 < _list973.size; ++_i975) { - _elem966 = new Partition(); - _elem966.read(iprot); - struct.partitions.add(_elem966); + _elem974 = new Partition(); + _elem974.read(iprot); + struct.partitions.add(_elem974); } } struct.setPartitionsIsSet(true); http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java index 3eb55b1..f4e3d6b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java @@ -351,13 +351,13 @@ import org.slf4j.LoggerFactory; case 1: // FILE_IDS if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list840 = iprot.readListBegin(); - struct.fileIds = new ArrayList<Long>(_list840.size); - long _elem841; - for (int _i842 = 0; _i842 < _list840.size; ++_i842) + org.apache.thrift.protocol.TList _list848 = iprot.readListBegin(); + struct.fileIds = new ArrayList<Long>(_list848.size); + long _elem849; + for (int _i850 = 0; _i850 < _list848.size; ++_i850) { - _elem841 = iprot.readI64(); - struct.fileIds.add(_elem841); + _elem849 = iprot.readI64(); + struct.fileIds.add(_elem849); } iprot.readListEnd(); } @@ -383,9 +383,9 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(FILE_IDS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, struct.fileIds.size())); - for (long _iter843 : struct.fileIds) + for (long _iter851 : struct.fileIds) { - oprot.writeI64(_iter843); + oprot.writeI64(_iter851); } oprot.writeListEnd(); } @@ -410,9 +410,9 @@ import org.slf4j.LoggerFactory; TTupleProtocol oprot = (TTupleProtocol) prot; { oprot.writeI32(struct.fileIds.size()); - for (long _iter844 : struct.fileIds) + for (long _iter852 : struct.fileIds) { - oprot.writeI64(_iter844); + oprot.writeI64(_iter852); } } } @@ -421,13 +421,13 @@ import org.slf4j.LoggerFactory; public void read(org.apache.thrift.protocol.TProtocol prot, ClearFileMetadataRequest struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; { - org.apache.thrift.protocol.TList _list845 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.fileIds = new ArrayList<Long>(_list845.size); - long _elem846; - for (int _i847 = 0; _i847 < _list845.size; ++_i847) + org.apache.thrift.protocol.TList _list853 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.fileIds = new ArrayList<Long>(_list853.size); + long _elem854; + for (int _i855 = 0; _i855 < _list853.size; ++_i855) { - _elem846 = iprot.readI64(); - struct.fileIds.add(_elem846); + _elem854 = iprot.readI64(); + struct.fileIds.add(_elem854); } } struct.setFileIdsIsSet(true); http://git-wip-us.apache.org/repos/asf/hive/blob/926c1e8e/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java index 17f8b77..2b39444 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java @@ -354,13 +354,13 @@ import org.slf4j.LoggerFactory; case 1: // VALUES if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list856 = iprot.readListBegin(); - struct.values = new ArrayList<ClientCapability>(_list856.size); - ClientCapability _elem857; - for (int _i858 = 0; _i858 < _list856.size; ++_i858) + org.apache.thrift.protocol.TList _list864 = iprot.readListBegin(); + struct.values = new ArrayList<ClientCapability>(_list864.size); + ClientCapability _elem865; + for (int _i866 = 0; _i866 < _list864.size; ++_i866) { - _elem857 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32()); - struct.values.add(_elem857); + _elem865 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32()); + struct.values.add(_elem865); } iprot.readListEnd(); } @@ -386,9 +386,9 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(VALUES_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, struct.values.size())); - for (ClientCapability _iter859 : struct.values) + for (ClientCapability _iter867 : struct.values) { - oprot.writeI32(_iter859.getValue()); + oprot.writeI32(_iter867.getValue()); } oprot.writeListEnd(); } @@ -413,9 +413,9 @@ import org.slf4j.LoggerFactory; TTupleProtocol oprot = (TTupleProtocol) prot; { oprot.writeI32(struct.values.size()); - for (ClientCapability _iter860 : struct.values) + for (ClientCapability _iter868 : struct.values) { - oprot.writeI32(_iter860.getValue()); + oprot.writeI32(_iter868.getValue()); } } } @@ -424,13 +424,13 @@ import org.slf4j.LoggerFactory; public void read(org.apache.thrift.protocol.TProtocol prot, ClientCapabilities struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; { - org.apache.thrift.protocol.TList _list861 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32()); - struct.values = new ArrayList<ClientCapability>(_list861.size); - ClientCapability _elem862; - for (int _i863 = 0; _i863 < _list861.size; ++_i863) + org.apache.thrift.protocol.TList _list869 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32()); + struct.values = new ArrayList<ClientCapability>(_list869.size); + ClientCapability _elem870; + for (int _i871 = 0; _i871 < _list869.size; ++_i871) { - _elem862 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32()); - struct.values.add(_elem862); + _elem870 = org.apache.hadoop.hive.metastore.api.ClientCapability.findByValue(iprot.readI32()); + struct.values.add(_elem870); } } struct.setValuesIsSet(true);