Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li merged PR #44726: URL: https://github.com/apache/doris/pull/44726 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
github-actions[bot] commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2544520651 PR approved by at least one committer and no changes requested.
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2544500704 run buildall
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1886117447

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ##
@@ -51,9 +108,63 @@ protected synchronized void makeSureInitialized() {
         }
     }
+    @VisibleForTesting
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    @VisibleForTesting
+    public void setPartitionColumns(List partitionColumns) {
+        this.partitionColumns = partitionColumns;
+    }
+
+    @VisibleForTesting
+    public List getSchema() {
+        Schema schema = table.schema();
+        List columns = schema.columns();
+        List tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),

Review Comment:
This code was only used by the unit tests and is no longer needed, so I removed it.
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1886117113

## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ##
@@ -176,6 +177,11 @@ public void run() throws JobException {
             this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
             beforeMTMVRefresh();
             if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
+                MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+                if (!relatedTable.isValidRelatedTable()) {
+                    throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
+                            + " is not a valid related table anymore, stop refreshing.");

Review Comment:
Added.
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
morrySnow commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1879883501

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ##
@@ -51,9 +108,63 @@ protected synchronized void makeSureInitialized() {
         }
     }
+    @VisibleForTesting
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    @VisibleForTesting
+    public void setPartitionColumns(List partitionColumns) {
+        this.partitionColumns = partitionColumns;
+    }
+
+    @VisibleForTesting
+    public List getSchema() {
+        Schema schema = table.schema();
+        List columns = schema.columns();
+        List tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),

Review Comment:
Why convert the field name to lowercase?

## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ##
@@ -176,6 +177,11 @@ public void run() throws JobException {
             this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
             beforeMTMVRefresh();
             if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
+                MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
+                if (!relatedTable.isValidRelatedTable()) {
+                    throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName()
+                            + " is not a valid related table anymore, stop refreshing.");

Review Comment:
Could we also print the reason the table is invalid here?
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2531684212 run p0
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2530794503 run buildall
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
github-actions[bot] commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2530806461 PR approved by anyone and no changes requested.
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877456441

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ##
@@ -90,4 +199,318 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+                .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map getAndCopyPartitionItems(Optional snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional snapshot) {
+        makeSureInitialized();
+        return isSupportedRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List getPartitionColumns(Optional snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional snapshot) throws AnalysisException {
+        long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("Can't find schema cache of table " + name);
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        if (partitionColumns == null || partitionColumns.size() != 1) {
+            return false;
+        }
+        return partitionColumns.get(0).isAllowNull();
+    }
+
+    @Override
+    public boolean isSupportedRelatedTable() {
+        Set allFields = Sets.newHashSet();

Review Comment:
I will add a cache for it.
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877446230

## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java: ##
@@ -115,4 +115,11 @@ default boolean needAutoRefresh() {
      * @return
      */
     boolean isPartitionColumnAllowNull();
+
+    /**
+     * If the table is supported as related table.
+     * For example, an Iceberg table may become unsupported after partition revolution.
+     * @return
+     */
+    boolean isSupportedRelatedTable();

Review Comment:
Renamed to isValidRelatedTable. This function checks whether a table's partitioning is still supported. For example, if the partition column changes from col1 to col2, this function returns false.
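
The check described in this comment might look roughly like the sketch below. It is only an illustration under stated assumptions, not the code from the PR: the class name RelatedTableValidity and the representation of a table's partition history as a list of source-column lists are hypothetical, and the rule that every spec must have exactly one field is borrowed from the earlier isSupportedPartitionTable revision quoted elsewhere in this thread.

```java
import java.util.List;

// Hypothetical illustration; none of these names come from the Doris code base.
class RelatedTableValidity {
    /**
     * Each inner list holds the source column names of one partition spec
     * in the table's history (assumed representation).
     * Returns true only if every spec has exactly one field and all specs
     * partition on the same column.
     */
    static boolean isValidRelatedTable(List<List<String>> partitionSpecs) {
        String partitionColumn = null;
        for (List<String> spec : partitionSpecs) {
            if (spec.size() != 1) {
                return false; // multi-column or empty spec: treated as unsupported here
            }
            String column = spec.get(0);
            if (partitionColumn == null) {
                partitionColumn = column;
            } else if (!partitionColumn.equals(column)) {
                return false; // partition column changed, e.g. col1 -> col2
            }
        }
        return partitionColumn != null; // no spec at all means nothing to follow
    }
}
```

With this shape, a history of [[col1], [col2]] is rejected, which matches the col1-to-col2 example in the comment.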
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1877429504

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java: ##
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.PartitionItem;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergPartitionInfo {
+    private final Map nameToPartitionItem;
+    private final Map nameToIcebergPartition;
+    private final Map> nameToIcebergPartitionNames;
+
+    public IcebergPartitionInfo() {
+        this.nameToPartitionItem = Maps.newHashMap();
+        this.nameToIcebergPartition = Maps.newHashMap();
+        this.nameToIcebergPartitionNames = Maps.newHashMap();
+    }
+
+    public IcebergPartitionInfo(Map nameToPartitionItem,
+            Map nameToIcebergPartition,
+            Map> nameToIcebergPartitionNames) {
+        this.nameToPartitionItem = nameToPartitionItem;
+        this.nameToIcebergPartition = nameToIcebergPartition;
+        this.nameToIcebergPartitionNames = nameToIcebergPartitionNames;
+    }
+
+    public Map getNameToPartitionItem() {
+        return nameToPartitionItem;
+    }
+
+    public Map getNameToIcebergPartition() {
+        return nameToIcebergPartition;
+    }
+
+    public long getLatestSnapshotId(String partitionName) {
+        Set icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName);
+        if (icebergPartitionNames == null) {

Review Comment:
Here is an example. The Iceberg table has 3 partitions, with the following name-to-range mapping:

p1 -> [0, 10)
p2 -> [10, 30)
p3 -> [20, 30)

In Doris, p2 and p3 should be merged into one partition, p2 -> [10, 30). So Doris has 2 partitions:

p1 -> [0, 10)
p2 -> [10, 30)

In this case, we need a map, nameToIcebergPartitionNames, to record that p2 in Doris is actually mapped to p2 and p3 in Iceberg:

nameToIcebergPartitionNames: p2 -> {p2, p3}

Partition p1 in Doris is mapped to p1 in Iceberg (a 1-to-1 mapping), so we don't keep any entry for p1 in nameToIcebergPartitionNames. When we try to get the snapshot id for p1, nameToIcebergPartitionNames.get("p1") returns null, which means p1 in Doris is mapped only to p1 in Iceberg.
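
The lookup in this example can be sketched as a small stand-alone class. This is a minimal illustration, not the PR's IcebergPartitionInfo: the class name MergedPartitionSnapshots, the snapshot ids, and the rule "take the largest snapshot id among the merged Iceberg partitions" are all assumptions made for the sketch (it simply treats a larger id as newer).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the lookup discussed above; not the PR's class.
class MergedPartitionSnapshots {
    // Iceberg partition name -> last snapshot id (the ids used below are made up).
    private final Map<String, Long> icebergPartitionSnapshotIds;
    // Doris partition name -> Iceberg partitions merged into it.
    // 1-to-1 mappings deliberately have no entry here.
    private final Map<String, Set<String>> nameToIcebergPartitionNames;

    MergedPartitionSnapshots(Map<String, Long> icebergPartitionSnapshotIds,
            Map<String, Set<String>> nameToIcebergPartitionNames) {
        this.icebergPartitionSnapshotIds = icebergPartitionSnapshotIds;
        this.nameToIcebergPartitionNames = nameToIcebergPartitionNames;
    }

    /** Returns the snapshot id for a Doris partition, or -1 if it is unknown. */
    long getLatestSnapshotId(String partitionName) {
        Set<String> merged = nameToIcebergPartitionNames.get(partitionName);
        if (merged == null) {
            // null means a 1-to-1 mapping: look the partition up directly.
            Long id = icebergPartitionSnapshotIds.get(partitionName);
            return id == null ? -1L : id;
        }
        // Merged partition: assumed rule is to take the largest id of its members.
        long latest = -1L;
        for (String icebergName : merged) {
            Long id = icebergPartitionSnapshotIds.get(icebergName);
            if (id != null && id > latest) {
                latest = id;
            }
        }
        return latest;
    }

    /** Builds the p1/p2/p3 example from the comment with made-up snapshot ids. */
    static MergedPartitionSnapshots example() {
        Map<String, Long> ids = new HashMap<>();
        ids.put("p1", 100L);
        ids.put("p2", 200L);
        ids.put("p3", 300L);
        Map<String, Set<String>> merged = new HashMap<>();
        merged.put("p2", new HashSet<>(Arrays.asList("p2", "p3")));
        return new MergedPartitionSnapshots(ids, merged);
    }
}
```

Returning a non-positive value for an unknown partition lines up with the `latestSnapshotId <= 0` check in getPartitionSnapshot quoted elsewhere in this thread, where the caller turns it into an AnalysisException.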
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
zddr commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1875797728

## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java: ##
@@ -115,4 +115,11 @@ default boolean needAutoRefresh() {
      * @return
      */
     boolean isPartitionColumnAllowNull();
+
+    /**
+     * If the table is supported as related table.
+     * For example, an Iceberg table may become unsupported after partition revolution.
+     * @return
+     */
+    boolean isSupportedRelatedTable();

Review Comment:
Is this method needed? If some scenarios are not supported, simply return UNPARTITIONED from the getPartitionType method.

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java: ##
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.PartitionItem;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.Set;
+
+public class IcebergPartitionInfo {
+    private final Map nameToPartitionItem;
+    private final Map nameToIcebergPartition;
+    private final Map> nameToIcebergPartitionNames;
+
+    public IcebergPartitionInfo() {
+        this.nameToPartitionItem = Maps.newHashMap();
+        this.nameToIcebergPartition = Maps.newHashMap();
+        this.nameToIcebergPartitionNames = Maps.newHashMap();
+    }
+
+    public IcebergPartitionInfo(Map nameToPartitionItem,
+            Map nameToIcebergPartition,
+            Map> nameToIcebergPartitionNames) {
+        this.nameToPartitionItem = nameToPartitionItem;
+        this.nameToIcebergPartition = nameToIcebergPartition;
+        this.nameToIcebergPartitionNames = nameToIcebergPartitionNames;
+    }
+
+    public Map getNameToPartitionItem() {
+        return nameToPartitionItem;
+    }
+
+    public Map getNameToIcebergPartition() {
+        return nameToIcebergPartition;
+    }
+
+    public long getLatestSnapshotId(String partitionName) {
+        Set icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName);
+        if (icebergPartitionNames == null) {

Review Comment:
In what situation would this be null?

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ##
@@ -90,4 +199,318 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+                .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map getAndCopyPartitionItems(Optional snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional snapshot) {
+        makeSureInitialized();
+        return isSupportedRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List getPartitionColumns(Optional snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+@O
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2522236034 run buildall
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1870952167

## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ##
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+                .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map getAndCopyPartitionItems(Optional snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set getPartitionColumnNames(Optional snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List getPartitionColumns(Optional snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    public boolean isSupportedPartitionTable() {
+        // TODO: Support IDENTITY transform.
+        PartitionSpec spec = table.spec();
+        if (spec == null) {
+            return false;
+        }
+        if (spec.fields().size() != 1) {
+            return false;
+        }
+        String transformName = spec.fields().get(0).transform().toString();

Review Comment:
The class Year is protected, so we can't call instanceof Year in our code.
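
Because the transform classes cannot be referenced with instanceof, the check has to work on the string returned by transform().toString(). A minimal sketch of that idea follows. It deliberately takes a plain String so it has no Iceberg dependency, and both the class name SupportedTransforms and the set of accepted names ("year", "month", "day", "hour") are assumptions for illustration rather than values confirmed by the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper; the accepted names below are assumed, not taken from the PR.
class SupportedTransforms {
    private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("year", "month", "day", "hour"));

    /**
     * Decides from the transform's string form whether it is one of the
     * time-based transforms this sketch accepts. Anything else, including
     * "identity" (still a TODO in the quoted code), is rejected.
     */
    static boolean isSupportedTransform(String transformName) {
        if (transformName == null) {
            return false;
        }
        return SUPPORTED.contains(transformName.toLowerCase(Locale.ROOT));
    }
}
```

String matching is more fragile than a type check, since it depends on the library keeping its toString output stable, which is the trade-off this review comment accepts.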
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
zddr commented on code in PR #44726: URL: https://github.com/apache/doris/pull/44726#discussion_r1862888274 ## regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy: ## @@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_ sql """drop materialized view if exists ${mvName};""" sql """ drop catalog if exists ${catalog_name} """ } + +// Test partition refresh. +// Use hms catalog to avoid rest catalog fail to write caused by sqlite database file locked. +if (enabled != null && enabled.equalsIgnoreCase("true")) { +String hivePrefix = "hive2"; +String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") +String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") +String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" +String warehouse = "${default_fs}/warehouse" + +String catalog_name = "iceberg_mtmv_catalog_hms"; +String mvName1 = "test_iceberg_mtmv_ts" +String mvName2 = "test_iceberg_mtmv_d" +String dbName = "regression_test_mtmv_partition_p0" +String icebergDb = "iceberg_mtmv_partition" +String icebergTable1 = "tstable" +String icebergTable2 = "dtable" +sql """drop catalog if exists ${catalog_name} """ +sql """create catalog if not exists ${catalog_name} properties ( +'type'='iceberg', +'iceberg.catalog.type'='hms', +'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', +'fs.defaultFS' = '${default_fs}', +'warehouse' = '${warehouse}', +'use_meta_cache' = 'true' +);""" + +sql """switch internal""" +sql """drop database if exists ${dbName}""" +sql """create database if not exists ${dbName}""" +sql """use ${dbName}""" + +sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" +sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" +sql """create database if not exists ${catalog_name}.${icebergDb}""" +sql """ +CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} ( + ts DATETIME, + value INT) +ENGINE=iceberg +PARTITION BY LIST 
(DAY(ts)) (); +""" +sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)""" +sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}""" +sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" +waitingMTMVTaskFinishedByMvName(mvName1) +qt_test_ts_refresh1 "select * from ${mvName1} order by value" + +sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)""" +sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" +waitingMTMVTaskFinishedByMvName(mvName1) +qt_test_ts_refresh2 """select * from ${mvName1} order by value""" + +sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);""" +sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_2024102600_2024102700);""" +waitingMTMVTaskFinishedByMvName(mvName1) +qt_test_ts_refresh3 """select * from ${mvName1} order by value""" + +sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" Review Comment: It is necessary to test the specified auto refresh, otherwise snapshot information will not be calculated. ## regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy: ## @@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_ sql """drop materialized view if exists ${mvName};""" sql """ drop catalog if exists ${catalog_name} """ } + +// Test partition refresh. +// Use hms catalog to avoid rest catalog fail to write caused by sqlite database file locked. 
+if (enabled != null && enabled.equalsIgnoreCase("true")) { +String hivePrefix = "hive2"; +String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") +String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") +String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" +String warehouse = "${default_fs}/warehouse" + +String catalog_name = "iceberg_mtmv_catalog_hms"; +String mvName1 = "test_iceberg_mtmv_ts" +String mvName2 = "test_iceberg_mtmv_d" +String dbName = "regression_test_mtmv_partition_p0" +String icebergDb = "iceberg_mtmv_partition" +
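The partition name passed to `REFRESH MATERIALIZED VIEW ... partitions(...)` in the hunk quoted above, `p_2024102600_2024102700`, appears to encode the lower and upper bound of a one-day range. A minimal sketch of that mapping follows. It assumes the name has the form `p_<lower>_<upper>` with each bound written as `yyyyMMddHH`, which is inferred only from the single name in the test; `DayPartitionName` and `forTimestamp` are hypothetical names and are not part of Doris or of this PR.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not taken from the PR: maps a timestamp to the name of
// the one-day partition that would contain it.
public class DayPartitionName {
    // Assumed bound format, inferred from "p_2024102600_2024102700" in the test.
    private static final DateTimeFormatter BOUND = DateTimeFormatter.ofPattern("yyyyMMddHH");
    // Format of the timestamp literals used in the test's INSERT statements.
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String forTimestamp(String ts) {
        // Truncate to the day, mirroring the DAY(ts) partition transform.
        LocalDate day = LocalDateTime.parse(ts, TS).toLocalDate();
        LocalDateTime lower = day.atStartOfDay();             // inclusive lower bound
        LocalDateTime upper = day.plusDays(1).atStartOfDay(); // exclusive upper bound
        return "p_" + lower.format(BOUND) + "_" + upper.format(BOUND);
    }

    public static void main(String[] args) {
        // The two rows inserted just before the partition-level refresh.
        System.out.println(forTimestamp("2024-10-26 01:22:03"));
        System.out.println(forTimestamp("2024-10-27 01:12:03"));
    }
}
```

Under these assumptions the two rows inserted before the partition-level refresh fall into different partitions, so refreshing only `p_2024102600_2024102700` would be expected to pick up the row with value 5 and leave the row with value 6 for the next refresh.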
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
morrySnow commented on code in PR #44726: URL: https://github.com/apache/doris/pull/44726#discussion_r1862349495 ## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } +/ begin code for unit test **/ Review Comment: You could use the `@VisibleForTesting` annotation instead. ## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } +/ begin code for unit test **/ +public void setTable(Table table) { +this.table = table; +} + +public List getSchema() { +Schema schema = table.schema(); +List columns = schema.columns(); +List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); +for (Types.NestedField field : columns) { +tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), +IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, +schema.caseInsensitiveFindField(field.name()).fieldId())); +} +return tmpSchema; +} +/ end code for unit test **/ + @Override public Optional initSchema() { -return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); +table = IcebergUtils.getIcebergTable(catalog, dbName, name); +List schema = IcebergUtils.getSchema(catalog, dbName, name); +// List schema = getSchema(); +Snapshot snapshot = table.currentSnapshot(); +if (snapshot == null) { +LOG.info("Table {} is empty", name); Review Comment: Wouldn't a debug-level log be enough here? 
## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } +/ begin code for unit test **/ +public void setTable(Table table) { +this.table = table; +} + +public List getSchema() { +Schema schema = table.schema(); +List columns = schema.columns(); +List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); +for (Types.NestedField field : columns) { +tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), +IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, +schema.caseInsensitiveFindField(field.name()).fieldId())); +} +return tmpSchema; +} +/ end code for unit test **/ + @Override public Optional initSchema() { -return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); +table = IcebergUtils.getIcebergTable(catalog, dbName, name); +List schema = IcebergUtils.getSchema(catalog, dbName, name); +// List schema = getSchema(); +Snapshot snapshot = table.currentSnapshot(); +if (snapshot == null) { +LOG.info("Table {} is empty", name); +return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null)); +} +long snapshotId = snapshot.snapshotId(); +PartitionSpec spec = table.spec(); +List partitionColumns = null; +IcebergPartitionInfo partitionInfo = null; +if (isSupportedPartitionTable()) { +partitionColumns = Lists.newArrayList(); +PartitionField field = spec.fields().get(0); Review Comment: Please add a comment explaining why `get(0)` is used here. ## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } +/ begin code for unit test **/ +public void setTable(Table table) { +this.table = table; +} + +public List getSchema() { +Schema schema = table.schema(); +List columns = schema.columns(); +List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); +for (Types.NestedField 
field : columns) { +tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), +IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, +schema.caseInsensitiveFindField(field.name()).fieldId())); +} +return tmpSchema; +} +/ end code for unit test **/ + @Override public Optional initSchema() { -return Optional.of(new SchemaCacheValue(I
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
Jibing-Li commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2505625085 run buildall
Re: [PR] [feature](mtmv)Support iceberg partition refresh. [doris]
doris-robot commented on PR #44726: URL: https://github.com/apache/doris/pull/44726#issuecomment-2505394341 Thank you for your contribution to Apache Doris. Don't know what should be done next? See [How to process your PR](https://cwiki.apache.org/confluence/display/DORIS/How+to+process+your+PR). Please clearly describe your PR: 1. What problem was fixed (it's best to include specific error reporting information). How it was fixed. 2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be. 3. What features were added. Why was this function added? 4. Which code was refactored and why was this part of the code refactored? 5. Which functions were optimized and what is the difference before and after the optimization?