This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fd43e64a729 [Enhancement](sql-cache) Use update time of hive to avoid cache miss through multi fe nodes. (#26424) fd43e64a729 is described below commit fd43e64a729c4f6c85e48da1f50b05e487f4f6ff Author: Xiangyu Wang <dut.xian...@gmail.com> AuthorDate: Fri Nov 10 17:36:00 2023 +0800 [Enhancement](sql-cache) Use update time of hive to avoid cache miss through multi fe nodes. (#26424) Currently, the update time of an HMS table is generated independently by each FE node (each node calls `System.currentTimeMillis()` on its own), so the update time of the same HMS table can differ between FE nodes. As a result, the same query frequently fails to hit the SQL cache when it is submitted more than once through different FE nodes. This PR makes the following changes to avoid this problem: - Use `transient_lastDdlTime` instead of `System.currentTimeMillis()` as the `schemaUpdateTime` of HMS tables (Hive views, which do not have this property, still fall back to the current timestamp) - Use the `eventTime` of the HMS event instead of `System.currentTimeMillis()` as the update time when processing HMS events --- .../doris/catalog/external/HMSExternalTable.java | 32 ++++++++--- .../org/apache/doris/datasource/CatalogMgr.java | 66 +++++++++++++++++----- .../datasource/hive/event/AddPartitionEvent.java | 2 +- .../datasource/hive/event/AlterPartitionEvent.java | 6 +- .../datasource/hive/event/AlterTableEvent.java | 3 +- .../datasource/hive/event/DropPartitionEvent.java | 3 +- .../doris/datasource/hive/event/InsertEvent.java | 3 +- .../datasource/hive/event/MetastoreEvent.java | 13 +++-- .../hive/event/MetastoreEventsProcessor.java | 3 +- .../org/apache/doris/qe/HmsQueryCacheTest.java | 17 +++--- 10 files changed, 108 insertions(+), 40 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 361d278f8ac..b5e45b5bcfe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -41,6 +41,7 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -82,6 +83,9 @@ public class HMSExternalTable extends ExternalTable { private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties"; private static final String TBL_PROP_INSERT_ONLY = "insert_only"; + + private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime"; + private static final String NUM_ROWS = "numRows"; static { @@ -112,8 +116,8 @@ public class HMSExternalTable extends ExternalTable { // No as precise as row count in TableStats, but better than none. private long estimatedRowCount = -1; - // record the partition update time when enable hms event listener - protected volatile long partitionUpdateTime; + // record the event update time when enable hms event listener + protected volatile long eventUpdateTime; public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE @@ -405,6 +409,20 @@ public class HMSExternalTable extends ExternalTable { return new HashSet<>(names); } + @Override + public List<Column> initSchemaAndUpdateTime() { + org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() + .getTable(dbName, name); + // try to use transient_lastDdlTime from hms client + schemaUpdateTime = MapUtils.isNotEmpty(table.getParameters()) + && table.getParameters().containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME) + ? 
Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000 + // use current timestamp if lastDdlTime does not exist (hive views don't have this prop) + : System.currentTimeMillis(); + return initSchema(); + } + + @Override public List<Column> initSchema() { makeSureInitialized(); @@ -635,15 +653,15 @@ public class HMSExternalTable extends ExternalTable { } } - public void setPartitionUpdateTime(long updateTime) { - this.partitionUpdateTime = updateTime; + public void setEventUpdateTime(long updateTime) { + this.eventUpdateTime = updateTime; } @Override - // get the max value of `schemaUpdateTime` and `partitionUpdateTime` - // partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled + // get the max value of `schemaUpdateTime` and `eventUpdateTime` + // eventUpdateTime will be refreshed after processing events with hms event listener enabled public long getUpdateTime() { - return Math.max(this.schemaUpdateTime, this.partitionUpdateTime); + return Math.max(this.schemaUpdateTime, this.eventUpdateTime); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index e92e6820481..79dd31a26ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -650,6 +650,44 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } + public void refreshExternalTableFromEvent(String dbName, String tableName, String catalogName, + long updateTime, boolean ignoreIfNotExists) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support refresh ExternalCatalog Tables"); + } + DatabaseIf db = 
catalog.getDbNullable(dbName); + if (db == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + return; + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + return; + } + if (!(table instanceof HMSExternalTable)) { + return; + } + ((HMSExternalTable) table).unsetObjectCreated(); + ((HMSExternalTable) table).setEventUpdateTime(updateTime); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + log.setLastUpdateTime(updateTime); + Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); + } + public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); @@ -704,6 +742,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { table.unsetObjectCreated(); Env.getCurrentEnv().getExtMetaCacheMgr() .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); + if (table instanceof HMSExternalTable && log.getLastUpdateTime() > 0) { + ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime()); + } } public void dropExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists) @@ -923,8 +964,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } - public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames, - boolean ignoreIfNotExists) + public void addExternalPartitions(String catalogName, String dbName, String tableName, + List<String> partitionNames, long updateTime, boolean 
ignoreIfNotExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { @@ -955,14 +996,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable { HMSExternalTable hmsTable = (HMSExternalTable) table; Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames); - long lastPartitionUpdateTime = System.currentTimeMillis(); - hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime); + hmsTable.setEventUpdateTime(updateTime); ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); - log.setLastUpdateTime(lastPartitionUpdateTime); + log.setLastUpdateTime(updateTime); Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); } @@ -993,7 +1033,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { try { Env.getCurrentEnv().getExtMetaCacheMgr() .addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setPartitionUpdateTime(log.getLastUpdateTime()); + hmsTable.setEventUpdateTime(log.getLastUpdateTime()); } catch (HMSClientException e) { LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e); Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), @@ -1001,8 +1041,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } - public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames, - boolean ignoreIfNotExists) + public void dropExternalPartitions(String catalogName, String dbName, String tableName, + List<String> partitionNames, long updateTime, boolean ignoreIfNotExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { @@ -1032,7 +1072,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { 
log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); - log.setLastUpdateTime(System.currentTimeMillis()); + log.setLastUpdateTime(updateTime); replayDropExternalPartitions(log); Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log); } @@ -1062,11 +1102,11 @@ public class CatalogMgr implements Writable, GsonPostProcessable { HMSExternalTable hmsTable = (HMSExternalTable) table; Env.getCurrentEnv().getExtMetaCacheMgr() .dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setPartitionUpdateTime(log.getLastUpdateTime()); + hmsTable.setEventUpdateTime(log.getLastUpdateTime()); } public void refreshExternalPartitions(String catalogName, String dbName, String tableName, - List<String> partitionNames, boolean ignoreIfNotExists) + List<String> partitionNames, long updateTime, boolean ignoreIfNotExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { @@ -1099,7 +1139,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); - log.setLastUpdateTime(System.currentTimeMillis()); + log.setLastUpdateTime(updateTime); replayRefreshExternalPartitions(log); Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log); } @@ -1129,7 +1169,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { Env.getCurrentEnv().getExtMetaCacheMgr() .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(), log.getPartitionNames()); - ((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime()); + ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime()); } public void registerCatalogRefreshListener(Env env) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index 8872c9b5965..e1dacbd0b29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -103,7 +103,7 @@ public class AddPartitionEvent extends MetastorePartitionEvent { return; } Env.getCurrentEnv().getCatalogMgr() - .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true); + .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 445d48a1888..4085901de9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -114,14 +114,14 @@ public class AlterPartitionEvent extends MetastorePartitionEvent { if (isRename) { Env.getCurrentEnv().getCatalogMgr() .dropExternalPartitions(catalogName, dbName, tblName, - Lists.newArrayList(partitionNameBefore), true); + Lists.newArrayList(partitionNameBefore), eventTime, true); Env.getCurrentEnv().getCatalogMgr() .addExternalPartitions(catalogName, dbName, tblName, - Lists.newArrayList(partitionNameAfter), true); + Lists.newArrayList(partitionNameAfter), eventTime, true); } else { Env.getCurrentEnv().getCatalogMgr() .refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), - Lists.newArrayList(partitionNameAfter), true); + Lists.newArrayList(partitionNameAfter), eventTime, true); } } catch (DdlException e) { throw new MetastoreNotificationException( diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index 85fa3662b6e..706f8cd303d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -154,7 +154,8 @@ public class AlterTableEvent extends MetastoreTableEvent { } //The scope of refresh can be narrowed in the future Env.getCurrentEnv().getCatalogMgr() - .refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); + .refreshExternalTableFromEvent(tableBefore.getDbName(), tableBefore.getTableName(), + catalogName, eventTime, true); } catch (Exception e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index 738f113f0ed..f71f44cf5ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -103,7 +103,8 @@ public class DropPartitionEvent extends MetastorePartitionEvent { return; } Env.getCurrentEnv().getCatalogMgr() - .dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true); + .dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), + partitionNames, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index 7436d57c981..1540ca5ea3e 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -83,7 +83,8 @@ public class InsertEvent extends MetastoreTableEvent { * the file cache of this table, * but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it. */ - Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(dbName, tblName, catalogName, true); + Env.getCurrentEnv().getCatalogMgr().refreshExternalTableFromEvent(dbName, tblName, + catalogName, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index b4fc963d980..0a6e830316a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -47,10 +47,13 @@ public abstract class MetastoreEvent { protected final String tblName; // eventId of the event. Used instead of calling getter on event everytime - private final long eventId; + protected final long eventId; + + // eventTime of the event. 
Used instead of calling getter on event everytime + protected final long eventTime; // eventType from the NotificationEvent - private final MetastoreEventType eventType; + protected final MetastoreEventType eventType; // Actual notificationEvent object received from Metastore protected final NotificationEvent metastoreNotificationEvent; @@ -61,6 +64,7 @@ public abstract class MetastoreEvent { protected MetastoreEvent(long eventId, String catalogName, String dbName, String tblName, MetastoreEventType eventType) { this.eventId = eventId; + this.eventTime = -1L; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; @@ -74,6 +78,7 @@ public abstract class MetastoreEvent { this.dbName = event.getDbName(); this.tblName = event.getTableName(); this.eventId = event.getEventId(); + this.eventTime = event.getEventTime() * 1000L; this.eventType = MetastoreEventType.from(event.getEventType()); this.metastoreNotificationEvent = event; this.catalogName = catalogName; @@ -163,8 +168,8 @@ public abstract class MetastoreEvent { */ private Object[] getLogFormatArgs(Object[] args) { Object[] formatArgs = new Object[args.length + 2]; - formatArgs[0] = getEventId(); - formatArgs[1] = getEventType(); + formatArgs[0] = eventId; + formatArgs[1] = eventType; int i = 2; for (Object arg : args) { formatArgs[i] = arg; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 624349f46d6..622d84428fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -140,9 +140,8 @@ public class MetastoreEventsProcessor extends MasterDaemon { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalog instanceof HMSExternalCatalog) { 
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; - List<NotificationEvent> events = Collections.emptyList(); try { - events = getNextHMSEvents(hmsExternalCatalog); + List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog); if (!events.isEmpty()) { LOG.info("Events size are {} on catalog [{}]", events.size(), hmsExternalCatalog.getName()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index 7af353f23ac..16de423b28d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -120,6 +120,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { Deencapsulation.setField(tbl, "objectCreated", true); Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true)); + Deencapsulation.setField(tbl, "schemaUpdateTime", NOW); + Deencapsulation.setField(tbl, "eventUpdateTime", 0); new Expectations(tbl) { { tbl.getId(); @@ -154,15 +156,16 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { minTimes = 0; result = DLAType.HIVE; - tbl.getUpdateTime(); + // mock initSchemaAndUpdateTime and do nothing + tbl.initSchemaAndUpdateTime(); minTimes = 0; - result = NOW; } }; Deencapsulation.setField(tbl2, "objectCreated", true); Deencapsulation.setField(tbl2, "rwLock", new ReentrantReadWriteLock(true)); - + Deencapsulation.setField(tbl2, "schemaUpdateTime", NOW); + Deencapsulation.setField(tbl2, "eventUpdateTime", 0); new Expectations(tbl2) { { tbl2.getId(); @@ -197,8 +200,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { minTimes = 0; result = DLAType.HIVE; - // mock init schema and do nothing - tbl2.initSchema(); + // mock initSchemaAndUpdateTime and do nothing + tbl2.initSchemaAndUpdateTime(); minTimes = 0; } }; @@ -383,7 +386,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { // do 
nothing } long later = System.currentTimeMillis(); - tbl2.setPartitionUpdateTime(later); + tbl2.setEventUpdateTime(later); // check cache mode again ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); @@ -431,7 +434,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { // do nothing } long later = System.currentTimeMillis(); - tbl2.setPartitionUpdateTime(later); + tbl2.setEventUpdateTime(later); // check cache mode again ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org