Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest commented on PR #24179:
URL: https://github.com/apache/flink/pull/24179#issuecomment-1916128105

> Hi, @LadyForest. After rebasing onto master, the test for GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT failed due to an unknown reason. It succeeded after the savepoint metadata was regenerated. Considering the feature freeze on January 30th, I will continue to monitor CI for any unstable tests after merging this feature and will promptly address any issues during the bugfix period.

After debugging, I found that the `_metadata` has serialized the last accessed timestamp for keyed state when state retention is enabled. So I guess this test would fail in 4 days.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
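One way to read the prediction above: if the savepoint's `_metadata` stores a fixed last-access timestamp and the restored state carries a TTL, the entry counts as expired once the wall clock passes that timestamp plus the TTL. The sketch below models only that arithmetic. The class name, the method, the example timestamp, and the 4-day TTL are illustrative assumptions, not Flink's actual TTL implementation or the values used in the test.

```java
import java.time.Duration;
import java.time.Instant;

public class TtlExpirySketch {

    /** Returns true once {@code now} has reached lastAccess + ttl (assumed expiry rule). */
    static boolean isExpired(Instant lastAccess, Duration ttl, Instant now) {
        return !now.isBefore(lastAccess.plus(ttl));
    }

    public static void main(String[] args) {
        // Hypothetical timestamp frozen into the savepoint metadata when it was generated.
        Instant lastAccess = Instant.parse("2024-01-26T00:00:00Z");
        // Assumed TTL, chosen only to mirror the "4 days" mentioned in the comment.
        Duration ttl = Duration.ofDays(4);

        // Restoring three days later: the state is still considered live.
        System.out.println(isExpired(lastAccess, ttl, lastAccess.plus(Duration.ofDays(3))));
        // Restoring five days later: the state is considered expired.
        System.out.println(isExpired(lastAccess, ttl, lastAccess.plus(Duration.ofDays(5))));
    }
}
```

Under these assumptions, a restore test that relies on checked-in metadata is time-dependent: it passes right after the metadata is regenerated and starts failing once the TTL has elapsed.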
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest merged PR #24179:
URL: https://github.com/apache/flink/pull/24179
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on PR #24179:
URL: https://github.com/apache/flink/pull/24179#issuecomment-1914824589

Hi, @LadyForest. After rebasing onto master, the test for GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT failed due to an unknown reason. It succeeded after the savepoint metadata was regenerated. Considering the feature freeze on January 30th, I will continue to monitor CI for any unstable tests after merging this feature and will promptly address any issues during the bugfix period.
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1467286142

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala: ##
@@ -149,7 +149,16 @@ class RelTreeWriterImpl(
           if (stateTtlHints.nonEmpty) {
             printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(stateTtlHints)))
           }
-
+        case agg: Aggregate =>
+          val aggHints = FlinkHints.getAllStateTtlHints(agg.getHints)
+          if (aggHints.nonEmpty) {
+            printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(aggHints)))
+          }
+        case agg: StreamPhysicalGroupAggregateBase =>
+          val aggHints = FlinkHints.getAllStateTtlHints(agg.hints)
+          if (aggHints.nonEmpty) {
+            printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(aggHints)))
+          }

Review Comment:
Good idea! I'll update it.
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1467207831

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala: ##
@@ -149,7 +149,16 @@ class RelTreeWriterImpl(
           if (stateTtlHints.nonEmpty) {
             printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(stateTtlHints)))
           }
-
+        case agg: Aggregate =>
+          val aggHints = FlinkHints.getAllStateTtlHints(agg.getHints)
+          if (aggHints.nonEmpty) {
+            printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(aggHints)))
+          }
+        case agg: StreamPhysicalGroupAggregateBase =>
+          val aggHints = FlinkHints.getAllStateTtlHints(agg.hints)
+          if (aggHints.nonEmpty) {
+            printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(aggHints)))
+          }

Review Comment:
Nit: what about

```scala
case _: Aggregate | _: StreamPhysicalGroupAggregateBase =>
  val aggHints = rel match {
    case aggregate: Aggregate => aggregate.getHints
    case _ => rel.asInstanceOf[StreamPhysicalGroupAggregateBase].hints
  }
  if (aggHints.nonEmpty) {
    printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(aggHints)))
  }
```
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466284522

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);

Review Comment:
> Agree with you. Since the improvement work for the input type Optional is not a part of this pr, I have created a new jira for it.
> https://issues.apache.org/jira/browse/FLINK-34235

Assigned to you.
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1467200085

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);

Review Comment:
> https://issues.apache.org/jira/browse/FLINK-34235

Assigned to you
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466362809

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);
+        } else if (node instanceof SingleRel) {
+            SingleRel singleRel = (SingleRel) node;
+            Optional tableName = extractAliasOrTableName(singleRel.getInput());

Review Comment:
Hmm, it seems not. I will address this separately in the follow-up Jira. WDYT?
https://issues.apache.org/jira/browse/FLINK-34235
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466324477

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala: ##
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate(
       generateUpdateBefore,
       needRetraction,
       indexOfCountStar.map(Integer.valueOf).orNull,
+      StateTtlHint.getStateTtlFromHintOnSingleRel(hints),

Review Comment:
I think the printing of state hint should be placed in RelTreeWriterImpl and continue to be managed uniformly by withQueryHint.
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466314159

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java: ##
@@ -87,4 +91,28 @@ public static Map getStateTtlFromHint(List hints) {

         return stateTtlFromHint;
     }
+
+    /**
+     * Get the state ttl from hints on the {@link org.apache.calcite.rel.SingleRel} such as
+     * Aggregate.
+     *
+     * @return the state ttl in milliseconds. If no state ttl hints set from hint, return "null".
+     */
+    @Nullable
+    public static Long getStateTtlFromHintOnSingleRel(List hints) {
+        AtomicReference stateTtlFromHint = new AtomicReference<>(null);
+        hints.stream()
+                .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
+                .forEach(
+                        hint ->
+                                hint.kvOptions.forEach(
+                                        (input, ttl) -> {
+                                            if (FlinkHints.INPUT.equals(input)) {
+                                                stateTtlFromHint.set(
+                                                        TimeUtils.parseDuration(ttl).toMillis());
+                                            }
+                                        }));
+
+        return stateTtlFromHint.get();

Review Comment:
I'll modify the code to return null if the list is empty.
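The reply above concerns the empty case: collecting to a list and calling `get(0)` throws `IndexOutOfBoundsException` when no STATE_TTL hint is present. A minimal sketch of a null-returning variant follows. The `Hint` class is a hypothetical stand-in for Calcite's `RelHint`, and `java.time.Duration.parse` (ISO-8601 input such as `PT1H`) stands in for Flink's `TimeUtils.parseDuration`, so this illustrates the stream shape only and is not the code that was merged.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FirstTtlSketch {

    /** Hypothetical stand-in for a hint with list options; not Calcite's RelHint. */
    static final class Hint {
        final String name;
        final List<String> listOptions;

        Hint(String name, List<String> listOptions) {
            this.name = name;
            this.listOptions = listOptions;
        }
    }

    /** Returns the first STATE_TTL value in milliseconds, or null if none is present. */
    static Long firstTtlMillis(List<Hint> hints) {
        return hints.stream()
                .filter(hint -> "STATE_TTL".equalsIgnoreCase(hint.name))
                .flatMap(hint -> hint.listOptions.stream())
                // Assumption: ISO-8601 durations instead of Flink's "1h"-style strings.
                .map(ttl -> Duration.parse(ttl).toMillis())
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Hint> withTtl =
                Arrays.asList(new Hint("STATE_TTL", Collections.singletonList("PT1H")));
        System.out.println(firstTtlMillis(withTtl));                 // 3600000
        System.out.println(firstTtlMillis(Collections.emptyList())); // null
    }
}
```

Using `findFirst().orElse(null)` keeps the `@Nullable Long` contract from the diff without materializing an intermediate list.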
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466284522

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);

Review Comment:
Agree with you. Since the improvement work for the input type Optional is not a part of this pr, I have created a new jira for it.
https://issues.apache.org/jira/browse/FLINK-34235
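For context on the `Optional` point deferred to FLINK-34235, one common alternative to `Optional` parameters is to unwrap at the call site and dispatch to overloads that take plain values. The sketch below is a generic illustration of that pattern only. The class and method names are hypothetical, and nothing here reflects how FLINK-34235 was actually resolved.

```java
import java.util.Optional;

public class OptionalArgSketch {

    // Overload for the case where an alias or table name was extracted.
    static String describeInput(String tableName) {
        return "input=" + tableName;
    }

    // Overload for the case where no name could be extracted.
    static String describeInput() {
        return "input=<unnamed>";
    }

    /** The caller unwraps the Optional once, so callees never receive an Optional. */
    static String resolve(Optional<String> extractedName) {
        return extractedName
                .map(name -> describeInput(name))
                .orElseGet(() -> describeInput());
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("orders"))); // input=orders
        System.out.println(resolve(Optional.empty()));      // input=<unnamed>
    }
}
```

Keeping `Optional` as a return type and plain values as parameters means each callee states directly whether it needs a name.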
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1466282357

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java: ##
@@ -52,10 +52,13 @@ public abstract class FlinkHints {
     // ~ Internal alias tag hint
     public static final String HINT_ALIAS = "ALIAS";

-    // ~ Option name for hints on join or correlate
+    // ~ Option name for hints on BiRel like join or correlate
     public static final String LEFT_INPUT = "LEFT";
     public static final String RIGHT_INPUT = "RIGHT";

+    // ~ Option name for hints on SingleRel like aggregate
+    public static final String INPUT = "INPUT";

Review Comment:
Agree!
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1465969973

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);
+        } else if (node instanceof SingleRel) {
+            SingleRel singleRel = (SingleRel) node;
+            Optional tableName = extractAliasOrTableName(singleRel.getInput());

Review Comment:
Can we get an empty table/alias name here?
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1465877429

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java: ##
@@ -83,21 +85,50 @@ final List resolve(List roots) {
     }

     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set existentKVHints = new HashSet<>();
-
-        List oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List oldHints = ((Hintable) node).getHints();
         List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }

-        List newHints = new ArrayList<>();
+        final List newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional leftName = extractAliasOrTableName(biRel.getLeft());
+            Optional rightName = extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);

Review Comment:
Nit: I think `validateAndGetNewHints` is good enough, and it's ok to have an overloaded method here. Another point is I don't think passing `Optional` as arguments is a good practice.

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java: ##
@@ -52,10 +52,13 @@ public abstract class FlinkHints {
     // ~ Internal alias tag hint
     public static final String HINT_ALIAS = "ALIAS";

-    // ~ Option name for hints on join or correlate
+    // ~ Option name for hints on BiRel like join or correlate
     public static final String LEFT_INPUT = "LEFT";
     public static final String RIGHT_INPUT = "RIGHT";

+    // ~ Option name for hints on SingleRel like aggregate
+    public static final String INPUT = "INPUT";

Review Comment:
Is it more natural to convert the KV options to list options for agg?

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java: ##
@@ -122,8 +122,9 @@ public static HintStrategyTable createHintStrategyTable() {
                                 .build())
                 .hintStrategy(
                         StateTtlHint.STATE_TTL.getHintName(),
-                        // TODO support agg state ttl hint
-                        HintStrategy.builder(HintPredicates.JOIN)
+                        HintStrategy.builder(

Review Comment:
Nit: this is a comment on `STATE_TTL_NON_EMPTY_KV_OPTION_CHECKER`. We'd better keep a unified error message template with other hints. Suggested changes

```java
litmus.check(
        ttlHint.kvOptions.size() > 0,
        "Invalid STATE_TTL hint, expecting at least one key-value options specified.");
```

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java: ##
@@ -87,4 +91,28 @@ public static Map getStateTtlFromHint(List hints) {

         return stateTtlFromHint;
     }
+
+    /**
+     * Get the state ttl from hints on the {@link org.apache.calcite.rel.SingleRel} such as
+     * Aggregate.
+     *
+     * @return the state ttl in milliseconds. If no state ttl hints set from hint, return "null".
+     */
+    @Nullable
+    public static Long getStateTtlFromHintOnSingleRel(List hints) {
+        AtomicReference stateTtlFromHint = new AtomicReference<>(null);
+        hints.stream()
+                .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
+                .forEach(
+                        hint ->
+                                hint.kvOptions.forEach(
+                                        (input, ttl) -> {
+                                            if (FlinkHints.INPUT.equals(input)) {
+                                                stateTtlFromHint.set(
+                                                        TimeUtils.parseDuration(ttl).toMillis());
+                                            }
+                                        }));
+
+        return stateTtlFromHint.get();

Review Comment:
```suggestion
        return hints.stream()
                .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
                .flatMap(hint -> hint.listOptions.stream())
                .map(ttl -> TimeUtils.parseDuration(ttl).toMillis())
                .collect(Collectors.toList())
                .get(0);
```

##
Re: [PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
flinkbot commented on PR #24179:
URL: https://github.com/apache/flink/pull/24179#issuecomment-1906059763

## CI report:

* e73ed979ea6aa69d8bc823f2cb6afe5d0a86d041 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34053][table-planner] Support state ttl hint for group aggregate [flink]
xuyangzhong opened a new pull request, #24179:
URL: https://github.com/apache/flink/pull/24179

## What is the purpose of the change

*Currently, the state TTL hint works only on join nodes. This PR adds support for the state TTL hint on group aggregate nodes.*

## Brief change log

- *Support the state TTL hint on group aggregate nodes*
- *Add IT cases and plan cases for this PR*

## Verifying this change

Some tests are added for it.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? The documentation will be added in a separate PR.