Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-15 Thread via GitHub


Jibing-Li merged PR #44726:
URL: https://github.com/apache/doris/pull/44726


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org



Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-15 Thread via GitHub


github-actions[bot] commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2544520651

   PR approved by at least one committer and no changes requested.





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-15 Thread via GitHub


Jibing-Li commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2544500704

   run buildall





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-15 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1886117447


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -51,9 +108,63 @@ protected synchronized void makeSureInitialized() {
 }
 }
 
+@VisibleForTesting
+public void setTable(Table table) {
+this.table = table;
+}
+
+@VisibleForTesting
+public void setPartitionColumns(List partitionColumns) {
+this.partitionColumns = partitionColumns;
+}
+
+@VisibleForTesting
+public List getSchema() {
+Schema schema = table.schema();
+List columns = schema.columns();
+List tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+for (Types.NestedField field : columns) {
+tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),

Review Comment:
   This code was only used for unit tests and is no longer needed, so I removed it.






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-15 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1886117113


##
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##
@@ -176,6 +177,11 @@ public void run() throws JobException {
 this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
 beforeMTMVRefresh();
 if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
+MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+if (!relatedTable.isValidRelatedTable()) {
+throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
++ " is not a valid related table anymore, stop refreshing.");

Review Comment:
   Added






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-13 Thread via GitHub


morrySnow commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1879883501


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -51,9 +108,63 @@ protected synchronized void makeSureInitialized() {
 }
 }
 
+@VisibleForTesting
+public void setTable(Table table) {
+this.table = table;
+}
+
+@VisibleForTesting
+public void setPartitionColumns(List partitionColumns) {
+this.partitionColumns = partitionColumns;
+}
+
+@VisibleForTesting
+public List getSchema() {
+Schema schema = table.schema();
+List columns = schema.columns();
+List tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+for (Types.NestedField field : columns) {
+tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),

Review Comment:
   Why convert the name to lowercase?



##
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##
@@ -176,6 +177,11 @@ public void run() throws JobException {
 this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
 beforeMTMVRefresh();
 if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
+MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+if (!relatedTable.isValidRelatedTable()) {
+throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
++ " is not a valid related table anymore, stop refreshing.");

Review Comment:
   Could we also print the reason the table is invalid here?






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-10 Thread via GitHub


Jibing-Li commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2531684212

   run p0





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-10 Thread via GitHub


Jibing-Li commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2530794503

   run buildall





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-10 Thread via GitHub


github-actions[bot] commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2530806461

   PR approved by anyone and no changes requested.





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-09 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877456441


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -90,4 +199,318 @@ public long fetchRowCount() {
 public Table getIcebergTable() {
 return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
 }
+
+@Override
+public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+Env.getCurrentEnv().getRefreshManager()
+.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+}
+
+@Override
+public Map getAndCopyPartitionItems(Optional snapshot) {
+return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+}
+
+private IcebergPartitionInfo getPartitionInfoFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+if (!schemaCacheValue.isPresent()) {
+return new IcebergPartitionInfo();
+}
+return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+}
+
+@Override
+public PartitionType getPartitionType(Optional snapshot) {
+makeSureInitialized();
+return isSupportedRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+}
+
+@Override
+public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+}
+
+@Override
+public List getPartitionColumns(Optional snapshot) {
+return getPartitionColumnsFromCache();
+}
+
+private List getPartitionColumnsFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+return schemaCacheValue
+.map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+.orElseGet(Lists::newArrayList);
+}
+
+@Override
+public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+   Optional snapshot) throws AnalysisException {
+long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
+if (latestSnapshotId <= 0) {
+throw new AnalysisException("can not find partition: " + partitionName);
+}
+return new MTMVVersionSnapshot(latestSnapshotId);
+}
+
+@Override
+public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot)
+throws AnalysisException {
+return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+}
+
+public long getLatestSnapshotIdFromCache() throws AnalysisException {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+if (!schemaCacheValue.isPresent()) {
+throw new AnalysisException("Can't find schema cache of table " + name);
+}
+return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+}
+
+@Override
+public boolean isPartitionColumnAllowNull() {
+if (partitionColumns == null || partitionColumns.size() != 1) {
+return false;
+}
+return partitionColumns.get(0).isAllowNull();
+}
+
+@Override
+public boolean isSupportedRelatedTable() {
+Set allFields = Sets.newHashSet();

Review Comment:
   I will add a cache for it.






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-09 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877446230


##
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java:
##
@@ -115,4 +115,11 @@ default boolean needAutoRefresh() {
  * @return
  */
 boolean isPartitionColumnAllowNull();
+
+/**
+ * If the table is supported as related table.
+ * For example, an Iceberg table may become unsupported after partition revolution.
+ * @return
+ */
+boolean isSupportedRelatedTable();

Review Comment:
   Renamed to isValidRelatedTable. This function checks whether a table's partitioning is still supported. For example, if the partition column changes from col1 to col2, this function returns false.
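The check described above can be sketched as follows. This is a minimal illustration and not the PR's implementation: the class `RelatedTableCheck` and the modelling of each partition spec as a list of source column names are hypothetical, and whether the real method inspects every historical spec is an assumption.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: each inner list holds the source column names of one partition spec.
final class RelatedTableCheck {

    // Valid only if every spec partitions on exactly one column and it is the same column in all specs.
    static boolean isValidRelatedTable(List<List<String>> specs) {
        Set<String> allColumns = new HashSet<>();
        for (List<String> spec : specs) {
            if (spec.size() != 1) {
                return false; // multi-column or empty specs are treated as unsupported here
            }
            allColumns.add(spec.get(0));
        }
        // An empty spec list leaves the set empty, which is also treated as invalid.
        return allColumns.size() == 1;
    }

    public static void main(String[] args) {
        List<List<String>> stable = Arrays.asList(Arrays.asList("col1"), Arrays.asList("col1"));
        List<List<String>> evolved = Arrays.asList(Arrays.asList("col1"), Arrays.asList("col2"));
        System.out.println(isValidRelatedTable(stable));  // true
        System.out.println(isValidRelatedTable(evolved)); // false
    }
}
```

In this sketch a change of partition column from col1 to col2 leaves two distinct columns in the set, so the check returns false, which matches the behaviour described in the comment.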






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-09 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877429504


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java:
##
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.PartitionItem;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergPartitionInfo {
+private final Map<String, PartitionItem> nameToPartitionItem;
+private final Map<String, IcebergPartition> nameToIcebergPartition;
+private final Map<String, Set<String>> nameToIcebergPartitionNames;
+
+public IcebergPartitionInfo() {
+this.nameToPartitionItem = Maps.newHashMap();
+this.nameToIcebergPartition = Maps.newHashMap();
+this.nameToIcebergPartitionNames = Maps.newHashMap();
+}
+
+public IcebergPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
+   Map<String, IcebergPartition> nameToIcebergPartition,
+Map<String, Set<String>> nameToIcebergPartitionNames) {
+this.nameToPartitionItem = nameToPartitionItem;
+this.nameToIcebergPartition = nameToIcebergPartition;
+this.nameToIcebergPartitionNames = nameToIcebergPartitionNames;
+}
+
+public Map<String, PartitionItem> getNameToPartitionItem() {
+return nameToPartitionItem;
+}
+
+public Map<String, IcebergPartition> getNameToIcebergPartition() {
+return nameToIcebergPartition;
+}
+
+public long getLatestSnapshotId(String partitionName) {
+Set<String> icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName);
+if (icebergPartitionNames == null) {

Review Comment:
   Here is an example.
   Suppose an Iceberg table has 3 partitions with the following name-to-range mapping:
   p1 -> [0, 10)
   p2 -> [10, 30)
   p3 -> [20, 30)
   
   In Doris, p2 and p3 should be merged into one partition, p2 -> [10, 30). So Doris has 2 partitions:
   p1 -> [0, 10)
   p2 -> [10, 30)
   
   In this case, we need a map, nameToIcebergPartitionNames, to record that p2 in Doris actually maps to p2 and p3 in Iceberg:
   nameToIcebergPartitionNames : p2 -> {p2, p3}
   The partition p1 in Doris maps to p1 in Iceberg (a 1-to-1 mapping), so we don't keep any entry in nameToIcebergPartitionNames for p1.
   So when we try to get the snapshot id for p1, nameToIcebergPartitionNames.get("p1") returns null. This means p1 in Doris maps only to p1 in Iceberg.
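The lookup described in this example can be sketched roughly as below. This is an illustration under stated assumptions, not the code from the PR: `PartitionMeta`, `MergedPartitionLookup`, the `lastUpdateTime` field, and the rule of picking the snapshot of the most recently updated Iceberg partition are all hypothetical, as is the -1 return value for an unknown partition.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for per-partition Iceberg metadata.
final class PartitionMeta {
    final long lastSnapshotId;
    final long lastUpdateTime;

    PartitionMeta(long lastSnapshotId, long lastUpdateTime) {
        this.lastSnapshotId = lastSnapshotId;
        this.lastUpdateTime = lastUpdateTime;
    }
}

final class MergedPartitionLookup {
    private final Map<String, PartitionMeta> nameToIcebergPartition = new HashMap<>();
    // Only merged Doris partitions get an entry here; 1-to-1 partitions are absent.
    private final Map<String, Set<String>> nameToIcebergPartitionNames = new HashMap<>();

    void addIcebergPartition(String name, long snapshotId, long updateTime) {
        nameToIcebergPartition.put(name, new PartitionMeta(snapshotId, updateTime));
    }

    void addMerged(String dorisName, String... icebergNames) {
        nameToIcebergPartitionNames.put(dorisName, new HashSet<>(Arrays.asList(icebergNames)));
    }

    // Returns -1 when the partition is unknown (an assumption made for this sketch).
    long getLatestSnapshotId(String partitionName) {
        Set<String> merged = nameToIcebergPartitionNames.get(partitionName);
        if (merged == null) {
            // 1-to-1 case: the Doris partition maps to the Iceberg partition of the same name.
            PartitionMeta single = nameToIcebergPartition.get(partitionName);
            return single == null ? -1L : single.lastSnapshotId;
        }
        long latestSnapshotId = -1L;
        long latestUpdateTime = Long.MIN_VALUE;
        for (String icebergName : merged) {
            PartitionMeta meta = nameToIcebergPartition.get(icebergName);
            if (meta != null && meta.lastUpdateTime > latestUpdateTime) {
                latestUpdateTime = meta.lastUpdateTime;
                latestSnapshotId = meta.lastSnapshotId;
            }
        }
        return latestSnapshotId;
    }

    public static void main(String[] args) {
        MergedPartitionLookup info = new MergedPartitionLookup();
        info.addIcebergPartition("p1", 101L, 1000L);
        info.addIcebergPartition("p2", 102L, 2000L);
        info.addIcebergPartition("p3", 103L, 3000L);
        info.addMerged("p2", "p2", "p3");
        System.out.println(info.getLatestSnapshotId("p1")); // 101
        System.out.println(info.getLatestSnapshotId("p2")); // 103
    }
}
```

Keeping entries only for merged partitions keeps the map small, at the cost of the null check discussed in this thread.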






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-09 Thread via GitHub


zddr commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1875797728


##
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java:
##
@@ -115,4 +115,11 @@ default boolean needAutoRefresh() {
  * @return
  */
 boolean isPartitionColumnAllowNull();
+
+/**
+ * If the table is supported as related table.
+ * For example, an Iceberg table may become unsupported after partition revolution.
+ * @return
+ */
+boolean isSupportedRelatedTable();

Review Comment:
   Is this method needed? If some scenarios are not supported, simply return UNPARTITIONED from the getPartitionType method.



##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java:
##
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.PartitionItem;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergPartitionInfo {
+private final Map<String, PartitionItem> nameToPartitionItem;
+private final Map<String, IcebergPartition> nameToIcebergPartition;
+private final Map<String, Set<String>> nameToIcebergPartitionNames;
+
+public IcebergPartitionInfo() {
+this.nameToPartitionItem = Maps.newHashMap();
+this.nameToIcebergPartition = Maps.newHashMap();
+this.nameToIcebergPartitionNames = Maps.newHashMap();
+}
+
+public IcebergPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
+   Map<String, IcebergPartition> nameToIcebergPartition,
+Map<String, Set<String>> nameToIcebergPartitionNames) {
+this.nameToPartitionItem = nameToPartitionItem;
+this.nameToIcebergPartition = nameToIcebergPartition;
+this.nameToIcebergPartitionNames = nameToIcebergPartitionNames;
+}
+
+public Map<String, PartitionItem> getNameToPartitionItem() {
+return nameToPartitionItem;
+}
+
+public Map<String, IcebergPartition> getNameToIcebergPartition() {
+return nameToIcebergPartition;
+}
+
+public long getLatestSnapshotId(String partitionName) {
+Set<String> icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName);
+if (icebergPartitionNames == null) {

Review Comment:
   In what situation would this be null?



##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -90,4 +199,318 @@ public long fetchRowCount() {
 public Table getIcebergTable() {
 return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
 }
+
+@Override
+public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+Env.getCurrentEnv().getRefreshManager()
+.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+}
+
+@Override
+public Map getAndCopyPartitionItems(Optional snapshot) {
+return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+}
+
+private IcebergPartitionInfo getPartitionInfoFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+if (!schemaCacheValue.isPresent()) {
+return new IcebergPartitionInfo();
+}
+return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+}
+
+@Override
+public PartitionType getPartitionType(Optional snapshot) {
+makeSureInitialized();
+return isSupportedRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+}
+
+@Override
+public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+}
+
+@Override
+public List getPartitionColumns(Optional snapshot) {
+return getPartitionColumnsFromCache();
+}
+
+private List getPartitionColumnsFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+return schemaCacheValue
+.map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+.orElseGet(Lists::newArrayList);
+}
+
+@O

Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-05 Thread via GitHub


Jibing-Li commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2522236034

   run buildall





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-12-05 Thread via GitHub


Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1870952167


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -90,4 +188,200 @@ public long fetchRowCount() {
 public Table getIcebergTable() {
 return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
 }
+
+@Override
+public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+Env.getCurrentEnv().getRefreshManager()
+.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+}
+
+@Override
+public Map getAndCopyPartitionItems(Optional snapshot) {
+return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+}
+
+private IcebergPartitionInfo getPartitionInfoFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+if (!schemaCacheValue.isPresent()) {
+return new IcebergPartitionInfo();
+}
+return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+}
+
+@Override
+public PartitionType getPartitionType(Optional snapshot) {
+return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+}
+
+@Override
+public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+}
+
+@Override
+public List getPartitionColumns(Optional snapshot) {
+return getPartitionColumnsFromCache();
+}
+
+private List getPartitionColumnsFromCache() {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+return schemaCacheValue
+.map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+.orElseGet(Lists::newArrayList);
+}
+
+@Override
+public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+   Optional snapshot) throws AnalysisException {
+IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+if (icebergPartition == null) {
+throw new AnalysisException("can not find partition: " + partitionName);
+}
+return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+}
+
+@Override
+public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot)
+throws AnalysisException {
+return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+}
+
+public long getLatestSnapshotIdFromCache() throws AnalysisException {
+makeSureInitialized();
+Optional schemaCacheValue = getSchemaCacheValue();
+if (!schemaCacheValue.isPresent()) {
+throw new AnalysisException("not present");
+}
+return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+}
+
+@Override
+public boolean isPartitionColumnAllowNull() {
+return true;
+}
+
+public boolean isSupportedPartitionTable() {
+// TODO: Support IDENTITY transform.
+PartitionSpec spec = table.spec();
+if (spec == null) {
+return false;
+}
+if (spec.fields().size() != 1) {
+return false;
+}
+String transformName = spec.fields().get(0).transform().toString();

Review Comment:
   The class Year is protected, so we can't use instanceof Year in our code.
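Matching on the transform's string name, as the quoted code does, can be sketched like this. It is only an illustration: the class `TransformNames` and the set of accepted names (year, month, day, hour) are assumptions. The thread itself only shows a DAY-partitioned test table and a TODO for the IDENTITY transform.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: decide support from the transform's string form instead of its class.
final class TransformNames {
    // Assumed set of time-based transform names; not confirmed by the thread.
    private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("year", "month", "day", "hour"));

    static boolean isSupportedTransform(String transformName) {
        return transformName != null
                && SUPPORTED.contains(transformName.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isSupportedTransform("day"));        // true
        System.out.println(isSupportedTransform("identity"));   // false
        System.out.println(isSupportedTransform("bucket[16]")); // false
    }
}
```

String matching avoids depending on classes that are not visible outside their package, but it relies on the names staying stable across library versions.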






Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-11-28 Thread via GitHub


zddr commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1862888274


##
regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy:
##
@@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", 
"p0,external,iceberg,external_docker,external_docker_
 sql """drop materialized view if exists ${mvName};"""
 sql """ drop catalog if exists ${catalog_name} """
 }
+
+// Test partition refresh.
+// Use hms catalog to avoid rest catalog fail to write caused by sqlite 
database file locked.
+if (enabled != null && enabled.equalsIgnoreCase("true")) {
+String hivePrefix = "hive2";
+String hms_port = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
+String hdfs_port = context.config.otherConfigs.get(hivePrefix + 
"HdfsPort")
+String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+String warehouse = "${default_fs}/warehouse"
+
+String catalog_name = "iceberg_mtmv_catalog_hms";
+String mvName1 = "test_iceberg_mtmv_ts"
+String mvName2 = "test_iceberg_mtmv_d"
+String dbName = "regression_test_mtmv_partition_p0"
+String icebergDb = "iceberg_mtmv_partition"
+String icebergTable1 = "tstable"
+String icebergTable2 = "dtable"
+sql """drop catalog if exists ${catalog_name} """
+sql """create catalog if not exists ${catalog_name} properties (
+'type'='iceberg',
+'iceberg.catalog.type'='hms',
+'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+'fs.defaultFS' = '${default_fs}',
+'warehouse' = '${warehouse}',
+'use_meta_cache' = 'true'
+);"""
+
+sql """switch internal"""
+sql """drop database if exists ${dbName}"""
+sql """create database if not exists ${dbName}"""
+sql """use ${dbName}"""
+
+sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable1}"""
+sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable2}"""
+sql """create database if not exists ${catalog_name}.${icebergDb}"""
+sql """
+CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} (
+  ts DATETIME,
+  value INT)
+ENGINE=iceberg
+PARTITION BY LIST (DAY(ts)) ();
+"""
+sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)"""
+sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}"""
+sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
+waitingMTMVTaskFinishedByMvName(mvName1)
+qt_test_ts_refresh1 "select * from ${mvName1} order by value"
+
+sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)"""
+sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
+waitingMTMVTaskFinishedByMvName(mvName1)
+qt_test_ts_refresh2 """select * from ${mvName1} order by value"""
+
+sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);"""
+sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_2024102600_2024102700);"""
+waitingMTMVTaskFinishedByMvName(mvName1)
+qt_test_ts_refresh3 """select * from ${mvName1} order by value"""
+
+sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""

Review Comment:
   Please also test a refresh with `auto` specified; otherwise the snapshot
information will not be calculated.
   




Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-11-28 Thread via GitHub


morrySnow commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1862349495


##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
 }
 }
 
+/ begin code for unit test **/

Review Comment:
   You could use the `@VisibleForTesting` annotation here.
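
A minimal sketch of that suggestion. In the real code the annotation would presumably be Guava's `com.google.common.annotations.VisibleForTesting`; to stay self-contained, this block declares a local stand-in with the same name. `TableHolder` and its `String` field are hypothetical and are not the PR's `IcebergExternalTable`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for Guava's @VisibleForTesting, declared only so this sketch
// compiles without extra dependencies. RUNTIME retention is a choice of this
// sketch so that the marker can be inspected reflectively.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
@interface VisibleForTesting {
}

// Hypothetical class; a String stands in for the Iceberg table handle.
class TableHolder {
    private String table;

    // The annotation documents that this setter exists for tests only,
    // replacing "begin/end code for unit test" marker comments.
    @VisibleForTesting
    void setTable(String table) {
        this.table = table;
    }

    String getTable() {
        return table;
    }
}
```

The annotation does not restrict access by itself; it only records intent, so the setter can stay package-private instead of `public`.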



##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
 }
 }
 
+/ begin code for unit test **/
+public void setTable(Table table) {
+this.table = table;
+}
+
+public List<Column> getSchema() {
+Schema schema = table.schema();
+List<Types.NestedField> columns = schema.columns();
+List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+for (Types.NestedField field : columns) {
+tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+schema.caseInsensitiveFindField(field.name()).fieldId()));
+}
+return tmpSchema;
+}
+/ end code for unit test **/
+
 @Override
 public Optional<SchemaCacheValue> initSchema() {
-return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
+table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
+// List<Column> schema = getSchema();
+Snapshot snapshot = table.currentSnapshot();
+if (snapshot == null) {
+LOG.info("Table {} is empty", name);

Review Comment:
   Is a debug-level log enough here?



##
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
 }
 }
 
+/ begin code for unit test **/
+public void setTable(Table table) {
+this.table = table;
+}
+
+public List<Column> getSchema() {
+Schema schema = table.schema();
+List<Types.NestedField> columns = schema.columns();
+List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+for (Types.NestedField field : columns) {
+tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+schema.caseInsensitiveFindField(field.name()).fieldId()));
+}
+return tmpSchema;
+}
+/ end code for unit test **/
+
 @Override
 public Optional<SchemaCacheValue> initSchema() {
-return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
+table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
+// List<Column> schema = getSchema();
+Snapshot snapshot = table.currentSnapshot();
+if (snapshot == null) {
+LOG.info("Table {} is empty", name);
+return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null));
+}
+long snapshotId = snapshot.snapshotId();
+PartitionSpec spec = table.spec();
+List<Column> partitionColumns = null;
+IcebergPartitionInfo partitionInfo = null;
+if (isSupportedPartitionTable()) {
+partitionColumns = Lists.newArrayList();
+PartitionField field = spec.fields().get(0);

Review Comment:
   Add a comment explaining why `get(0)` is used.
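
A sketch of the kind of comment meant here. It assumes the support check only accepts specs with exactly one partition field, which the quoted hunk does not show. `FieldSketch` and `PartitionSketch` are hypothetical stand-ins, not Iceberg or Doris classes.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a partition field; only the name matters here.
final class FieldSketch {
    final String name;

    FieldSketch(String name) {
        this.name = name;
    }
}

final class PartitionSketch {
    // Assumed guard: a table is supported only when its spec has exactly one
    // partition field. The real check in the PR may test more conditions.
    static boolean isSupported(List<FieldSketch> fields) {
        return fields.size() == 1;
    }

    static Optional<String> partitionColumnName(List<FieldSketch> fields) {
        if (!isSupported(fields)) {
            return Optional.empty();
        }
        // get(0) is safe: isSupported() guarantees exactly one partition
        // field, so the first element is the only one.
        return Optional.of(fields.get(0).name);
    }
}
```

Stating the invariant next to the index access lets a reader verify it without jumping to the guard.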




Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-11-28 Thread via GitHub


Jibing-Li commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2505625085

   run buildall





Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]

2024-11-27 Thread via GitHub


doris-robot commented on PR #44726:
URL: https://github.com/apache/doris/pull/44726#issuecomment-2505394341

   
   Thank you for your contribution to Apache Doris.
   Don't know what should be done next? See [How to process your 
PR](https://cwiki.apache.org/confluence/display/DORIS/How+to+process+your+PR).
   
   Please clearly describe your PR:
   1. What problem was fixed (it's best to include specific error reporting 
information). How it was fixed.
   2. Which behaviors were modified. What was the previous behavior, what is it 
now, why was it modified, and what possible impacts might there be.
   3. What features were added. Why was this function added?
   4. Which code was refactored and why was this part of the code refactored?
   5. Which functions were optimized and what is the difference before and 
after the optimization?
   

