This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 72f074539c4bb2fe663d748f97cd62df88337f27 Author: Tamas Mate <tma...@apache.org> AuthorDate: Wed Mar 23 09:44:05 2022 +0100 IMPALA-11116: Make DESCRIBE HISTORY parameterized The number of snapshots can grow large because every INSERT can create a new one. This patch adds inclusive predicates to narrow down the result set of the DESCRIBE HISTORY statement. These are: - DESCRIBE HISTORY <table> FROM <ts> - DESCRIBE HISTORY <table> BETWEEN <ts> AND <ts> The timestamps can be date-time literals as well as interval expressions, such as: - '2022-02-04 13:31:09.819' - now() - interval 2 days Testing: - Added e2e tests that verify the result. - Added unit tests that check the analysis. Change-Id: Ifead0d33f22069005bfd623460f4af1ff197cc0e Reviewed-on: http://gerrit.cloudera.org:8080/18284 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com> --- common/thrift/Frontend.thrift | 3 + docs/topics/impala_iceberg.xml | 14 ++- fe/src/main/cup/sql-parser.cup | 10 ++ .../impala/analysis/DescribeHistoryStmt.java | 130 ++++++++++++++++++++- .../java/org/apache/impala/service/Frontend.java | 41 ++++--- .../org/apache/impala/service/JniFrontend.java | 4 +- .../apache/impala/analysis/AnalyzeStmtsTest.java | 29 +++++ tests/query_test/test_iceberg.py | 81 +++++++++++++ 8 files changed, 287 insertions(+), 25 deletions(-) diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 523273e..0afb074 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -226,6 +226,9 @@ struct TShowStatsParams { // Parameters for DESCRIBE HISTORY command struct TDescribeHistoryParams { 1: CatalogObjects.TTableName table_name + 2: optional i64 between_start_time + 3: optional i64 between_end_time + 4: optional i64 from_time } // Parameters for SHOW FUNCTIONS commands diff --git a/docs/topics/impala_iceberg.xml b/docs/topics/impala_iceberg.xml index ba33ae3..3845038 100644 --- 
a/docs/topics/impala_iceberg.xml +++ b/docs/topics/impala_iceberg.xml @@ -460,10 +460,20 @@ SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456; <p> If one needs to check the available snapshots of a table, they can use the <codeph>DESCRIBE HISTORY</codeph> - statement: + statement with the following syntax: <codeblock> -DESCRIBE HISTORY ice_t; +DESCRIBE HISTORY [<varname>db_name</varname>.]<varname>table_name</varname> + [FROM <varname>timestamp</varname>]; + +DESCRIBE HISTORY [<varname>db_name</varname>.]<varname>table_name</varname> + [BETWEEN <varname>timestamp</varname> AND <varname>timestamp</varname>]; </codeblock> + For example: +<codeblock> +DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00'; +DESCRIBE HISTORY ice_t FROM now() - interval 5 days; +DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00'; +</codeblock> </p> <p> diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index bbd1e53..6e16c29 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -2858,6 +2858,16 @@ describe_stmt ::= parser.checkIdentKeyword("HISTORY", history_id); RESULT = new DescribeHistoryStmt(table); :} + | KW_DESCRIBE IDENT:history_id table_name:table KW_FROM expr:e + {: + parser.checkIdentKeyword("HISTORY", history_id); + RESULT = new DescribeHistoryStmt(table, e); + :} + | KW_DESCRIBE IDENT:history_id table_name:table KW_BETWEEN expr:e1 KW_AND expr:e2 + {: + parser.checkIdentKeyword("HISTORY", history_id); + RESULT = new DescribeHistoryStmt(table, e1, e2); + :} ; describe_output_style ::= diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeHistoryStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeHistoryStmt.java index d53fed8..1ada798 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescribeHistoryStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescribeHistoryStmt.java @@ -22,27 +22,76 @@ import java.util.List; import 
org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TDescribeHistoryParams; +import org.apache.impala.util.ExprUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; /** - * Representation of a DESCRIBE HISTORY statement. + * Representation of a DESCRIBE HISTORY statement, returns the available snapshots based + * on the predicate. + * Syntax: DESCRIBE HISTORY <table>; + * DESCRIBE HISTORY <table> BETWEEN <ts1> AND <ts2>; + * DESCRIBE HISTORY <table> FROM <ts>; + * DESCRIBE HISTORY <table> FROM now() - interval 1 days; */ public class DescribeHistoryStmt extends StatementBase { + private static final Logger LOG = LoggerFactory.getLogger(DescribeHistoryStmt.class); + + // Represents the predicate with which this statement was called. + public enum Kind { + FROM, + BETWEEN, + NO_PREDICATE + } + + // Table name, result of parsing. protected final TableName tableName_; + // Expression used during: DESCRIBE HISTORY <table> FROM <from_> + protected Expr from_; + + // Expressions used during: + // DESCRIBE HISTORY <table> BETWEEN <between_start_time_> AND <between_end_time_> + protected Expr betweenStartTime_; + protected Expr betweenEndTime_; + + // Kind of this statement; selects which predicate is analyzed and serialized. + private final Kind kind_; + // Set during analysis. protected FeTable table_; + // from_ expression after analysis in milliseconds. + long fromMillis_; + + // betweenStartTime_ expression after analysis in milliseconds. + long betweenStartTimeMillis_; + + // betweenEndTime_ expression after analysis in milliseconds. 
+ long betweenEndTimeMillis_; + public DescribeHistoryStmt(TableName tableName) { tableName_ = Preconditions.checkNotNull(tableName); + kind_ = Kind.NO_PREDICATE; } - @Override - public String toSql(ToSqlOptions options) { - return "DESCRIBE HISTORY " + tableName_.toString(); + public DescribeHistoryStmt(TableName tableName, Expr from) { + tableName_ = Preconditions.checkNotNull(tableName); + from_ = Preconditions.checkNotNull(from); + kind_ = Kind.FROM; + } + + public DescribeHistoryStmt(TableName tableName, Expr between1, Expr between2) { + tableName_ = Preconditions.checkNotNull(tableName); + betweenStartTime_ = Preconditions.checkNotNull(between1); + betweenEndTime_ = Preconditions.checkNotNull(between2); + kind_ = Kind.BETWEEN; } @Override @@ -58,10 +107,79 @@ public class DescribeHistoryStmt extends StatementBase { throw new AnalysisException(String.format( "DESCRIBE HISTORY must specify an Iceberg table: %s", table_.getFullName())); } + switch (kind_) { + case FROM: + fromMillis_ = analyzeExpr(analyzer, from_); + break; + case BETWEEN: + betweenStartTimeMillis_ = analyzeExpr(analyzer, betweenStartTime_); + betweenEndTimeMillis_ = analyzeExpr(analyzer, betweenEndTime_); + break; + case NO_PREDICATE: + default: + break; + } + } + + /** + * Analyzes the provided expression then verifies if it is possible to obtain a + * timestamp from it. Returns the timestamp in Unix time milliseconds. 
+ */ + private long analyzeExpr(Analyzer analyzer, Expr expr) throws AnalysisException { + try { + expr.analyze(analyzer); + } catch (AnalysisException e) { + throw new AnalysisException("Unsupported expression: '" + expr.toSql() + "'", e); + } + if (expr.getType().isStringType()) { + expr = new CastExpr(Type.TIMESTAMP, expr); + } + if (!expr.getType().isTimestamp()) { + throw new AnalysisException(kind_.toString() + + " <expression> must be a timestamp type but is '" + + expr.getType() + "': " + expr.toSql()); + } + LOG.debug("{} <expression>: {}", kind_, expr); + long micros = 0; + try { + micros = ExprUtil.localTimestampToUnixTimeMicros(analyzer, expr); + } catch (InternalException ie) { + throw new AnalysisException( + "Invalid TIMESTAMP expression: " + ie.getMessage(), ie); + } + return micros / 1000; + } + + @Override + public String toSql(ToSqlOptions options) { + switch (kind_) { + case FROM: + return "DESCRIBE HISTORY " + tableName_.toString() + " FROM " + from_.toSql(); + case BETWEEN: + return "DESCRIBE HISTORY " + tableName_.toString() + " BETWEEN " + + betweenStartTime_.toSql() + " AND " + betweenEndTime_.toSql(); + case NO_PREDICATE: + default: + return "DESCRIBE HISTORY " + tableName_.toString(); + } } public TDescribeHistoryParams toThrift() { - return new TDescribeHistoryParams(new TableName(table_.getDb().getName(), - table_.getName()).toThrift()); + TDescribeHistoryParams describeHistoryParams = new TDescribeHistoryParams(); + TableName tableName = new TableName(table_.getDb().getName(), table_.getName()); + describeHistoryParams.setTable_name(tableName.toThrift()); + switch (kind_) { + case FROM: + describeHistoryParams.setFrom_time(fromMillis_); + break; + case BETWEEN: + describeHistoryParams.setBetween_start_time(betweenStartTimeMillis_); + describeHistoryParams.setBetween_end_time(betweenEndTimeMillis_); + break; + case NO_PREDICATE: + default: + break; + } + return describeHistoryParams; } } diff --git 
a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index ced9574..8a75fd6 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -100,7 +101,6 @@ import org.apache.impala.catalog.FeCatalogUtils; import org.apache.impala.catalog.FeDataSource; import org.apache.impala.catalog.FeDataSourceTable; import org.apache.impala.catalog.FeDb; -import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeHBaseTable; import org.apache.impala.catalog.FeIcebergTable; @@ -110,12 +110,10 @@ import org.apache.impala.catalog.Function; import org.apache.impala.catalog.ImpaladCatalog; import org.apache.impala.catalog.ImpaladTableUsageTracker; import org.apache.impala.catalog.MetaStoreClientPool; -import org.apache.impala.catalog.PrunablePartition; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; -import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; @@ -148,6 +146,7 @@ import org.apache.impala.thrift.TCreateDropRoleParams; import org.apache.impala.thrift.TDataSink; import org.apache.impala.thrift.TDdlExecRequest; import org.apache.impala.thrift.TDdlType; +import org.apache.impala.thrift.TDescribeHistoryParams; import 
org.apache.impala.thrift.TDescribeOutputStyle; import org.apache.impala.thrift.TDescribeResult; import org.apache.impala.thrift.TExecRequest; @@ -186,10 +185,8 @@ import org.apache.impala.thrift.TTruncateParams; import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; import org.apache.impala.thrift.TUpdateCatalogCacheResponse; -import org.apache.impala.thrift.TUpdateExecutorMembershipRequest; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.EventSequence; -import org.apache.impala.util.ExecutorMembershipSnapshot; import org.apache.impala.util.IcebergUtil; import org.apache.impala.util.KuduUtil; import org.apache.impala.util.PatternMatcher; @@ -1224,18 +1221,33 @@ public class Frontend { /** * Handles DESCRIBE HISTORY queries. */ - public TGetTableHistoryResult getTableHistory(String dbName, String tableName) + public TGetTableHistoryResult getTableHistory(TDescribeHistoryParams params) throws DatabaseNotFoundException, TableLoadingException { - FeTable feTable = getCatalog().getTable(dbName, tableName); - Preconditions.checkState(feTable instanceof FeIcebergTable); + FeTable feTable = getCatalog().getTable(params.getTable_name().db_name, + params.getTable_name().table_name); FeIcebergTable feIcebergTable = (FeIcebergTable) feTable; TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(feIcebergTable); Table table = IcebergUtil.loadTable(feIcebergTable); Set<Long> ancestorIds = Sets.newHashSet(IcebergUtil.currentAncestorIds(table)); - - TGetTableHistoryResult result = new TGetTableHistoryResult(); - result.result = Lists.newArrayList(); - for (HistoryEntry historyEntry : metadata.snapshotLog()) { + TGetTableHistoryResult historyResult = new TGetTableHistoryResult(); + + List<HistoryEntry> filteredHistoryEntries = + metadata.snapshotLog().stream().collect(Collectors.toList()); + if (params.isSetFrom_time()) { + // DESCRIBE HISTORY <table> FROM <ts> + filteredHistoryEntries = 
metadata.snapshotLog().stream() + .filter(c -> c.timestampMillis() >= params.from_time) + .collect(Collectors.toList()); + } else if (params.isSetBetween_start_time() && params.isSetBetween_end_time()) { + // DESCRIBE HISTORY <table> BETWEEN <ts> AND <ts> + filteredHistoryEntries = metadata.snapshotLog().stream() + .filter(x -> x.timestampMillis() >= params.between_start_time && + x.timestampMillis() <= params.between_end_time) + .collect(Collectors.toList()); + } + + List<TGetTableHistoryResultItem> result = Lists.newArrayList(); + for (HistoryEntry historyEntry : filteredHistoryEntries) { TGetTableHistoryResultItem resultItem = new TGetTableHistoryResultItem(); long snapshotId = historyEntry.snapshotId(); resultItem.setCreation_time(historyEntry.timestampMillis()); @@ -1245,9 +1257,10 @@ public class Frontend { resultItem.setParent_id(snapshot.parentId()); } resultItem.setIs_current_ancestor(ancestorIds.contains(snapshotId)); - result.result.add(resultItem); + result.add(resultItem); } - return result; + historyResult.setResult(result); + return historyResult; } /** diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 663673e..b805c20 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -336,9 +336,7 @@ public class JniFrontend { Preconditions.checkNotNull(frontend_); TDescribeHistoryParams params = new TDescribeHistoryParams(); JniUtil.deserializeThrift(protocolFactory_, params, thriftParams); - TGetTableHistoryResult result = frontend_.getTableHistory( - params.getTable_name().getDb_name(), params.getTable_name().getTable_name()); - + TGetTableHistoryResult result = frontend_.getTableHistory(params); TSerializer serializer = new TSerializer(protocolFactory_); try { return serializer.serialize(result); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 2206cd8..2a585c3 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -4944,6 +4944,35 @@ public class AnalyzeStmtsTest extends AnalyzerTest { } @Test + public void testIcebergDescribeHistory() throws ImpalaException { + TableName iceT = new TableName("functional_parquet", "iceberg_non_partitioned"); + TableName nonIceT = new TableName("functional", "allcomplextypes"); + + // Analyze without predicate. + TblsAnalyzeOk("DESCRIBE HISTORY $TBL", iceT); + + // Analyze with predicates. + TblsAnalyzeOk("DESCRIBE HISTORY $TBL FROM \"2022-02-14 13:31:09.819\"", iceT); + TblsAnalyzeOk("DESCRIBE HISTORY $TBL FROM " + + "cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + interval 3 days", + iceT); + TblsAnalyzeOk("DESCRIBE HISTORY $TBL FROM now() + interval 3 days", iceT); + TblsAnalyzeOk("DESCRIBE HISTORY $TBL BETWEEN '2021-02-22' AND '2021-02-22'", iceT); + + // Analyze should fail with unsupported expression types. + TblsAnalysisError("DESCRIBE HISTORY $TBL FROM 42 ", iceT, + "FROM <expression> must be a timestamp type"); + TblsAnalysisError("DESCRIBE HISTORY $TBL FROM id", iceT, + "Unsupported expression: 'id'"); + TblsAnalysisError("DESCRIBE HISTORY $TBL FROM '2021-02-32 15:52:45'", iceT, + "Invalid TIMESTAMP expression"); + + // DESCRIBE HISTORY is only supported for Iceberg tables. 
+ TblsAnalysisError("DESCRIBE HISTORY $TBL", nonIceT, + "DESCRIBE HISTORY must specify an Iceberg table:"); + } + + @Test public void testCreatePartitionedIcebergTable() throws ImpalaException { String tblProperties = " TBLPROPERTIES ('iceberg.catalog'='hadoop.tables')"; AnalyzesOk("CREATE TABLE tbl1 (i int, p1 int, p2 timestamp) " + diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 7444ff9..48f19f5 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -124,6 +124,87 @@ class TestIcebergTable(ImpalaTestSuite): # Check "is_current_ancestor" column. assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE") + def test_describe_history_params(self, vector, unique_database): + tbl_name = unique_database + ".describe_history" + time_format = '%Y-%m-%d %H:%M:%S.%f' + + def execute_query_ts(query): + impalad_client.execute(query) + return datetime.datetime.now() + + def expect_results_from(ts, expected_result_size): + query = "DESCRIBE HISTORY {0} FROM {1};".format(tbl_name, cast_ts(ts)) + data = impalad_client.execute(query) + assert len(data.data) == expected_result_size + for row in data.data: + result_ts = row.split('\t')[0][:-3] + result_ts_dt = datetime.datetime.strptime(result_ts, time_format) + assert result_ts_dt >= ts # FROM is inclusive. + + def expect_results_between(ts_start, ts_end, expected_result_size): + query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format( + tbl_name, cast_ts(ts_start), cast_ts(ts_end)) + data = impalad_client.execute(query) + assert len(data.data) == expected_result_size + for row in data.data: + result_ts = row.split('\t')[0][:-3] + result_ts_dt = datetime.datetime.strptime(result_ts, time_format) + assert ts_start <= result_ts_dt <= ts_end # BETWEEN is inclusive. + + def quote(s): + return "'{0}'".format(s) + + def cast_ts(ts): + return "CAST({0} as timestamp)".format(quote(ts)) + + def impala_now(): + now_data = 
impalad_client.execute("select now()") + now_data_ts = now_data.data[0][:-3] + now_data_ts_dt = datetime.datetime.strptime(now_data_ts, time_format) + return now_data_ts_dt + + # We are setting the TIMEZONE query option in this test, so let's create a local + # impala client. + with self.create_impala_client() as impalad_client: + # Iceberg doesn't create a snapshot entry for the initial empty table + impalad_client.execute("create table {0} (i int) stored as iceberg" + .format(tbl_name)) + ts_1 = execute_query_ts("insert into {0} values (1)".format(tbl_name)) + time.sleep(5) + ts_2 = execute_query_ts("insert into {0} values (2)".format(tbl_name)) + time.sleep(5) + ts_3 = execute_query_ts("insert into {0} values (3)".format(tbl_name)) + # Describe history without predicate + data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name)) + assert len(data.data) == 3 + + # Describe history with FROM predicate + expect_results_from(ts_1 - datetime.timedelta(hours=1), 3) + expect_results_from(ts_1, 2) + expect_results_from(ts_3, 0) + + # Describe history with BETWEEN <ts> AND <ts> predicate + expect_results_between(ts_1, ts_2, 1) + expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2) + expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 + + datetime.timedelta(hours=1), 3) + + # Check that timezone is interpreted in local timezone controlled by query option + # TIMEZONE. Persist the local times first and create a new snapshot. + impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") + now_tokyo = impala_now() + impalad_client.execute("SET TIMEZONE='Europe/Budapest'") + now_budapest = impala_now() + execute_query_ts("insert into {0} values (4)".format(tbl_name)) + expect_results_from(now_budapest, 1) + + # Let's switch to Tokyo time. Tokyo time is always greater than Budapest time. + impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") + expect_results_from(now_tokyo, 1) + + # Interpreting Budapest time in Tokyo time points to the past. 
+ expect_results_from(now_budapest, 4) + def test_time_travel(self, vector, unique_database): tbl_name = unique_database + ".time_travel"