This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b98744ae907 [Bug](iceberg)fix read partitioned iceberg without partition path (#25503) b98744ae907 is described below commit b98744ae907bf22cbe496473a81c0bf8d22508fa Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Tue Oct 31 18:09:53 2023 +0800 [Bug](iceberg)fix read partitioned iceberg without partition path (#25503) Iceberg does not require partition values to exist on file paths, so we should get the partition value from `PartitionScanTask.partition`. --- be/src/vec/exec/format/table/iceberg_reader.cpp | 12 +++++++++++- .../org/apache/doris/analysis/SlotDescriptor.java | 3 ++- .../planner/external/iceberg/IcebergScanNode.java | 22 +++++++++++++--------- .../planner/external/iceberg/IcebergSplit.java | 5 +++-- ...est_external_catalog_iceberg_hadoop_catalog.out | 9 +++++++++ ..._external_catalog_iceberg_hadoop_catalog.groovy | 6 ++++++ 6 files changed, 44 insertions(+), 13 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 01333545658..c4bec00f3d6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -562,9 +562,19 @@ void IcebergTableReader::_gen_file_col_names() { auto name = _file_col_names[i]; auto iter = _table_col_to_file_col.find(name); if (iter == _table_col_to_file_col.end()) { - _all_required_col_names.emplace_back(name); + // If the user creates the iceberg table, directly append the parquet file that already exists, + // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty. 
+ // Because we ignore case, the name is converted to lowercase here + auto name_low = to_lower(name); + _all_required_col_names.emplace_back(name_low); if (_has_iceberg_schema) { _not_in_file_col_names.emplace_back(name); + } else { + _table_col_to_file_col.emplace(name, name_low); + _file_col_to_table_col.emplace(name_low, name); + if (name != name_low) { + _has_schema_change = true; + } } } else { _all_required_col_names.emplace_back(iter->second); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index 6384dad8d7b..c5291414b1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -296,7 +296,8 @@ public class SlotDescriptor { public TSlotDescriptor toThrift() { // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1, - byteOffset, 0, getIsNullable() ? 0 : -1, + byteOffset, 0, getIsNullable() ? 0 : -1, + ((column != null) ? 
column.getNonShadowName() : ""), slotIdx, isMaterialized); tSlotDescriptor.setNeedMaterialize(needMaterialize); tSlotDescriptor.setIsAutoIncrement(isAutoInc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 85a68aa785e..c8a9437e243 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -62,8 +62,8 @@ import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; @@ -86,7 +86,6 @@ import java.util.stream.Collectors; public class IcebergScanNode extends FileQueryScanNode { public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; - public static final String DEFAULT_DATA_PATH = "/data/"; private static final String TOTAL_RECORDS = "total-records"; private static final String TOTAL_POSITION_DELETES = "total-position-deletes"; private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes"; @@ -201,8 +200,6 @@ public class IcebergScanNode extends FileQueryScanNode { // Min split size is DEFAULT_SPLIT_SIZE(128MB). 
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); HashSet<String> partitionPathSet = new HashSet<>(); - String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties() - .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); @@ -211,12 +208,18 @@ public class IcebergScanNode extends FileQueryScanNode { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { String dataFilePath = normalizeLocation(splitTask.file().path().toString()); - // Counts the number of partitions read + List<String> partitionValues = new ArrayList<>(); if (isPartitionedTable) { - int last = dataFilePath.lastIndexOf("/"); - if (last > 0) { - partitionPathSet.add(dataFilePath.substring(dataPath.length(), last)); + StructLike structLike = splitTask.file().partition(); + + // set partitionValue for this IcebergSplit + for (int i = 0; i < structLike.size(); i++) { + String partition = String.valueOf(structLike.get(i, Object.class)); + partitionValues.add(partition); } + + // Counts the number of partitions read + partitionPathSet.add(structLike.toString()); } Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties()); @@ -227,7 +230,8 @@ public class IcebergScanNode extends FileQueryScanNode { splitTask.file().fileSizeInBytes(), new String[0], formatVersion, - source.getCatalog().getProperties()); + source.getCatalog().getProperties(), + partitionValues); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index 
29deb293b3d..b58514dcf38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -30,8 +30,9 @@ public class IcebergSplit extends FileSplit { // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts, - Integer formatVersion, Map<String, String> config) { - super(file, start, length, fileLength, hosts, null); + Integer formatVersion, Map<String, String> config, + List<String> partitionList) { + super(file, start, length, fileLength, hosts, partitionList); this.formatVersion = formatVersion; this.config = config; } diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out index fa1a58f6f19..aaa9037977d 100644 --- a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out +++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out @@ -15,3 +15,12 @@ 1 Customer#000000001 j5JsirBM9P MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING 3 Customer#000000003 fkRGN8n ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE 5 Customer#000000005 hwBtxkoBF qSW4KrI CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD + +-- !q04 -- +1 1970-01-03 09:02:03.000001 a +1 1970-01-03 09:02:03.000001 b +2 1970-01-03 09:02:04.000001 c +2 1970-01-03 09:02:04.000001 d + +-- !q05 -- +463870 diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy index b35a799b28d..12ec14992ec 100644 --- 
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy +++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy @@ -36,8 +36,14 @@ suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,exter qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """ qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """ } + + def q02 = { + qt_q04 """ select * from multi_partition2 order by val """ + qt_q05 """ select count(*) from table_with_append_file where MAN_ID is not null """ + } sql """ use `multi_catalog`; """ q01() + q02() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org