This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b98744ae907 [Bug](iceberg)fix read partitioned iceberg without partition path (#25503)
b98744ae907 is described below

commit b98744ae907bf22cbe496473a81c0bf8d22508fa
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Tue Oct 31 18:09:53 2023 +0800

    [Bug](iceberg)fix read partitioned iceberg without partition path (#25503)
    
    Iceberg does not require partition values to exist on file paths, so we should get the partition value from `PartitionScanTask.partition`.
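
For context, the heart of the fix is to take partition values from the planned scan task's partition struct rather than parsing them out of the data file path. A minimal Java sketch of that pattern against the Iceberg API (the class and method names below are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.StructLike;

    class PartitionValueSketch {
        // Iceberg does not guarantee that partition values appear in a data
        // file's path, so read them positionally from the task's partition struct.
        static List<String> partitionValues(FileScanTask task) {
            StructLike partition = task.file().partition();
            List<String> values = new ArrayList<>();
            for (int i = 0; i < partition.size(); i++) {
                values.add(String.valueOf(partition.get(i, Object.class)));
            }
            return values;
        }
    }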
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 12 +++++++++++-
 .../org/apache/doris/analysis/SlotDescriptor.java  |  3 ++-
 .../planner/external/iceberg/IcebergScanNode.java  | 22 +++++++++++++---------
 .../planner/external/iceberg/IcebergSplit.java     |  5 +++--
 ...est_external_catalog_iceberg_hadoop_catalog.out |  9 +++++++++
 ..._external_catalog_iceberg_hadoop_catalog.groovy |  6 ++++++
 6 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 01333545658..c4bec00f3d6 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -562,9 +562,19 @@ void IcebergTableReader::_gen_file_col_names() {
         auto name = _file_col_names[i];
         auto iter = _table_col_to_file_col.find(name);
         if (iter == _table_col_to_file_col.end()) {
-            _all_required_col_names.emplace_back(name);
+            // If the user created the Iceberg table by directly appending existing Parquet files,
+            // the Parquet footer has no 'iceberg.schema' field, so '_table_col_to_file_col' may be empty.
+            // Column matching is case-insensitive, so the name is converted to lowercase here.
+            auto name_low = to_lower(name);
+            _all_required_col_names.emplace_back(name_low);
             if (_has_iceberg_schema) {
                 _not_in_file_col_names.emplace_back(name);
+            } else {
+                _table_col_to_file_col.emplace(name, name_low);
+                _file_col_to_table_col.emplace(name_low, name);
+                if (name != name_low) {
+                    _has_schema_change = true;
+                }
             }
         } else {
             _all_required_col_names.emplace_back(iter->second);
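
The fallback added above amounts to: when the Parquet footer carries no 'iceberg.schema' field, match table columns to file columns by lowercased name and treat any case difference as a schema change. A hedged Java rendering of the same idea (the helper below is illustrative only; the authoritative change is the C++ hunk above):

    import java.util.Locale;
    import java.util.Map;

    class CaseInsensitiveMappingSketch {
        // Pair each table column with its lowercased file-side name and report
        // whether any name differs only by case, which the reader treats as a
        // schema change.
        static boolean buildFallbackMapping(Iterable<String> tableColumns,
                                            Map<String, String> tableToFile,
                                            Map<String, String> fileToTable) {
            boolean schemaChange = false;
            for (String name : tableColumns) {
                String lower = name.toLowerCase(Locale.ROOT);
                tableToFile.put(name, lower);
                fileToTable.put(lower, name);
                if (!name.equals(lower)) {
                    schemaChange = true;
                }
            }
            return schemaChange;
        }
    }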
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 6384dad8d7b..c5291414b1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -296,7 +296,8 @@ public class SlotDescriptor {
     public TSlotDescriptor toThrift() {
         // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
         TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
-                byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx,
+                byteOffset, 0, getIsNullable() ? 0 : -1,
+                ((column != null) ? column.getNonShadowName() : ""), slotIdx,
                 isMaterialized);
         tSlotDescriptor.setNeedMaterialize(needMaterialize);
         tSlotDescriptor.setIsAutoIncrement(isAutoInc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 85a68aa785e..c8a9437e243 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -62,8 +62,8 @@ import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expression;
@@ -86,7 +86,6 @@ import java.util.stream.Collectors;
 public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    public static final String DEFAULT_DATA_PATH = "/data/";
     private static final String TOTAL_RECORDS = "total-records";
    private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
    private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
@@ -201,8 +200,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         // Min split size is DEFAULT_SPLIT_SIZE(128MB).
        long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
         HashSet<String> partitionPathSet = new HashSet<>();
-        String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties()
-                .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
        CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
@@ -211,12 +208,18 @@ public class IcebergScanNode extends FileQueryScanNode {
            combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                String dataFilePath = normalizeLocation(splitTask.file().path().toString());
 
-                // Counts the number of partitions read
+                List<String> partitionValues = new ArrayList<>();
                 if (isPartitionedTable) {
-                    int last = dataFilePath.lastIndexOf("/");
-                    if (last > 0) {
-                        partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
+                    StructLike structLike = splitTask.file().partition();
+
+                    // set partitionValue for this IcebergSplit
+                    for (int i = 0; i < structLike.size(); i++) {
+                        String partition = String.valueOf(structLike.get(i, Object.class));
+                        partitionValues.add(partition);
                     }
+
+                    // Counts the number of partitions read
+                    partitionPathSet.add(structLike.toString());
                 }
 
                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
@@ -227,7 +230,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                         splitTask.file().fileSizeInBytes(),
                         new String[0],
                         formatVersion,
-                        source.getCatalog().getProperties());
+                        source.getCatalog().getProperties(),
+                        partitionValues);
                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                    split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 29deb293b3d..b58514dcf38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -30,8 +30,9 @@ public class IcebergSplit extends FileSplit {
 
    // File path will be changed if the file is modified, so there's no need to get modification time.
    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
-                        Integer formatVersion, Map<String, String> config) {
-        super(file, start, length, fileLength, hosts, null);
+                        Integer formatVersion, Map<String, String> config,
+                        List<String> partitionList) {
+        super(file, start, length, fileLength, hosts, partitionList);
         this.formatVersion = formatVersion;
         this.config = config;
     }
diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
index fa1a58f6f19..aaa9037977d 100644
--- a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
+++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
@@ -15,3 +15,12 @@
 1      Customer#000000001      j5JsirBM9P      MOROCCO  0      MOROCCO AFRICA  25-989-741-2988 BUILDING
 3      Customer#000000003      fkRGN8n ARGENTINA7      ARGENTINA       AMERICA 11-719-748-3364 AUTOMOBILE
 5      Customer#000000005      hwBtxkoBF qSW4KrI       CANADA   5      CANADA  AMERICA 13-750-942-6364 HOUSEHOLD
+
+-- !q04 --
+1      1970-01-03 09:02:03.000001      a
+1      1970-01-03 09:02:03.000001      b
+2      1970-01-03 09:02:04.000001      c
+2      1970-01-03 09:02:04.000001      d
+
+-- !q05 --
+463870
diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
index b35a799b28d..12ec14992ec 100644
--- a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
+++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
@@ -36,8 +36,14 @@ suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,exter
             qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """
             qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """
         }
+        
+        def q02 = {
+            qt_q04 """ select * from multi_partition2 order by val """
+            qt_q05 """ select count(*) from table_with_append_file where MAN_ID is not null """
+        }
 
         sql """ use `multi_catalog`; """
         q01()
+        q02()
     }
 }

