This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5d4df61 HIVE-25904: ObjectStore's updateTableColumnStatistics is not ThreadSafe (Denys Kuzmenko, reviewed by Rajesh Balamohan) 5d4df61 is described below commit 5d4df61a5e1beb5f6b503dbc44434ec184c84b41 Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Mar 23 09:20:33 2022 +0100 HIVE-25904: ObjectStore's updateTableColumnStatistics is not ThreadSafe (Denys Kuzmenko, reviewed by Rajesh Balamohan) Closes #2977 --- .../apache/hadoop/hive/metastore/ObjectStore.java | 57 ++++++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 90664e2..52a16f3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetAddress; import java.net.URI; import java.nio.ByteBuffer; @@ -35,6 +36,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -52,6 +54,8 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import javax.jdo.JDODataStoreException; @@ -63,6 +67,8 @@ import javax.jdo.Transaction; import javax.jdo.datastore.JDOConnection; import javax.jdo.identity.IntIdentity; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Striped; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -186,7 +192,6 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.metrics.Metrics; @@ -333,6 +338,8 @@ public class ObjectStore implements RawStore, Configurable { private Counter directSqlErrors; private boolean areTxnStatsSupported = false; + private static Striped<Lock> tablelocks; + public ObjectStore() { } @@ -390,6 +397,15 @@ public class ObjectStore implements RawStore, Configurable { } else { LOG.debug("Initialized ObjectStore"); } + + if (tablelocks == null) { + synchronized (ObjectStore.class) { + if (tablelocks == null) { + int numTableLocks = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_NUM_STRIPED_TABLE_LOCKS); + tablelocks = Striped.lazyWeakLock(numTableLocks); + } + } + } } @SuppressWarnings("nls") @@ -9681,18 +9697,19 @@ public class ObjectStore implements RawStore, Configurable { } return statsMap; } - + @Override - public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, - String validWriteIds, long writeId) - throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean committed = false; - openTransaction(); + List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); + ColumnStatisticsDesc statsDesc = colStats.getStatsDesc(); + + Lock tableLock = getTableLockFor(statsDesc.getDbName(), statsDesc.getTableName()); + tableLock.lock(); try { - List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); - ColumnStatisticsDesc statsDesc = colStats.getStatsDesc(); - + openTransaction(); // DataNucleus objects get detached all over the place for no (real) reason. // So let's not use them anywhere unless absolutely necessary. String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf); @@ -9705,10 +9722,10 @@ public class ObjectStore implements RawStore, Configurable { Map<String, MTableColumnStatistics> oldStats = getPartitionColStats(table, colNames, colStats.getEngine()); - for (ColumnStatisticsObj statsObj:statsObjs) { + for (ColumnStatisticsObj statsObj : statsObjs) { MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics( - mTable, statsDesc, - statsObj, colStats.getEngine()); + mTable, statsDesc, + statsObj, colStats.getEngine()); writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName())); // There is no need to add colname again, otherwise we will get duplicate colNames. } @@ -9727,7 +9744,7 @@ public class ObjectStore implements RawStore, Configurable { StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); } else { String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name), - oldt.getParameters(), newParams, writeId, validWriteIds, true); + oldt.getParameters(), newParams, writeId, validWriteIds, true); if (errorMsg != null) { throw new MetaException(errorMsg); } @@ -9735,7 +9752,7 @@ public class ObjectStore implements RawStore, Configurable { // Make sure we set the flag to invalid regardless of the current value. StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " - + dbname + "." + name); + + dbname + "." + name); } oldt.setWriteId(writeId); } @@ -9746,12 +9763,20 @@ public class ObjectStore implements RawStore, Configurable { // TODO: similar to update...Part, this used to do "return committed;"; makes little sense. return committed ? newParams : null; } finally { - if (!committed) { - rollbackTransaction(); + try { + if (!committed) { + rollbackTransaction(); + } + } finally { + tableLock.unlock(); } } } + private Lock getTableLockFor(String dbName, String tblName) { + return tablelocks.get(dbName + "." + tblName); + } + /** * Get partition's column stats *