[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
beyond1920 commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058118429 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive sum aggregate function. 
*/ +public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ nullOf(getResultType())}; +} + +@Override +public Expression[] accumulateExpressions() { +Expression tryCastOperand = tryCast(operand(0), typeLiteral(getResultType())); +return new Expression[] { +/* sum = */ ifThenElse( +isNull(operand(0)), +sum, +ifThenElse( +isNull(tryCastOperand), +sum, +ifThenElse( +isNull(sum), +tryCastOperand, +adjustedPlus(sum, tryCastOperand +}; +} Review Comment: Do we need to cast the final result here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
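For illustration, the cast the reviewer asks about would typically live in the final-value expression; a minimal sketch, assuming the buffer type can differ from the declared result type (this is not the PR's actual code):

```java
// Hypothetical sketch: cast the accumulated buffer back to the declared result
// type when producing the final value, using the same ExpressionBuilder helpers
// (tryCast, typeLiteral) that the file already imports.
@Override
public Expression getValueExpression() {
    return tryCast(sum, typeLiteral(getResultType()));
}
```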
[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
beyond1920 commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058116587 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive sum aggregate function. 
*/ +public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ nullOf(getResultType())}; +} + +@Override +public Expression[] accumulateExpressions() { +Expression tryCastOperand = tryCast(operand(0), typeLiteral(getResultType())); +return new Expression[] { +/* sum = */ ifThenElse( +isNull(operand(0)), +sum, +ifThenElse( +isNull(tryCastOperand), +sum, Review Comment: Do we really need to use `ifThenElse(isNull(tryCastOperand), sum,` again here, since we have already checked `isNull(operand(0))` before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
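A minimal sketch of the simplification the reviewer suggests, assuming `tryCast` yields NULL for a NULL input (so the check on the cast result subsumes the check on the raw operand); not the PR's actual code:

```java
// Sketch: drop the isNull(operand(0)) branch. tryCast(NULL, ...) yields NULL,
// so isNull(tryCastOperand) already covers the case of a NULL input.
@Override
public Expression[] accumulateExpressions() {
    Expression tryCastOperand = tryCast(operand(0), typeLiteral(getResultType()));
    return new Expression[] {
        /* sum = */ ifThenElse(
                isNull(tryCastOperand),
                sum,
                ifThenElse(isNull(sum), tryCastOperand, adjustedPlus(sum, tryCastOperand)))
    };
}
```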
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH
jiangxin369 commented on code in PR #191: URL: https://github.com/apache/flink-ml/pull/191#discussion_r1057228565 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/MinHashLSH.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.feature.lsh; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import java.io.IOException; + +/** + * An Estimator that implements the MinHash LSH algorithm, which supports LSH for Jaccard distance. + * + * The input could be dense or sparse vectors. Each input vector must hava at least one non-zero Review Comment: ```suggestion * The input could be dense or sparse vectors. Each input vector must have at least one non-zero ``` ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java: ## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.feature.lsh; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.DataStreamUtils; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Base class for LSH model. + * + * In addition to transforming input feature vectors to multiple hash values, it also supports + * approximate nearest neighbors search within a dataset regarding a key vector and approximate + * similarity join between two datasets. + * + * @param class type of the LSHModel implementation itself. + */ +abstract class LSHModel> implements Model, LSHModelParams { +private static final String MODEL_DATA_BC_KEY = "modelData"; + +private final Map, Object> paramMap = new HashMap<>(); + +/** Stores the corresponding model data class of T. */ +private final Class modelDataClass; + +protected Table modelDataTable; + +public LSHModel(Class modelDataClass) { +
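For context, an Estimator/Model pair like MinHashLSH is typically driven as sketched below; the column and parameter names are assumptions for illustration, not taken from this diff:

```java
// Hypothetical usage sketch of the MinHashLSH estimator/model under review.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Build a toy input table with a vector column named "features".
DataStream<DenseVector> points =
        env.fromElements(Vectors.dense(1.0, 0.0, 1.0), Vectors.dense(0.0, 1.0, 1.0));
Table inputTable = tEnv.fromDataStream(points).as("features");

MinHashLSH lsh = new MinHashLSH()
        .setInputCol("features")   // assumed parameter name
        .setOutputCol("hashes");   // assumed parameter name
MinHashLSHModel model = lsh.fit(inputTable);        // Estimator produces a Model
Table transformed = model.transform(inputTable)[0]; // appends the hash column
```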
[jira] [Assigned] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.
[ https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang reassigned FLINK-29859: -- Assignee: Lijie Wang > TPC-DS end-to-end test with adaptive batch scheduler failed due to oo > non-empty .out files. > --- > > Key: FLINK-29859 > URL: https://issues.apache.org/jira/browse/FLINK-29859 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.16.0, 1.17.0 >Reporter: Leonard Xu >Assignee: Lijie Wang >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.16.1 > > > Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' > failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the > logs contained errors, exceptions or non-empty .out files > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.
[ https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lijie Wang resolved FLINK-29859. Fix Version/s: 1.17.0 1.16.1 Resolution: Fixed > TPC-DS end-to-end test with adaptive batch scheduler failed due to oo > non-empty .out files. > --- > > Key: FLINK-29859 > URL: https://issues.apache.org/jira/browse/FLINK-29859 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.16.0, 1.17.0 >Reporter: Leonard Xu >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.17.0, 1.16.1 > > > Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' > failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the > logs contained errors, exceptions or non-empty .out files > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.
[ https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652398#comment-17652398 ] Lijie Wang commented on FLINK-29859: Fixed via master 14a61f368332320d7e38cc93a04f95bb63c66788 release-1.16 9e51cb8c117fd9a5f887e0a0e8faee4ff11462ea > TPC-DS end-to-end test with adaptive batch scheduler failed due to oo > non-empty .out files. > --- > > Key: FLINK-29859 > URL: https://issues.apache.org/jira/browse/FLINK-29859 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.16.0, 1.17.0 >Reporter: Leonard Xu >Priority: Major > Labels: pull-request-available, test-stability > > Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' > failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the > logs contained errors, exceptions or non-empty .out files > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30516) Add file count and row count in snapshots table
Shammon created FLINK-30516: --- Summary: Add file count and row count in snapshots table Key: FLINK-30516 URL: https://issues.apache.org/jira/browse/FLINK-30516 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.3.0, table-store-0.4.0 Reporter: Shammon Add file count and row count in mytable$snapshots table -- This message was sent by Atlassian Jira (v8.20.10#820010)
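The new columns could then be inspected like any other system-table field; a hypothetical query (the exact column names are assumptions based on the issue summary):

```java
// Hypothetical: query the proposed file/row counts from the snapshots table.
tableEnv.executeSql(
        "SELECT snapshot_id, file_count, row_count FROM mytable$snapshots")
        .print();
```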
[GitHub] [flink] xintongsong commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask
xintongsong commented on code in PR #21560: URL: https://github.com/apache/flink/pull/21560#discussion_r1058108777 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java: ## @@ -122,4 +122,6 @@ List getClusterPartitionShuffleDescriptors( IntermediateDataSetID intermediateResultPartition); MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy(); + +boolean isHybridEnableConsumePartialFinishedProducer(); Review Comment: I'm not entirely sure about introducing a hybrid-specific interface in the execution graph. I haven't thought of an alternative way, though. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * {@link PartialFinishedInputConsumableDecider} is a special {@link InputConsumableDecider}. The + * input is considered to be consumable: + * + * + * for hybrid input: when partial producer partitions are finished. + * for blocking input: when all producer partitions are finished. + * + */ +public class PartialFinishedInputConsumableDecider implements InputConsumableDecider { +public static final int NUM_FINISHED_PARTITIONS_AS_CONSUMABLE = 1; + +@Override +public boolean isInputConsumable( +SchedulingExecutionVertex executionVertex, +Set verticesToDeploy, +Map consumableStatusCache) { +for (ConsumedPartitionGroup consumedPartitionGroup : +executionVertex.getConsumedPartitionGroups()) { + +if (!consumableStatusCache.computeIfAbsent( +consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { +return false; +} Review Comment: When there are multiple hybrid partition groups, shall we require all groups to have at least one finished partition? Or one finished partition from any of the groups? ## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ## @@ -670,6 +670,24 @@ public enum SchedulerType { code(SPECULATIVE_ENABLED.key())) .build()); +@Documentation.Section({ +Documentation.Sections.EXPERT_SCHEDULING, +Documentation.Sections.ALL_JOB_MANAGER +}) +public static final ConfigOption CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED = Review Comment: The two configs (`ONLY_CONSUME_FINISHED_PARTITION` and `CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED`) are quite alike, which may confuse users. - It's hard to understand the differences between them. - There could be conflicts. E.g., allowing consumption of unfinished partitions while not allowing consumption from a partially finished producer. I wonder if we can combine them into one config, which takes an enum type of supported values.
## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import
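A sketch of the reviewer's suggestion of folding the two boolean options into a single enum-typed option; the enum and key names below are hypothetical, not the final API:

```java
// Hypothetical combined option replacing ONLY_CONSUME_FINISHED_PARTITION and
// CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED with one enum of supported modes.
public enum HybridPartitionDataConsumeConstraint {
    ALL_PRODUCERS_FINISHED,   // consume only when every producer has finished
    ONLY_FINISHED_PRODUCERS,  // consume partitions of finished producers
    UNFINISHED_PRODUCERS      // consume even from unfinished producers
}

public static final ConfigOption<HybridPartitionDataConsumeConstraint>
        HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT =
                ConfigOptions.key("jobmanager.partition.hybrid.data-consume-constraint")
                        .enumType(HybridPartitionDataConsumeConstraint.class)
                        .noDefaultValue()
                        .withDescription(
                                "Controls when downstream tasks may start consuming"
                                        + " hybrid shuffle data.");
```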
[GitHub] [flink] wanglijie95 closed pull request #21559: [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check
wanglijie95 closed pull request #21559: [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check URL: https://github.com/apache/flink/pull/21559 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #489: [FLINK-30406] Detect when jobmanager never started
morhidi commented on PR #489: URL: https://github.com/apache/flink-kubernetes-operator/pull/489#issuecomment-1366390439 +1 nice job @gyfora ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] jiangxin369 commented on pull request #193: [FLINK-30515] Add benchmark for CountVectorizer, Imputer, RobustScale…
jiangxin369 commented on PR #193: URL: https://github.com/apache/flink-ml/pull/193#issuecomment-1366389187 @lindong28 Could you help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Mulavar commented on a diff in pull request #21545: [FLINK-30396][table]make alias hint take effect in correlate
Mulavar commented on code in PR #21545: URL: https://github.com/apache/flink/pull/21545#discussion_r1058069998 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java: ## @@ -189,18 +191,33 @@ public static RelNode capitalizeJoinHints(RelNode root) { private static class CapitalizeJoinHintShuttle extends RelShuttleImpl { +@Override +public RelNode visit(LogicalCorrelate correlate) { +return visitBiRel(correlate); +} + @Override public RelNode visit(LogicalJoin join) { -List hints = join.getHints(); +return visitBiRel(join); +} + +private RelNode visitBiRel(BiRel biRel) { +Hintable hBiRel = (Hintable) biRel; AtomicBoolean changed = new AtomicBoolean(false); List hintsWithCapitalJoinHints = -hints.stream() +hBiRel.getHints().stream() .map( hint -> { String capitalHintName = hint.hintName.toUpperCase(Locale.ROOT); if (JoinStrategy.isJoinStrategy(capitalHintName)) { changed.set(true); +if (JoinStrategy.isLookupHint(hint.hintName)) { Review Comment: please refer to `ClearJoinHintWithCapitalizeJoinHintShuttleTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started
morhidi commented on code in PR #489: URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1058069736 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java: ## @@ -112,18 +116,23 @@ protected Optional getAvailableUpgradeMode( && !flinkVersionChanged( ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) { -if (!flinkService.isHaMetadataAvailable(deployConfig)) { -if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) { -// initial deployment failure, reset to allow for spec change to proceed -return resetOnMissingStableSpec(deployment, deployConfig); -} -} else { +if (flinkService.isHaMetadataAvailable(deployConfig)) { LOG.info( "Job is not running but HA metadata is available for last state restore, ready for upgrade"); return Optional.of(UpgradeMode.LAST_STATE); } } +if (status.getReconciliationStatus() +.deserializeLastReconciledSpec() +.getJob() +.getUpgradeMode() +!= UpgradeMode.LAST_STATE +&& FlinkUtils.jmPodNeverStarted(ctx)) { +deleteJmThatNeverStarted(deployment, deployConfig); +return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig); Review Comment: Can the process get stuck here in a recursive loop? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] LadyForest commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
LadyForest commented on code in PR #21504: URL: https://github.com/apache/flink/pull/21504#discussion_r1058054716 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -417,6 +422,92 @@ void checkColumnExists(String columnName) { } } +private static class ModifySchemaConverter extends SchemaConverter { + +ModifySchemaConverter( +Schema originalSchema, +FlinkTypeFactory typeFactory, +SqlValidator sqlValidator, +Consumer constraintValidator, +Function escapeExpressions, +SchemaResolver schemaResolver) { +super( +originalSchema, +typeFactory, +sqlValidator, +constraintValidator, +escapeExpressions, +schemaResolver); +} + +@Override +void checkColumnExists(String columnName) { +if (!sortedColumnNames.contains(columnName)) { +throw new ValidationException( +String.format( +"%sTry to modify a column `%s` which does not exist in the table.", +EX_MSG_PREFIX, columnName)); +} +} + +@Override +void checkPrimaryKeyExists() { +if (primaryKey == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any primary key constraint. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +void checkWatermarkExists() { +if (watermarkSpec == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any watermark. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +Optional getColumnPosition(SqlTableColumnPosition columnPosition) { +if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) { + sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple()); +return super.getColumnPosition(columnPosition); +} +return Optional.empty(); +} + +@Override +Schema.UnresolvedPhysicalColumn convertPhysicalColumn( +SqlTableColumn.SqlRegularColumn physicalColumn) { +Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn); +String columnName = newColumn.getName(); +// preserves the primary key's nullability +if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) { +newColumn = +new Schema.UnresolvedPhysicalColumn( +columnName, +newColumn.getDataType().notNull(), +newColumn.getComment().orElse(null)); +} Review Comment: > in the SqlAlterTableSchema#validate we have already mark used column not null. Actually, `SqlAlterTableSchema#validate` is not suitable under this condition. Consider the previous case I mentioned, `SqlAlterTableSchema#validate` will do nothing because this change does not reveal any primary key info. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fsk119 commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
fsk119 commented on code in PR #21504: URL: https://github.com/apache/flink/pull/21504#discussion_r1058050529 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -417,6 +422,92 @@ void checkColumnExists(String columnName) { } } +private static class ModifySchemaConverter extends SchemaConverter { + +ModifySchemaConverter( +Schema originalSchema, +FlinkTypeFactory typeFactory, +SqlValidator sqlValidator, +Consumer constraintValidator, +Function escapeExpressions, +SchemaResolver schemaResolver) { +super( +originalSchema, +typeFactory, +sqlValidator, +constraintValidator, +escapeExpressions, +schemaResolver); +} + +@Override +void checkColumnExists(String columnName) { +if (!sortedColumnNames.contains(columnName)) { +throw new ValidationException( +String.format( +"%sTry to modify a column `%s` which does not exist in the table.", +EX_MSG_PREFIX, columnName)); +} +} + +@Override +void checkPrimaryKeyExists() { +if (primaryKey == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any primary key constraint. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +void checkWatermarkExists() { +if (watermarkSpec == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any watermark. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +Optional getColumnPosition(SqlTableColumnPosition columnPosition) { +if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) { + sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple()); +return super.getColumnPosition(columnPosition); +} +return Optional.empty(); +} + +@Override +Schema.UnresolvedPhysicalColumn convertPhysicalColumn( +SqlTableColumn.SqlRegularColumn physicalColumn) { +Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn); +String columnName = newColumn.getName(); +// preserves the primary key's nullability +if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) { +newColumn = +new Schema.UnresolvedPhysicalColumn( +columnName, +newColumn.getDataType().notNull(), +newColumn.getComment().orElse(null)); +} Review Comment: I mean in `SchemaConverter#convert`. BTW, do you think we need to modify the type to not null here? As far as I know, in `SqlAlterTableSchema#validate` we have already marked the used column as not null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
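To make the nullability question concrete: modifying the type of a primary-key column must keep it NOT NULL, which is what the `notNull()` call above enforces. A hypothetical example (table definition and connector are illustrative, not from the PR):

```java
// Hypothetical scenario: the user modifies a primary-key column's type without
// spelling out NOT NULL; the converter resolves it back to NOT NULL so the
// primary key stays valid.
tableEnv.executeSql(
        "CREATE TABLE t (id INT, v STRING, PRIMARY KEY (id) NOT ENFORCED)"
                + " WITH ('connector' = 'values')");
tableEnv.executeSql("ALTER TABLE t MODIFY id BIGINT"); // resolved as BIGINT NOT NULL
```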
[GitHub] [flink] fsk119 commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
fsk119 commented on PR #21504: URL: https://github.com/apache/flink/pull/21504#issuecomment-1366360753 +1 to move to another task. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fsk119 commented on a diff in pull request #19329: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement
fsk119 commented on code in PR #19329: URL: https://github.com/apache/flink/pull/19329#discussion_r1058046923 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ## @@ -144,6 +147,163 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } +public static Operation convertRenameColumn( +ObjectIdentifier tableIdentifier, +String originColumnName, +String newColumnName, +CatalogTable originTable, +ResolvedSchema originResolveSchema) { +Schema originSchema = originTable.getUnresolvedSchema(); Review Comment: The current implementation also modifies the pk list if the original column is in it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
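Propagating the rename into the primary-key column list could look like the following minimal sketch, built on the public `Schema.Builder` API; this is an illustration, not the PR's exact code:

```java
// Sketch: swap the renamed column into the primary-key column list while
// rebuilding the unresolved schema.
List<String> newPkColumns = new ArrayList<>();
for (String col : primaryKey.getColumnNames()) {
    newPkColumns.add(col.equals(originColumnName) ? newColumnName : col);
}
Schema.Builder builder = Schema.newBuilder();
// ... copy the original columns, substituting the renamed one ...
builder.primaryKeyNamed(primaryKey.getConstraintName(), newPkColumns);
Schema newSchema = builder.build();
```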
[jira] [Updated] (FLINK-30515) Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector and VarianceThresholdSelector
[ https://issues.apache.org/jira/browse/FLINK-30515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30515: --- Labels: pull-request-available (was: ) > Add benchmark for CountVectorizer, Imputer, RobustScaler, > UnivariateFeatureSelector and VarianceThresholdSelector > - > > Key: FLINK-30515 > URL: https://issues.apache.org/jira/browse/FLINK-30515 > Project: Flink > Issue Type: Bug >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] jiangxin369 opened a new pull request, #193: [FLINK-30515] Add benchmark for CountVectorizer, Imputer, RobustScale…
jiangxin369 opened a new pull request, #193: URL: https://github.com/apache/flink-ml/pull/193 …r, UnivariateFeatureSelector and VarianceThresholdSelector ## What is the purpose of the change Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector, and VarianceThresholdSelector. ## Brief change log - Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector, and VarianceThresholdSelector - Add `RandomStringArrayGenerator` to help generate a table of string arrays. - Make `HasNumDistinctValues` a public param. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-30515) Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector and VarianceThresholdSelector
Jiang Xin created FLINK-30515: - Summary: Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector and VarianceThresholdSelector Key: FLINK-30515 URL: https://issues.apache.org/jira/browse/FLINK-30515 Project: Flink Issue Type: Bug Reporter: Jiang Xin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30314) Unable to read all records from compressed line-delimited JSON files using Table API
[ https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652361#comment-17652361 ] Jiankun Feng edited comment on FLINK-30314 at 12/28/22 4:05 AM: Hello, everyone. This problem also occurs in 1.14.3 when reading gz files.How is the progress ? Thanks CREATE TABLE MyUserTable ( column_name1 STRING ) WITH ( 'connector'='filesystem', 'path'='xxx.gz', 'format'='raw' ); was (Author: JIRAUSER284851): Hello, everyone. This problem also occurs in 1.14.3 when reading gz files.How is the progress ? Thanks > Unable to read all records from compressed line-delimited JSON files using > Table API > > > Key: FLINK-30314 > URL: https://issues.apache.org/jira/browse/FLINK-30314 > Project: Flink > Issue Type: Improvement > Components: API / Core, Connectors / FileSystem, Table SQL / API >Affects Versions: 1.16.0, 1.15.2 >Reporter: Dmitry Yaraev >Priority: Major > Attachments: input.json, input.json.gz, input.json.zip > > > I am reading gzipped JSON line-delimited files in the batch mode using > [FileSystem > Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/]. > For reading the files a new table is created with the following > configuration: > {code:sql} > CREATE TEMPORARY TABLE `my_database`.`my_table` ( > `my_field1` BIGINT, > `my_field2` INT, > `my_field3` VARCHAR(2147483647) > ) WITH ( > 'connector' = 'filesystem', > 'path' = 'path-to-input-dir', > 'format' = 'json', > 'json.ignore-parse-errors' = 'false', > 'json.fail-on-missing-field' = 'true' > ) {code} > In the input directory I have two files: input-0.json.gz and > input-1.json.gz. As it comes from the filenames, the files are compressed > with GZIP. Each of the files contains 10 records. The issue is that only 2 > records from each file are read (4 in total). If decompressed versions of the > same data files are used, all 20 records are read. > As far as I understand, that problem may be related to the fact that split > length, which is used when the files are read, is in fact the length of a > compressed file. So files are closed before all records are read from them > because read position of the decompressed file stream exceeds split length. > Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we > could identify if the file compressed or not. The flag can be set to true in > {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file > streams. With such a flag it could be possible to differentiate > non-splittable compressed files and only rely on the end of the stream. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30314) Unable to read all records from compressed line-delimited JSON files using Table API
[ https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652361#comment-17652361 ] Jiankun Feng commented on FLINK-30314: -- Hello, everyone. This problem also occurs in 1.14.3 when reading gz files.How is the progress ? Thanks > Unable to read all records from compressed line-delimited JSON files using > Table API > > > Key: FLINK-30314 > URL: https://issues.apache.org/jira/browse/FLINK-30314 > Project: Flink > Issue Type: Improvement > Components: API / Core, Connectors / FileSystem, Table SQL / API >Affects Versions: 1.16.0, 1.15.2 >Reporter: Dmitry Yaraev >Priority: Major > Attachments: input.json, input.json.gz, input.json.zip > > > I am reading gzipped JSON line-delimited files in the batch mode using > [FileSystem > Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/]. > For reading the files a new table is created with the following > configuration: > {code:sql} > CREATE TEMPORARY TABLE `my_database`.`my_table` ( > `my_field1` BIGINT, > `my_field2` INT, > `my_field3` VARCHAR(2147483647) > ) WITH ( > 'connector' = 'filesystem', > 'path' = 'path-to-input-dir', > 'format' = 'json', > 'json.ignore-parse-errors' = 'false', > 'json.fail-on-missing-field' = 'true' > ) {code} > In the input directory I have two files: input-0.json.gz and > input-1.json.gz. As it comes from the filenames, the files are compressed > with GZIP. Each of the files contains 10 records. The issue is that only 2 > records from each file are read (4 in total). If decompressed versions of the > same data files are used, all 20 records are read. > As far as I understand, that problem may be related to the fact that split > length, which is used when the files are read, is in fact the length of a > compressed file. So files are closed before all records are read from them > because read position of the decompressed file stream exceeds split length. > Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we > could identify if the file compressed or not. The flag can be set to true in > {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file > streams. With such a flag it could be possible to differentiate > non-splittable compressed files and only rely on the end of the stream. -- This message was sent by Atlassian Jira (v8.20.10#820010)
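A minimal sketch of the flag proposed in this issue, with hypothetical names (the base class and wrapper exist in Flink, but the `isCompressed` method is the proposal, not current API):

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch of the proposal: mark streams that decompress non-splittable files so
// split-based readers rely on end-of-stream instead of the compressed split length.
abstract class FSDataInputStream extends InputStream {
    /** Hypothetical flag: whether this stream decompresses a compressed file. */
    public boolean isCompressed() {
        return false; // default: plain, splittable file
    }
}

class InputStreamFSInputWrapper extends FSDataInputStream {
    private final InputStream in;

    InputStreamFSInputWrapper(InputStream in) {
        this.in = in;
    }

    @Override
    public int read() throws IOException {
        return in.read();
    }

    @Override
    public boolean isCompressed() {
        return true; // wraps the decompressed view of a compressed file
    }
}
```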
[GitHub] [flink] lincoln-lil commented on pull request #20800: [FLINK-28850][table-planner] Support table alias in LOOKUP hint
lincoln-lil commented on PR #20800: URL: https://github.com/apache/flink/pull/20800#issuecomment-1366346357 @godfreyhe thanks for reviewing this! I've updated the PR according to your comments: the Snapshot is now formatted correctly in the first commit, and I added two more cases that use the table name as a hint option (previously, only one such case was kept). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
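The feature under review lets the LOOKUP hint's 'table' option refer to a table alias; the shape of such a query is sketched below (table and column names are illustrative):

```java
// LOOKUP hint addressing the lookup table via its alias `c`.
tableEnv.executeSql(
        "SELECT /*+ LOOKUP('table'='c', 'async'='true') */ o.order_id, c.name"
                + " FROM Orders AS o"
                + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
                + " ON o.customer_id = c.id");
```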
[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
lsyldliu commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058031265 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.call; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive sum aggregate function. 
*/ +public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private DataType argsType; +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ nullOf(getResultType())}; +} + +@Override +public Expression[] accumulateExpressions() { +Expression operand; +// TimestampToNumericCastRule can't cast timestamp to numeric directly, so here use +// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead +if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) { Review Comment: I agree with you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
beyond1920 commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058030752 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.call; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive sum aggregate function. 
*/ +public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private DataType argsType; +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ nullOf(getResultType())}; +} + +@Override +public Expression[] accumulateExpressions() { +Expression operand; +// TimestampToNumericCastRule can't cast timestamp to numeric directly, so here use +// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead +if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) { Review Comment: @lsyldliu Thanks for pointing the problem out. IMO, it's OK to not support summing over timestamp types. Intuitively, sum(timestamp) does not seem to make sense. Besides, it is not easy to keep the Flink Hive dialect's behavior consistent with Hive's here. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
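The workaround named in the quoted code comment corresponds to an expression along these lines, built from the helpers the file already imports (a sketch, not the exact PR code):

```java
// Sketch: route a TIMESTAMP input through a string cast before UNIX_TIMESTAMP,
// since TimestampToNumericCastRule cannot cast timestamp to numeric directly.
Expression operand =
        call(UNIX_TIMESTAMP, tryCast(operand(0), typeLiteral(DataTypes.STRING())));
```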
[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm
swuferhong commented on code in PR #21530: URL: https://github.com/apache/flink/pull/21530#discussion_r1058027477 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.cost.FlinkCost; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.rules.LoptMultiJoin; +import org.apache.calcite.rel.rules.LoptOptimizeJoinRule; +import org.apache.calcite.rel.rules.MultiJoin; +import org.apache.calcite.rel.rules.TransformationRule; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +/** + * Flink busy join reorder rule, which will convert {@link MultiJoin} to a busy join tree. + * + * In this busy join reorder strategy, we will first try to reorder all the inner join type + * inputs in the multiJoin, and then add all outer join type inputs on the top. + * + * First, reordering all the inner join type inputs in the multiJoin. We adopt the concept of + * level in dynamic programming, and the latter layer will use the results stored in the previous + * layers. 
First, we put all inputs (each input in {@link MultiJoin}) into level 0, then we build + * all two-inputs joins at level 1 based on the {@link FlinkCost} of level 0, then we will build + * three-inputs joins based on the previous two levels, then four-inputs joins ... etc, until we + * reorder all the inner join type inputs in the multiJoin. When building an m-inputs join, we only + * keep the best plan (the one with the lowest {@link FlinkCost}) for the same set of m inputs. E.g., for + * three-inputs joins, we keep only the best plan for inputs {A, B, C} among plans (A J B) J C, (A J + * C) J B, (B J C) J A. + * + * Second, we will add all outer join type inputs in the MultiJoin on the top. + */ +public class FlinkBusyJoinReorderRule extends RelRule +implements TransformationRule { + +public static final LoptOptimizeJoinRule MULTI_JOIN_OPTIMIZE = +LoptOptimizeJoinRule.Config.DEFAULT.toRule(); + +/** Creates a FlinkBusyJoinReorderRule. */ +protected FlinkBusyJoinReorderRule(Config config) { +super(config); +} + +@Deprecated // to be removed before 2.0 +public FlinkBusyJoinReorderRule(RelBuilderFactory relBuilderFactory) { + this(Config.DEFAULT.withRelBuilderFactory(relBuilderFactory).as(Config.class)); +} + +@Deprecated // to be removed before 2.0 +public FlinkBusyJoinReorderRule( +RelFactories.JoinFactory joinFactory, +RelFactories.ProjectFactory projectFactory, +RelFactories.FilterFactory filterFactory) { +this(RelBuilder.proto(joinFactory, projectFactory, filterFactory)); +
[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm
swuferhong commented on code in PR #21530: URL: https://github.com/apache/flink/pull/21530#discussion_r1058025322 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java: ##
[GitHub] [flink-table-store] tsreaper merged pull request #445: [FLINK-30506] Add documentation for writing Table Store with Spark3
tsreaper merged PR #445: URL: https://github.com/apache/flink-table-store/pull/445 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm
swuferhong commented on code in PR #21530: URL: https://github.com/apache/flink/pull/21530#discussion_r1058020032 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -135,6 +135,24 @@ public class OptimizerConfigOptions { .defaultValue(false) .withDescription("Enables join reorder in optimizer. Default is disabled."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) +public static final ConfigOption<Boolean> TABLE_OPTIMIZER_BUSY_JOIN_REORDER = +key("table.optimizer.busy-join-reorder") +.booleanType() +.defaultValue(false) +.withDescription( +"Enables busy join reorder in optimizer. Default is disabled."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) +public static final ConfigOption<Integer> TABLE_OPTIMIZER_BUSY_JOIN_REORDER_DP_THRESHOLD = +key("table.optimizer.busy-join-reorder-dp-threshold") +.intType() +.defaultValue(12) Review Comment: > can you explain why we choose `12` as the threshold value ? Now we set the default value to -1, which means that Flink disables busy join reorder by default. 12 is an empirical parameter provided by the designer of the algorithm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
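For readers following the thread: a minimal sketch of how these options would be set from a Table API program, assuming the option keys keep the names shown in the diff and the semantics discussed here (a positive threshold enables the rule up to that many join inputs; -1, the proposed default, disables it). The keys and values below are illustrative, not final.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BusyJoinReorderConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Prerequisite: join reorder itself must be enabled.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.join-reorder-enabled", true);
        // Per the discussion above: a positive threshold enables the busy join
        // reorder rule for MultiJoins with at most that many inputs; -1 (the
        // proposed default) keeps it disabled.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.optimizer.busy-join-reorder-dp-threshold", 12);
    }
}
{code}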
[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm
swuferhong commented on code in PR #21530: URL: https://github.com/apache/flink/pull/21530#discussion_r1058019619 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java: ##
[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm
swuferhong commented on code in PR #21530: URL: https://github.com/apache/flink/pull/21530#discussion_r1058019236 ## docs/layouts/shortcodes/generated/optimizer_config_configuration.html: ## @@ -89,5 +89,17 @@ Boolean When it is true, the optimizer will collect and use the statistics from source connectors if the source extends from SupportsStatisticReport and the statistics from catalog is UNKNOWN. Default value is true. + +table.optimizer.busy-join-reorder Batch Streaming Review Comment: > This option can be removed, since we can use `table.optimizer.busy-join-reorder-dp-threshold` <= 0 to disable busy-join-reorder Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
lsyldliu commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058018678 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.call; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive sum aggregate function. 
*/ +public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private DataType argsType; +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ nullOf(getResultType())}; +} + +@Override +public Expression[] accumulateExpressions() { +Expression operand; +// TimestampToNumericCastRule can't cast timestamp to numeric directly, so here use +// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead +if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) { Review Comment: BTW, the Hive [TimestampObjectInspector](https://github.com/apache/hive/blob/989e72a393356c5f91f96d1bab6455a4c75c77a7/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java#L874) supports returning double precision, but Flink's `UNIX_TIMESTAMP` can only return a bigint, so there may be a loss of precision here. For example, for `2021-08-04 16:26:33.4`, Flink can only get the `2021-08-04 16:26:33` part; the `.4` is lost. For the timestamp type, we cannot align the behavior with Hive. What do you think about it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
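To make the precision point concrete, a small standalone illustration in plain Java (not Flink code): once a timestamp is collapsed to whole epoch seconds, which is what a bigint-returning `UNIX_TIMESTAMP` yields, the fractional second is unrecoverable.
{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampPrecisionLossDemo {
    public static void main(String[] args) {
        // The example value from the review comment above.
        LocalDateTime ts = LocalDateTime.parse("2021-08-04T16:26:33.4");
        // Collapsing to whole epoch seconds models a bigint UNIX_TIMESTAMP result.
        long epochSeconds = ts.toEpochSecond(ZoneOffset.UTC);
        LocalDateTime roundTripped =
                LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC);
        System.out.println(roundTripped); // prints 2021-08-04T16:26:33 -- the .4 is gone
    }
}
{code}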
[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
lsyldliu commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058014713 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## +if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) { Review Comment: Yes, the Hive [sum](https://github.com/apache/hive/blob/10805bc997d7cd136b85fca9200cf165ffe2eae5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java#L123) UDAF supports it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
lsyldliu commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058014428 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ##
[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation
lsyldliu commented on code in PR #21401: URL: https://github.com/apache/flink/pull/21401#discussion_r1058014011 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java: ## +@Override +public Expression[] retractExpressions() { Review Comment: We just follow the Hive logic, which doesn't support retraction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] nowke commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec
nowke commented on code in PR #487: URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057990470 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java: ## @@ -241,15 +245,35 @@ private Optional<String> validateJobSpec( } private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) { +Configuration conf = Configuration.fromMap(confMap); +var jmMemoryDefined = +jmSpec != null +&& jmSpec.getResource() != null +&& !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory()); +Optional<String> jmMemoryValidation = +jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf); + if (jmSpec == null) { -return Optional.empty(); +return jmMemoryValidation; } return firstPresent( +jmMemoryValidation, validateResources("JobManager", jmSpec.getResource()), validateJmReplicas(jmSpec.getReplicas(), confMap)); } +private Optional<String> validateJmMemoryConfig(Configuration conf) { +try { + JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( Review Comment: - `processSpecFromConfigWithNewOptionToInterpretLegacyHeap` internally calls `processSpecFromConfig` - [JobManagerProcessUtils.java#L73-L74](https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L73-L74) - Also, `processSpecFromConfig` is not `public`, but package-private - [JobManagerProcessUtils.java#L82](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L82) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
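For context, a hedged sketch of the validation pattern under discussion, completing the truncated try block above. This is not the PR's exact code, and the option passed for legacy-heap interpretation (JobManagerOptions.JVM_HEAP_MEMORY) is an assumption.
{code:java}
import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;

class JmMemoryValidationSketch {
    // Sketch only: derive a JobManager process spec from the effective
    // configuration and surface any failure as a validation message instead
    // of letting the deployment fail later.
    Optional<String> validateJmMemoryConfig(Configuration conf) {
        try {
            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                    conf, JobManagerOptions.JVM_HEAP_MEMORY); // assumed legacy-heap option
            return Optional.empty();
        } catch (IllegalConfigurationException e) {
            return Optional.of(e.getMessage());
        }
    }
}
{code}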
[GitHub] [flink-connector-opensearch] reta commented on pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#issuecomment-1366207021 @ypark2103 That is correct: the Flink Opensearch connector follows Flink's externalization model for connectors, and the necessary scaffolding is only available in Flink 1.16 and above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] ypark2103 commented on pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
ypark2103 commented on PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#issuecomment-1366195952 @reta @MartijnVisser Is this flink-connector-opensearch only available for Flink 1.16? What about Flink 1.11 or 1.12? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.
eskabetxe commented on code in PR #8: URL: https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1057924897 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/db2/Db2TestBaseITCase.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.dialect.db2; + +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Db2Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** Basic class for testing DB2 jdbc. */ +public class Db2TestBaseITCase extends AbstractTestBase { Review Comment: You could annotate the class with '@Testcontainers' and the container with '@Container'. With this you could eliminate the startContainers and stopContainers methods, as they will be auto-managed by Testcontainers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.
eskabetxe commented on code in PR #8: URL: https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1057922321 ## flink-connector-jdbc/pom.xml: ## @@ -195,13 +202,13 @@ under the License. test - - + - org.apache.flink Review Comment: Why are we removing this? ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/db2/Db2TestBaseITCase.java: ## Review Comment: You could annotate the class with '@Testcontainers' and the container with '@Container'. With this you could eliminate the startContainers and stopContainers methods, as they will be auto-managed by Testcontainers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
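A hedged sketch of what the suggested annotations could look like on the quoted test base, using testcontainers' JUnit 5 extension (org.testcontainers.junit.jupiter); the container setup shown is illustrative, not the PR's code.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Db2Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

/** Sketch only: the JUnit 5 extension manages the container lifecycle. */
@Testcontainers
class Db2TestBaseSketch {
    private static final Logger LOG = LoggerFactory.getLogger(Db2TestBaseSketch.class);

    // A static @Container field is started once before all tests in the class
    // and stopped afterwards -- no manual startContainers()/stopContainers().
    @Container
    static final Db2Container DB2 =
            new Db2Container()
                    .acceptLicense()
                    .withLogConsumer(new Slf4jLogConsumer(LOG));
}
{code}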
[GitHub] [flink] yuzelin commented on pull request #21525: [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway
yuzelin commented on PR #21525: URL: https://github.com/apache/flink/pull/21525#issuecomment-1366075740 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21564: [FLINK-30514] HybridSource savepoint recovery sequence fix
flinkbot commented on PR #21564: URL: https://github.com/apache/flink/pull/21564#issuecomment-1366067992 ## CI report: * b68145ccb94f51f4ef510cb2246b6c423a064347 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence
[ https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30514: --- Labels: pull-request-available (was: ) > HybridSource savepoint recovery sequence > > > Key: FLINK-30514 > URL: https://issues.apache.org/jira/browse/FLINK-30514 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.16.0, 1.15.2, 1.15.3 >Reporter: Denis Golovachev >Priority: Major > Labels: pull-request-available > > {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} > accumulates splits during recovery in > {{{}org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits{}}}. > As a next step it creates a reader and pushes all {{recoveredSplits to}} the > reader. > {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}} > Instantiation sequence of the {{setCurrentReader}} is following > - {{reader.start()}} > - {{reader.addSplits()}} > Seems like it doesn't work if we use {{FileSourceReader}} as an underlying > reader. > {{FileSourceReader#start()}} method checks if any splits are available to > read and executes {{sendSplitRequest}} if empty. Current > {{HybridSourceReader}} recovery sequence is not aligned with this. > So, every time we recover we immediately jump to the next splits. > Let me show you some logs. In this experiment job should have started with > files inside the 100 bucket but jumped to the bucket number 200 > Job Manager > {code:java} > 2022-12-27 13:38:47.155 StaticFileSplitEnumerator - Assigned split to > subtask 1 : FileSourceSplit: > s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97489087) hosts=[localhost] ID=32 position=null > 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to > subtask 9 : FileSourceSplit: > s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97342071) hosts=[localhost] ID=33 position=null > 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to > subtask 6 : FileSourceSplit: > s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97377047) hosts=[localhost] ID=31 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 5 : FileSourceSplit: > s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97406878) hosts=[localhost] ID=34 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 2 : FileSourceSplit: > s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97536205) hosts=[localhost] ID=40 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 4 : FileSourceSplit: > s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97420601) hosts=[localhost] ID=35 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 8 : FileSourceSplit: > s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97472256) hosts=[localhost] ID=36 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 3 : FileSourceSplit: > s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97495880) hosts=[localhost] ID=37 position=null > 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to > subtask 0 : FileSourceSplit: > 
s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97389425) hosts=[localhost] ID=38 position=null > 2022-12-27 13:38:47.158 StaticFileSplitEnumerator - Assigned split to > subtask 7 : FileSourceSplit: > s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet > [0, 97428709) hosts=[localhost] ID=39 position=null > {code} > Task Manager > {code:java} > 2246:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: > [FileSourceSplit: > s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet > [0, 79887236) hosts=[localhost] ID=18 > position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029] > 2247:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: > [FileSourceSplit: > s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet > [0, 79987191) hosts=[localhost] ID=11 > position=CheckpointedPosition: offset=NO_OFFSET,
[GitHub] [flink] WonderBeat opened a new pull request, #21564: [FLINK-30514] HybridSource savepoint recovery sequence fix
WonderBeat opened a new pull request, #21564: URL: https://github.com/apache/flink/pull/21564 ## What is the purpose of the change This PR tweaks the `HybridSourceReader` underlying reader initialization sequence to make it compatible with `FileSourceReader`. ## Brief change log - Underlying reader instantiation sequence changed from `start -> addSplits` to `addSplits -> start` ## Verifying this change This change adds tests and can be verified as follows: - Added a test that validates call ordering during initialization ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
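A self-contained toy model (plain Java, not Flink code) of why the ordering matters, under the assumption stated in the Jira issue that `FileSourceReader#start()` requests new splits when none are pending.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Toy model (not Flink code) of the ordering bug described above. */
public class RecoveryOrderSketch {
    static class Reader {
        final Deque<String> pending = new ArrayDeque<>();
        boolean requestedNewSplits = false;

        void start() {
            // Mirrors the described FileSourceReader#start() behavior: if no
            // splits are pending, immediately ask the enumerator for new ones.
            if (pending.isEmpty()) {
                requestedNewSplits = true;
            }
        }

        void addSplits(List<String> splits) {
            pending.addAll(splits);
        }
    }

    public static void main(String[] args) {
        List<String> restored = List.of("bucket-100/part-0", "bucket-100/part-1");

        Reader buggy = new Reader();
        buggy.start();               // old order: start() first...
        buggy.addSplits(restored);   // ...so new splits were already requested
        System.out.println("old order requested new splits: " + buggy.requestedNewSplits);

        Reader fixed = new Reader();
        fixed.addSplits(restored);   // fixed order: restored splits first
        fixed.start();               // start() now sees pending work
        System.out.println("new order requested new splits: " + fixed.requestedNewSplits);
    }
}
{code}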
[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence
[ https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Denis Golovachev updated FLINK-30514: - Description: (edited; the full text is quoted above)
[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence
[ https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Denis Golovachev updated FLINK-30514: - Description: (edited; the full text is quoted above)
[jira] [Created] (FLINK-30514) HybridSource savepoint recovery sequence
Denis Golovachev created FLINK-30514: Summary: HybridSource savepoint recovery sequence Key: FLINK-30514 URL: https://issues.apache.org/jira/browse/FLINK-30514 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.15.3, 1.15.2, 1.16.0 Reporter: Denis Golovachev {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} accumulates splits during recovery in {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}. As a next step it creates a reader and pushes all {{recoveredSplits}} to the reader via {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}. The invocation sequence inside {{setCurrentReader}} is as follows: - {{reader.start()}} - {{reader.addSplits()}} This does not work if we use {{FileSourceReader}} as the underlying reader: the {{FileSourceReader#start()}} method checks whether any splits are available to read and executes {{sendSplitRequest}} if there are none. The current {{HybridSourceReader}} recovery sequence is not aligned with this, so every time we recover we immediately jump to the next splits. Let me show you some logs. In this experiment the job should have started with the files inside bucket 100 but instead jumped to bucket 200. Job Manager
{code:java}
2022-12-27 13:38:47.155 StaticFileSplitEnumerator - Assigned split to subtask 1 : FileSourceSplit: s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087) hosts=[localhost] ID=32 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 9 : FileSourceSplit: s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071) hosts=[localhost] ID=33 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 6 : FileSourceSplit: s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047) hosts=[localhost] ID=31 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 5 : FileSourceSplit: s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878) hosts=[localhost] ID=34 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 2 : FileSourceSplit: s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205) hosts=[localhost] ID=40 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 4 : FileSourceSplit: s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601) hosts=[localhost] ID=35 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 8 : FileSourceSplit: s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256) hosts=[localhost] ID=36 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 3 : FileSourceSplit: s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880) hosts=[localhost] ID=37 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425) hosts=[localhost] ID=38 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator - Assigned split to subtask 7 : FileSourceSplit: s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709) hosts=[localhost] ID=39 position=null
{code}
Task Manager
{code:java}
2246:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236) hosts=[localhost] ID=18 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191) hosts=[localhost] ID=11 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/100/part-9-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830) hosts=[localhost] ID=20 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit:
[GitHub] [flink] LadyForest commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
LadyForest commented on PR #21504: URL: https://github.com/apache/flink/pull/21504#issuecomment-1366041944 > BTW, I think we should also take `OperationConverterUtils.convertChangeColumn` into consideration. Will you help to clean the code about this? Sure, I'd like to. However, I think it is prudent to complete this in a separate task. The reason is that the current implementation uses the deprecated `CatalogTableImpl`, while `SchemaConverter` uses `CatalogTable.of`. I'm not sure whether any compatibility issues exist. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
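For context, a rough sketch of the two construction styles under discussion (parameter wiring is illustrative only; the real conversion code carries more state):

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;

class CatalogTableStyles {
    // Legacy style (roughly what the old convertChangeColumn path relies on):
    static CatalogTable legacy(TableSchema tableSchema, List<String> partitionKeys,
                               Map<String, String> options, String comment) {
        return new CatalogTableImpl(tableSchema, partitionKeys, options, comment);
    }

    // Newer style used by SchemaConverter:
    static CatalogTable modern(Schema schema, List<String> partitionKeys,
                               Map<String, String> options, String comment) {
        return CatalogTable.of(schema, comment, partitionKeys, options);
    }
}
```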
[GitHub] [flink] LadyForest commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
LadyForest commented on code in PR #21504: URL: https://github.com/apache/flink/pull/21504#discussion_r1057779254 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -417,6 +422,92 @@ void checkColumnExists(String columnName) { } } +private static class ModifySchemaConverter extends SchemaConverter { + +ModifySchemaConverter( +Schema originalSchema, +FlinkTypeFactory typeFactory, +SqlValidator sqlValidator, +Consumer constraintValidator, +Function escapeExpressions, +SchemaResolver schemaResolver) { +super( +originalSchema, +typeFactory, +sqlValidator, +constraintValidator, +escapeExpressions, +schemaResolver); +} + +@Override +void checkColumnExists(String columnName) { +if (!sortedColumnNames.contains(columnName)) { +throw new ValidationException( +String.format( +"%sTry to modify a column `%s` which does not exist in the table.", +EX_MSG_PREFIX, columnName)); +} +} + +@Override +void checkPrimaryKeyExists() { +if (primaryKey == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any primary key constraint. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +void checkWatermarkExists() { +if (watermarkSpec == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any watermark. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +Optional getColumnPosition(SqlTableColumnPosition columnPosition) { +if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) { + sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple()); +return super.getColumnPosition(columnPosition); +} +return Optional.empty(); +} + +@Override +Schema.UnresolvedPhysicalColumn convertPhysicalColumn( +SqlTableColumn.SqlRegularColumn physicalColumn) { +Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn); +String columnName = newColumn.getName(); +// preserves the primary key's nullability +if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) { +newColumn = +new Schema.UnresolvedPhysicalColumn( +columnName, +newColumn.getDataType().notNull(), +newColumn.getComment().orElse(null)); +} Review Comment: > Why not mark the column as not null when building the final schema with the final pk? Because under some conditions, the final pk does not get a chance to be updated. Consider the following case:
```sql
create table T (
  f0 int,
  f1 string primary key not enforced, -- f1 is converted to string not null implicitly
  f2 double
) with (
  ...
);

-- then change f1 pos
alter table T modify f1 string first
```
For this case, the schema change only contains a column modification, and L#103 `alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);` does not get executed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB
[ https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652279#comment-17652279 ] Krzysztof Chmielewski commented on FLINK-27246: --- Hi [~TsReaper] Thanks for the feedback. Regarding IfStatementRewriter, it's a little bit tricky for me. I think my new thing might handle more cases than IfStatementRewriter did. Plus, IfStatementRewriter and the existing rewriters seem to expect a method declaration + body, whereas my BlocksStatementGrouper and Splitter process individual block statements. They are called from FunctionSplitter::FunctionSplitVisitor while processing block statements from the method's body. Now it seems that after my change, FunctionSplitter is also rewriting the code, similar to IfStatementRewriter, and maybe this is not the best thing to do from a clean code/architecture perspective. The problem with IfStatementRewriter is that it will not rewrite an if/else branch if the branch contains a "while" statement, or when the entire if/else statement is inside a while statement, which was the original problem. So now I'm wondering: should we extract this from FunctionSplitter into a new BlocksStatementRewriter that will handle while/if/else statements in combination, or can it stay inside FunctionSplitter as it is now? > Code of method > "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" > of class "HashAggregateWithKeys$9211" grows beyond 64 KB > - > > Key: FLINK-27246 > URL: https://issues.apache.org/jira/browse/FLINK-27246 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3, 1.16.0, 1.15.3 >Reporter: Maciej Bryński >Assignee: Krzysztof Chmielewski >Priority: Major > Attachments: endInput_falseFilter9123_split9704.txt > > > I think this bug should get fixed in > https://issues.apache.org/jira/browse/FLINK-23007 > Unfortunately I spotted it on Flink 1.14.3 > {code} > java.lang.RuntimeException: Could not instantiate generated class > 'HashAggregateWithKeys$9211' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) >
~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:102) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:83) > ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1] > ... 11 more > Caused by: > org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: >
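To make the shape of the problem in the discussion above concrete, here is a toy illustration (hypothetical code, not actual generated output) of why a rewriter that only splits top-level if/else branches cannot help when the branches live inside a while loop:
{code:java}
// Toy stand-in for a generated method approaching the 64 KB bytecode limit.
public class GeneratedLikeExample {

    // Before: the if/else is inside a while loop, so a rewriter that only
    // extracts top-level if/else branches cannot shrink processElement.
    public void processElement(int[] records) {
        int i = 0;
        while (i < records.length) {
            if (records[i] % 2 == 0) {
                // ... imagine thousands of generated statements here ...
            } else {
                // ... and thousands more here ...
            }
            i++;
        }
    }

    // After a block-level split: the loop body is extracted into its own
    // method, which is what a BlocksStatementGrouper/Splitter-style rewrite
    // enables regardless of while/if-else nesting.
    public void processElementSplit(int[] records) {
        int i = 0;
        while (i < records.length) {
            processElementLoopBody(records, i);
            i++;
        }
    }

    private void processElementLoopBody(int[] records, int i) {
        if (records[i] % 2 == 0) {
            // ...
        } else {
            // ...
        }
    }
}
{code}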
[GitHub] [flink] gyfora commented on pull request #21515: [FLINK-30429][client] Fix IllegalArgumentException when no argument in flink executable
gyfora commented on PR #21515: URL: https://github.com/apache/flink/pull/21515#issuecomment-1366000815 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-30456) OLM Bundle Description Version Problems
[ https://issues.apache.org/jira/browse/FLINK-30456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-30456. -- Fix Version/s: kubernetes-operator-1.4.0 Resolution: Fixed merged to main: 0009746cb3bf96bec0450e99e03c1af20f4038e9 > OLM Bundle Description Version Problems > --- > > Key: FLINK-30456 > URL: https://issues.apache.org/jira/browse/FLINK-30456 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.0 >Reporter: James Busche >Assignee: James Busche >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.4.0 > > Attachments: image-2022-12-20-08-21-02-597.png > > > OLM is working great with OperatorHub, but I noticed a few items that need > fixing. > 1. The basic.yaml example version is release-1.1 instead of the latest > release (release-1.3). This needs fixing in two places: > tools/olm/generate-olm-bundle.sh > tools/olm/docker-entry.sh > 2. The label versions in the description are hardcoded to 1.2.0 instead of > the latest release (1.3.0) > 3. The Provider is blank space " " but soon needs to have some text in there > to avoid CI errors with the latest operator-sdk versions. The person who > noticed it recommended "Community" for now, but maybe we can begin making it > "The Apache Software Foundation" now? Not sure if we're ready for that yet > or not... > > I'm working on a PR to address these items. Can you assign the issue to me? > Thanks! > fyi [~tedhtchang] [~gyfora] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description
gyfora merged PR #491: URL: https://github.com/apache/flink-kubernetes-operator/pull/491 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec
gyfora commented on code in PR #487: URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057748254 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java: ## @@ -241,15 +245,35 @@ private Optional validateJobSpec( } private Optional validateJmSpec(JobManagerSpec jmSpec, Map confMap) { +Configuration conf = Configuration.fromMap(confMap); +var jmMemoryDefined = +jmSpec != null +&& jmSpec.getResource() != null +&& !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory()); +Optional jmMemoryValidation = +jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf); + if (jmSpec == null) { -return Optional.empty(); +return jmMemoryValidation; } return firstPresent( +jmMemoryValidation, validateResources("JobManager", jmSpec.getResource()), validateJmReplicas(jmSpec.getReplicas(), confMap)); } +private Optional validateJmMemoryConfig(Configuration conf) { +try { + JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( Review Comment: Why do we use this method vs using `processSpecFromConfig`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
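For reference, a minimal sketch of what the validation body could look like with the plain factory method (assuming `processSpecFromConfig(Configuration)` is the alternative being suggested; the exception type and wiring are simplified):

```java
private Optional<String> validateJmMemoryConfig(Configuration conf) {
    try {
        // Let the memory calculation itself flag inconsistent or missing options.
        JobManagerProcessUtils.processSpecFromConfig(conf);
        return Optional.empty();
    } catch (IllegalConfigurationException e) {
        return Optional.of(e.getMessage());
    }
}
```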
[jira] [Assigned] (FLINK-27571) Recognize "less is better" benchmarks in regression detection script
[ https://issues.apache.org/jira/browse/FLINK-27571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan reassigned FLINK-27571: - Assignee: Yanfei Lei (was: Roman Khachatryan) > Recognize "less is better" benchmarks in regression detection script > > > Key: FLINK-27571 > URL: https://issues.apache.org/jira/browse/FLINK-27571 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.16.0 >Reporter: Roman Khachatryan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Attachments: Screenshot_2022-05-09_10-33-11.png > > > Example benchmark: > [http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200] > > [Proposed > solution|https://issues.apache.org/jira/browse/FLINK-27555?focusedCommentId=17534423=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17534423]: > {quote} > I think #2 is the correct way. > Maybe we can modify the save_jmh_result.py to correctly set the 'units' and > the 'lessisbetter' fields of benchmark results. The 'units' is already > contained in the jmh result and the 'lessisbetter' can be derived from the > mode (false if it is 'thrpt' mode, otherwise true). An example of the jmh > result format can be found at https://i.stack.imgur.com/vB3fV.png. > This can fix the web UI as well as the REST result, and then the > regression_report.py will be able to identify which benchmarks are "less is > better" and treat them differently. > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
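A minimal sketch of the mode-based derivation quoted above (method name assumed for illustration):
{code:java}
// JMH "thrpt" mode reports operations per unit time, so higher is better;
// the time-based modes (avgt, sample, ss) report time per operation, so
// lower is better.
static boolean lessIsBetter(String jmhMode) {
    return !"thrpt".equals(jmhMode);
}
{code}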
[GitHub] [flink] fsk119 commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
fsk119 commented on PR #21504: URL: https://github.com/apache/flink/pull/21504#issuecomment-1365913657 BTW, I think we should also take `OperationConverterUtils.convertChangeColumn` into consideration. Will you help to clean the code about this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30513) HA storage dir leaks on cluster termination
[ https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanghao Chen updated FLINK-30513: -- Issue Type: Bug (was: Improvement) > HA storage dir leaks on cluster termination > > > Key: FLINK-30513 > URL: https://issues.apache.org/jira/browse/FLINK-30513 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.0, 1.16.0 >Reporter: Zhanghao Chen >Priority: Major > Attachments: image-2022-12-27-21-32-17-510.png > > > *Problem* > We found that the HA storage dir leaks on cluster termination for a Flink job > with HA enabled. The following picture shows the HA storage dir (here on > HDFS) of the cluster czh-flink-test-offline (of application mode) after > cancelling the job with flink-cancel. We are left with an empty dir, and too > many empty dirs will greatly hurt the stability of the HDFS NameNode! > !image-2022-12-27-21-32-17-510.png|width=582,height=158! > > Furthermore, in case the user chooses to retain the checkpoints on job > termination, we will have the completedCheckpoints leaked as well. Note that > we no longer need the completedCheckpoints files as we'll directly recover > retained CPs from the CP data dir. > *Root Cause* > When we run AbstractHaServices#closeAndCleanupAllData(), we clean up the blob > store but do not clean the HA storage dir. > *Proposal* > Clean up the HA storage dir after cleaning up the blob store in > AbstractHaServices#closeAndCleanupAllData(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30513) HA storage dir leaks on cluster termination
[ https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanghao Chen updated FLINK-30513: -- Description: *Problem* We found that the HA storage dir leaks on cluster termination for a Flink job with HA enabled. The following picture shows the HA storage dir (here on HDFS) of the cluster czh-flink-test-offline (of application mode) after cancelling the job with flink-cancel. We are left with an empty dir, and too many empty dirs will greatly hurt the stability of the HDFS NameNode! !image-2022-12-27-21-32-17-510.png|width=582,height=158! Furthermore, in case the user chooses to retain the checkpoints on job termination, we will have the completedCheckpoints leaked as well. Note that we no longer need the completedCheckpoints files as we'll directly recover retained CPs from the CP data dir. *Root Cause* When we run AbstractHaServices#closeAndCleanupAllData(), we clean up the blob store but do not clean the HA storage dir. *Proposal* Clean up the HA storage dir after cleaning up the blob store in AbstractHaServices#closeAndCleanupAllData(). was: *Problem* We found that the HA storage dir leaks on cluster termination for a Flink job with HA enabled. The following picture shows the HA storage dir (here on HDFS) of the cluster czh-flink-test-offline (of application mode) after cancelling the job with flink-cancel. We are left with an empty dir, and too many empty dirs will greatly hurt the stability of the HDFS NameNode! !image-2022-12-27-21-32-17-510.png|width=582,height=158! Furthermore, in case the user chooses to retain the checkpoints on job termination, we will have the completedCheckpoints leaked as well. Note that we no longer need the completedCheckpoints files as we'll directly recover retained CPs from the CP data dir. *Root Cause* When we run AbstractHaServices#closeAndCleanupAllData(), we clean up the blob store but do not clean the HA storage dir. *Proposal* Clean up the HA storage dir after cleaning up the blob store in AbstractHaServices#closeAndCleanupAllData(). > HA storage dir leaks on cluster termination > > > Key: FLINK-30513 > URL: https://issues.apache.org/jira/browse/FLINK-30513 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.15.0, 1.16.0 >Reporter: Zhanghao Chen >Priority: Major > Attachments: image-2022-12-27-21-32-17-510.png > > > *Problem* > We found that the HA storage dir leaks on cluster termination for a Flink job > with HA enabled. The following picture shows the HA storage dir (here on > HDFS) of the cluster czh-flink-test-offline (of application mode) after > cancelling the job with flink-cancel. We are left with an empty dir, and too > many empty dirs will greatly hurt the stability of the HDFS NameNode! > !image-2022-12-27-21-32-17-510.png|width=582,height=158! > > Furthermore, in case the user chooses to retain the checkpoints on job > termination, we will have the completedCheckpoints leaked as well. Note that > we no longer need the completedCheckpoints files as we'll directly recover > retained CPs from the CP data dir. > *Root Cause* > When we run AbstractHaServices#closeAndCleanupAllData(), we clean up the blob > store but do not clean the HA storage dir. > *Proposal* > Clean up the HA storage dir after cleaning up the blob store in > AbstractHaServices#closeAndCleanupAllData(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30513) HA storage dir leaks on cluster termination
Zhanghao Chen created FLINK-30513: - Summary: HA storage dir leaks on cluster termination Key: FLINK-30513 URL: https://issues.apache.org/jira/browse/FLINK-30513 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.16.0, 1.15.0 Reporter: Zhanghao Chen Attachments: image-2022-12-27-21-32-17-510.png *Problem* We found that the HA storage dir leaks on cluster termination for a Flink job with HA enabled. The following picture shows the HA storage dir (here on HDFS) of the cluster czh-flink-test-offline (of application mode) after cancelling the job with flink-cancel. We are left with an empty dir, and too many empty dirs will greatly hurt the stability of the HDFS NameNode! !image-2022-12-27-21-32-17-510.png|width=582,height=158! Furthermore, in case the user chooses to retain the checkpoints on job termination, we will have the completedCheckpoints leaked as well. Note that we no longer need the completedCheckpoints files as we'll directly recover retained CPs from the CP data dir. *Root Cause* When we run AbstractHaServices#closeAndCleanupAllData(), we clean up the blob store but do not clean the HA storage dir. *Proposal* Clean up the HA storage dir after cleaning up the blob store in AbstractHaServices#closeAndCleanupAllData(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
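A rough sketch of the proposal (simplified; the actual {{AbstractHaServices}} fields and cleanup order may differ):
{code:java}
// Inside AbstractHaServices#closeAndCleanupAllData(), after the existing
// cleanup of the HA backend data and the blob store:
Path haStorageDir = new Path(configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH));
// Recursively delete the now-empty HA storage directory so that no empty
// directories are leaked to the DFS NameNode.
haStorageDir.getFileSystem().delete(haStorageDir, true);
{code}
Care would be needed for the retained-checkpoint case, where only the completedCheckpoints files (and not the checkpoint data itself) should be removed.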
[GitHub] [flink] fsk119 commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
fsk119 commented on code in PR #21504: URL: https://github.com/apache/flink/pull/21504#discussion_r1057674344 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala: ## @@ -607,6 +607,195 @@ class TableEnvironmentTest { checkData(expectedResult.iterator(), tableResult.collect()) } + @Test + def testAlterTableModifyColumn(): Unit = { +val statement = Review Comment: nit: These negative cases are much the same as the ones in `table.q`. I think we can keep just one... ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ## @@ -417,6 +422,92 @@ void checkColumnExists(String columnName) { } } +private static class ModifySchemaConverter extends SchemaConverter { + +ModifySchemaConverter( +Schema originalSchema, +FlinkTypeFactory typeFactory, +SqlValidator sqlValidator, +Consumer constraintValidator, +Function escapeExpressions, +SchemaResolver schemaResolver) { +super( +originalSchema, +typeFactory, +sqlValidator, +constraintValidator, +escapeExpressions, +schemaResolver); +} + +@Override +void checkColumnExists(String columnName) { +if (!sortedColumnNames.contains(columnName)) { +throw new ValidationException( +String.format( +"%sTry to modify a column `%s` which does not exist in the table.", +EX_MSG_PREFIX, columnName)); +} +} + +@Override +void checkPrimaryKeyExists() { +if (primaryKey == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any primary key constraint. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +void checkWatermarkExists() { +if (watermarkSpec == null) { +throw new ValidationException( +String.format( +"%sThe base table does not define any watermark. You might " ++ "want to add a new one.", +EX_MSG_PREFIX)); +} +} + +@Override +Optional getColumnPosition(SqlTableColumnPosition columnPosition) { +if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) { + sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple()); +return super.getColumnPosition(columnPosition); +} +return Optional.empty(); +} + +@Override +Schema.UnresolvedPhysicalColumn convertPhysicalColumn( +SqlTableColumn.SqlRegularColumn physicalColumn) { +Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn); +String columnName = newColumn.getName(); +// preserves the primary key's nullability +if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) { +newColumn = +new Schema.UnresolvedPhysicalColumn( +columnName, +newColumn.getDataType().notNull(), +newColumn.getComment().orElse(null)); +} Review Comment: Why not mark the column as not null when building the final schema with the final pk? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
gyfora commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057676553 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java: ## @@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory( // Add scaling metrics to history if they were computed successfully scalingMetricHistory.put(clock.instant(), scalingMetrics); -scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory); +scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory); + +if (currentJobUpdateTs +.plus(stabilizationDuration) +.isAfter(clock.instant().minus(metricsWindowDuration))) { Review Comment: I think at this early stage it's better to have flexibility with the configuration and to document recommendations than to restrict the behavior. We still have time to simplify this in the future as we gather working knowledge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] luoyuxia commented on pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on PR #21257: URL: https://github.com/apache/flink/pull/21257#issuecomment-1365896752 @beyond1920 Thanks for reviewing. I'd prefer to append a commit with tests that mock speculative execution to validate these operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
gyfora commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057664132 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java: ## @@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory( // Add scaling metrics to history if they were computed successfully scalingMetricHistory.put(clock.instant(), scalingMetrics); -scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory); +scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory); + +if (currentJobUpdateTs +.plus(stabilizationDuration) +.isAfter(clock.instant().minus(metricsWindowDuration))) { Review Comment: At any point the data rate might suddenly increase (during any scale operation); the minimum window allows us to react to it. This is not only about the first time we start the job. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
gyfora commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057663391 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java: ## @@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory( // Add scaling metrics to history if they were computed successfully scalingMetricHistory.put(clock.instant(), scalingMetrics); -scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory); +scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory); + +if (currentJobUpdateTs +.plus(stabilizationDuration) +.isAfter(clock.instant().minus(metricsWindowDuration))) { Review Comment: I think the min/max is actually simpler to understand and configure. If oscillation occurs, then your stabilization period may be too small. The default for min/max could be the same duration, but I would not make this more complicated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
mxm commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057660487 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java: ## @@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory( // Add scaling metrics to history if they were computed successfully scalingMetricHistory.put(clock.instant(), scalingMetrics); -scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory); +scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory); + +if (currentJobUpdateTs +.plus(stabilizationDuration) +.isAfter(clock.instant().minus(metricsWindowDuration))) { Review Comment: After thinking about this change more, it might actually be tricky for users to set the minimum metric window size correctly. There could be unexpected side effects if the delta between the two is too high, e.g. oscillation where the minimum window scales up, the maximum window scales down, and so on. It seems we only want to permit the minimum window for the initial scaling but not for subsequent scaling decisions. We could keep `metrics.window` and introduce `metrics.window.initial` which is only respected as long as the last scaling decision didn't come from the regular window. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
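A minimal sketch of the two-window idea above (option names and fields are invented for illustration):

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

class WindowSelectionSketch {
    // Use the smaller initial window only until the first decision that was
    // made on a full regular window, to avoid min/max oscillation.
    static boolean fullWindowCollected(
            Clock clock,
            Instant firstUsableMetricTs,
            Duration regularWindow,   // e.g. metrics.window
            Duration initialWindow,   // e.g. metrics.window.initial
            boolean scaledOnRegularWindowBefore) {
        Duration window = scaledOnRegularWindowBefore ? regularWindow : initialWindow;
        return Duration.between(firstUsableMetricTs, clock.instant()).compareTo(window) >= 0;
    }
}
```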
[GitHub] [flink] beyond1920 commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
beyond1920 commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1057658616 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/CompactFileUtils.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.stream.compact.CompactContext; +import org.apache.flink.connector.file.table.stream.compact.CompactReader; +import org.apache.flink.connector.file.table.stream.compact.CompactWriter; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Utils for compacting files. */ +public class CompactFileUtils { + +private static final Logger LOG = LoggerFactory.getLogger(CompactFileUtils.class); + +public static final String UNCOMPACTED_PREFIX = ".uncompacted-"; + +public static final String COMPACTED_PREFIX = "compacted-"; + +/** The function interface for move single file. */ +@FunctionalInterface +public interface SingleFileMvFunction { +R apply(T t, U u) throws IOException; +} + +/** + * Do Compaction: - Target file exists, do nothing. - Can do compaction: - Single file, do + * atomic renaming, there are optimizations for FileSystem. - Multiple file, do reading and + * writing. + */ +public static Path doCompact( +FileSystem fileSystem, +String partition, +List paths, +Configuration config, +SingleFileMvFunction singleFileMvFunc, +CompactReader.Factory readerFactory, +CompactWriter.Factory writerFactory) +throws IOException { +if (paths.size() == 0) { +return null; +} + +Map inputMap = new HashMap<>(); +for (Path path : paths) { +inputMap.put(path, fileSystem.getFileStatus(path).getLen()); +} + +Path target = createCompactedFile(paths); +if (fileSystem.exists(target)) { +return target; Review Comment: I'm not sure this behavior is correct when speculative execution is enabled. If one compact operator is slow, the scheduler might deploy a new attempt for it. The compact operator in the new attempt would return directly here because the target path already exists, even though it has not actually done anything yet. ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/CompactFileUtils.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.stream.compact.CompactContext; +import org.apache.flink.connector.file.table.stream.compact.CompactReader; +import org.apache.flink.connector.file.table.stream.compact.CompactWriter; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import
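To make the concern above concrete, one possible direction for idempotent compaction (sketch only; `writeCompacted` is a hypothetical helper standing in for the read-and-rewrite path): write to an attempt-scoped temp file and publish the target only via an atomic rename, so a "target exists" check can never observe a half-finished file.

```java
import java.io.IOException;
import java.util.List;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

class IdempotentCompactSketch {
    static void compact(FileSystem fs, Path target, List<Path> inputs, int attemptNumber)
            throws IOException {
        // Each attempt writes to its own hidden temp file.
        Path attemptTmp = new Path(
                target.getParent(), "." + target.getName() + "." + attemptNumber + ".inprogress");
        writeCompacted(fs, attemptTmp, inputs); // hypothetical helper
        // Publish atomically; the loser of a speculative race just cleans up.
        if (fs.exists(target) || !fs.rename(attemptTmp, target)) {
            fs.delete(attemptTmp, false);
        }
    }

    static void writeCompacted(FileSystem fs, Path out, List<Path> inputs) throws IOException {
        // stand-in for the actual reader/writer based compaction
    }
}
```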
[jira] [Closed] (FLINK-30494) Introduce TableChange to represent SET/RESET change
[ https://issues.apache.org/jira/browse/FLINK-30494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-30494. - Resolution: Implemented Merged into master: 75a92efd7b35501698e5de253e5231d680830c16 > Introduce TableChange to represent SET/RESET change > > > Key: FLINK-30494 > URL: https://issues.apache.org/jira/browse/FLINK-30494 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.17.0 >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Fix For: 1.17.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] fsk119 merged pull request #21554: [FLINK-30493][table-api] Introduce TableChange to SET/RESET options
fsk119 merged PR #21554: URL: https://github.com/apache/flink/pull/21554 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] beyond1920 commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
beyond1920 commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1057645105 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java: ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.batch.compact; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.OutputFormatFactory; +import org.apache.flink.connector.file.table.PartitionComputer; +import org.apache.flink.connector.file.table.PartitionTempFileManager; +import org.apache.flink.connector.file.table.PartitionWriter; +import org.apache.flink.connector.file.table.PartitionWriterFactory; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.TableException; + +import java.util.LinkedHashMap; + +/** + * An operator for writing files in batch mode. Once creating a new file to write, the writing + * operator will emit the written file to upstream. 
+ */ +public class BatchFileWriter extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +private final Path tmpPath; +private final String[] partitionColumns; +private final boolean dynamicGrouped; +private final LinkedHashMap staticPartitions; +private final PartitionComputer computer; +private final OutputFormatFactory formatFactory; +private final OutputFileConfig outputFileConfig; + +private transient PartitionWriter writer; + +public BatchFileWriter( +Path tmpPath, +String[] partitionColumns, +boolean dynamicGrouped, +LinkedHashMap staticPartitions, +OutputFormatFactory formatFactory, +PartitionComputer computer, +OutputFileConfig outputFileConfig) { +this.tmpPath = tmpPath; +this.partitionColumns = partitionColumns; +this.dynamicGrouped = dynamicGrouped; +this.staticPartitions = staticPartitions; +this.formatFactory = formatFactory; +this.computer = computer; +this.outputFileConfig = outputFileConfig; +} + +@Override +public void open() throws Exception { +try { +PartitionTempFileManager fileManager = +new PartitionTempFileManager( +tmpPath, +getRuntimeContext().getIndexOfThisSubtask(), +getRuntimeContext().getAttemptNumber(), +outputFileConfig); Review Comment: It's not safe if the job enables speculative execution because multiple attempts would write to the same file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30512) Flink SQL state TTL has no effect when using Interval Join
[ https://issues.apache.org/jira/browse/FLINK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wangkang updated FLINK-30512: - Description: Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';

select
  ...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3
  ON t1.mid = t3.mid and t1.sr = t3.sr
  and t1.time_local = t3.time_local
  and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE AND t3.log_ltz + INTERVAL '2' MINUTE
{code}
!flink1.16.png|width=906,height=278! The state size keeps growing. We also tested the same SQL with Flink SQL 1.13; there the state size is stable. was: Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';

select
  ...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3
  ON t1.mid = t3.mid and t1.sr = t3.sr
  and t1.time_local = t3.time_local
  and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE AND t3.log_ltz + INTERVAL '2' MINUTE
{code}
!flink1.16.png! The state size keeps growing. We also tested the same SQL with Flink SQL 1.13; there the state size is stable. > Flink SQL state TTL has no effect when using Interval Join > -- > > Key: FLINK-30512 > URL: https://issues.apache.org/jira/browse/FLINK-30512 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.1, 1.16.1 >Reporter: wangkang >Priority: Major > Attachments: flink1.16.png > > > Take the following join SQL program as an example: > {code:java} > SET 'table.exec.state.ttl' = '90 ms'; > select > ... > from kafka_source_dwdexpose as t1 > left join kafka_source_expose_attr_click t3 > ON t1.mid = t3.mid and t1.sr = t3.sr > and t1.time_local = t3.time_local > and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE AND t3.log_ltz + > INTERVAL '2' MINUTE {code} > !flink1.16.png|width=906,height=278! > The state size keeps growing. > We also tested the same SQL with Flink SQL 1.13; there the state size is stable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30512) Flink SQL state TTL has no effect when using Interval Join
wangkang created FLINK-30512: Summary: Flink SQL state TTL has no effect when using Interval Join Key: FLINK-30512 URL: https://issues.apache.org/jira/browse/FLINK-30512 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.1, 1.16.1 Reporter: wangkang Attachments: flink1.16.png Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';

select
  ...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3
  ON t1.mid = t3.mid and t1.sr = t3.sr
  and t1.time_local = t3.time_local
  and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE AND t3.log_ltz + INTERVAL '2' MINUTE
{code}
!flink1.16.png! The state size keeps growing. We also tested the same SQL with Flink SQL 1.13; there the state size is stable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25421) Port JDBC Sink to new Unified Sink API (FLIP-143)
[ https://issues.apache.org/jira/browse/FLINK-25421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652232#comment-17652232 ] RocMarshal commented on FLINK-25421: Got it. Thanks a lot~ [~eskabetxe] > Port JDBC Sink to new Unified Sink API (FLIP-143) > - > > Key: FLINK-25421 > URL: https://issues.apache.org/jira/browse/FLINK-25421 > Project: Flink > Issue Type: Improvement >Reporter: Martijn Visser >Priority: Major > > The current JDBC connector is using the old SinkFunction interface, which is > going to be deprecated. We should port/refactor the JDBC Sink to use the new > Unified Sink API, based on FLIP-143 > https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.
[ https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RocMarshal updated FLINK-30511: --- Description: * Code segment: {code:java} public class OnTimerDemo { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setString("taskmanager.numberOfTaskSlots", "4"); conf.setString("state.checkpoint-storage", "filesystem"); conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob"); conf.setString("execution.checkpointing.interval", "30s"); //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A: StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); env.setParallelism(1); EnvironmentSettings envSetting = EnvironmentSettings .newInstance() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting); String sourceDDL = "CREATE TABLE orders (\n" + " id INT,\n" + " app INT,\n" + " user_id STRING" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='1',\n" + " 'fields.app.min'='1',\n" + " 'fields.app.max'='10',\n" + " 'fields.user_id.length'='10'\n" + ")"; tableEnv.executeSql(sourceDDL); Table query = tableEnv.sqlQuery("select * from orders"); DataStream rowDataStream = tableEnv.toAppendStream(query, Row.class); TypeInformation[] returnTypes = new TypeInformation[4]; returnTypes[0] = Types.INT; returnTypes[1] = Types.INT; // Anchor-B: returnTypes[2] = Types.INT; returnTypes[3] = Types.INT; rowDataStream.keyBy(new KeySelector() { @Override public String getKey(Row value) throws Exception { return value.getFieldAs(2); } }).process(new KeyedProcessFunction() { private Row firstRow; @Override public void processElement(Row value, Context ctx, Collector out) throws Exception { if (firstRow == null) { firstRow = value; } ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { Row colRow = new Row(4); colRow.setField(0, 0); colRow.setField(1, 1); colRow.setField(2, 2); colRow.setField(3, 3); out.collect(colRow); // Anchor-C } }).name("TargetTestUDF") .returns(new RowTypeInfo(returnTypes)) .print(); env.execute(OnTimerDemo.class.getSimpleName()); } } {code} * Recurrence steps ** Run the job without state. ** Collect the latest available checkpoint path as 'checkpoint-path-a' ** Stop the job. ** Fill the real value of 'checkpoint-path-a' into 'Anchor-A' line and un-comment the line. ** Set 'returnTypes[1] = Types.INT;' -> 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line. ** Then add break-point at 'StreamTask#handleAsyncException' method. ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception caused at the 'Anchor-C' line will ignore at 'StreamTask#handleAsyncException' method. ** So, The framework can't catch the same exception in the case. * Root cause: ** !截屏2022-12-27 18.51.12.png! ** When job started from state data, the Task#restoreAndInvoke would be called. The exception 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' was ignored at the above 'handleAsyncException' method instead of catching at catch-block of 'Task#restoreAndInvoke'. !截屏2022-12-27 19.20.00.png! Could it be seen as the framework's missing handling of exceptions? 
If so, I prefer to re-throw the exception in 'StreamTask#handleAsyncException', which matches the intent of 'Task#restoreAndInvoke'. Thank you.
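To make the reported type mismatch concrete, here is a self-contained illustration (an editor's sketch, not Flink code) of the failing cast: after the Anchor-B change the declared field type says Long, while the value written in onTimer at Anchor-C is an Integer, which is exactly the reported ClassCastException.

{code:java}
public class CastDemo {
    public static void main(String[] args) {
        Object value = 1;             // value produced as Integer, like colRow.setField(1, 1)
        Long declared = (Long) value; // declared type says Long -> ClassCastException at runtime
        System.out.println(declared);
    }
}
{code}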
[jira] [Created] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.
RocMarshal created FLINK-30511: -- Summary: Ignore the Exception in user-timer Triggerable when recovering from state. Key: FLINK-30511 URL: https://issues.apache.org/jira/browse/FLINK-30511 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.16.0 Environment: Flink 1.16.0 java8 deployment Mode: miniCluster in IDC; standalone, yarn-application. Reporter: RocMarshal Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
gyfora commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057607945

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:

@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
         // Add scaling metrics to history if they were computed successfully
         scalingMetricHistory.put(clock.instant(), scalingMetrics);
-        scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory);
+        scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory);
+
+        if (currentJobUpdateTs
+                .plus(stabilizationDuration)
+                .isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment: Looks good! Please also update the autoscaler doc page section about the improved configuration (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#stabilization-and-metrics-collection-intervals) and the example configs in docs and yamls.
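For readers following along, the condition in the diff can be read as a standalone check. A minimal sketch, assuming illustrative names rather than the operator's actual API: the metric window is only complete once it no longer overlaps the stabilization period that follows a job update (the PR's condition fires the "not yet full" branch, i.e. the negation of this helper).

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

class MetricWindowCheck {
    // The collection window [now - windowDuration, now] is only "full" once it
    // starts after the stabilization period [jobUpdateTs, jobUpdateTs + stabilization] ends.
    static boolean windowIsFull(
            Instant jobUpdateTs, Duration stabilization, Duration windowDuration, Clock clock) {
        Instant stabilizationEnd = jobUpdateTs.plus(stabilization);
        Instant windowStart = clock.instant().minus(windowDuration);
        return !stabilizationEnd.isAfter(windowStart);
    }
}
```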
[GitHub] [flink] flinkbot commented on pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of columnar format
flinkbot commented on PR #21563: URL: https://github.com/apache/flink/pull/21563#issuecomment-1365806084

## CI report:

* 0f3cbc42fcab5f2f351290cae304ed6ea2cca78d UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-19889) Supports nested projection pushdown for filesystem connector of columnar format
[ https://issues.apache.org/jira/browse/FLINK-19889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19889: --- Labels: pull-request-available (was: ) > Supports nested projection pushdown for filesystem connector of columnar > format > --- > > Key: FLINK-19889 > URL: https://issues.apache.org/jira/browse/FLINK-19889 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem, Table SQL / Ecosystem >Reporter: Jark Wu >Priority: Major > Labels: pull-request-available > > Once FLINK-19639 is supported, we can support nested projection pushdown for > filesystem connector, but only for columnar format first.
[GitHub] [flink] LadyForest commented on a diff in pull request #19329: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement
LadyForest commented on code in PR #19329: URL: https://github.com/apache/flink/pull/19329#discussion_r1057594885

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:

@@ -144,6 +147,163 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }

+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable originTable,
+            ResolvedSchema originResolveSchema) {
+        Schema originSchema = originTable.getUnresolvedSchema();

Review Comment: Can we reuse `SchemaResolver` to simplify this check, just like ALTER TABLE ADD?

```java
Schema originSchema = originTable.getUnresolvedSchema();
List<Schema.UnresolvedColumn> newColumns =
        originSchema.getColumns().stream()
                .map(
                        column -> {
                            if (column.getName().equals(originColumnName)) {
                                return convertColumn(column, newColumnName);
                            } else {
                                return column;
                            }
                        })
                .collect(Collectors.toList());
Schema updatedSchema =
        Schema.newBuilder().fromSchema(originSchema).replaceColumns(newColumns).build();
// try {
//     schemaResolver.resolve(updatedSchema);
//     return updatedSchema;
// } catch (Exception e) {
//     throw new ValidationException(
//             String.format("%s%s", EX_MSG_PREFIX, e.getMessage()), e);
// }
return new AlterTableSchemaOperation(
        tableIdentifier,
        CatalogTable.of(
                updatedSchema,
                originTable.getComment(),
                originTable.getPartitionKeys(),
                originTable.getOptions()));
```

## flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:

@@ -608,6 +612,18 @@ SqlAlterTable SqlAlterTable() :
         tableIdentifier,
         newTableIdentifier);
     }
+|
+
+    originColumnIdentifier = SimpleIdentifier()

Review Comment: Do we plan to support nested columns at the syntax level, just like `ALTER TABLE ADD/MODIFY` did?

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:

@@ -1258,6 +1259,119 @@ public void testAlterTable() throws Exception {
         .hasMessageContaining("ALTER TABLE RESET does not support empty key");
     }

+    @Test
+    public void testAlterTableRenameColumn() throws Exception {

Review Comment: Can we reuse `prepareTable`?
[GitHub] [flink] yuchuanchen opened a new pull request, #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of columnar format
yuchuanchen opened a new pull request, #21563: URL: https://github.com/apache/flink/pull/21563

…sted projection pushdown for filesystem connector of columnar format

## What is the purpose of the change

Supports nested projection pushdown for filesystem connector of columnar format.

## Brief change log

## Verifying this change

This change added tests and can be verified as follows: HiveTableSourceITCase.testReadParquetNestedPushdown()

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
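As background for this PR (an illustration only; the connector wiring itself is not shown in the excerpt above), nested projections in the Table API are commonly described as index paths from the top-level row down to each projected leaf field, so a columnar reader can skip every other leaf:

```java
import java.util.Arrays;

public class NestedProjectionExample {
    public static void main(String[] args) {
        // For a schema like ROW<id INT, user ROW<name STRING, address ROW<city STRING>>>,
        // a query touching only id and user.address.city projects these paths:
        int[][] projectedFields = {
            {0},       // id
            {1, 1, 0}, // user.address.city
        };
        System.out.println(Arrays.deepToString(projectedFields));
    }
}
```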
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window
mxm commented on code in PR #493: URL: https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057588877

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:

@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
         // Add scaling metrics to history if they were computed successfully
         scalingMetricHistory.put(clock.instant(), scalingMetrics);
-        scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory);
+        scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory);
+
+        if (currentJobUpdateTs
+                .plus(stabilizationDuration)
+                .isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment: Created [FLINK-30510](https://issues.apache.org/jira/browse/FLINK-30510) and tagged the commit.
[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface
[ https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuzelin updated FLINK-29950: Description: Currently, the 'ResultSet' only contains Schema and data for the execution result, which is not enough for the client to display necessary information. So we need to add more fields to improve 'ResultSet'. We can refactor the ResultSet to an interface and hide the implementation details of ResultSet, such as how to Ser/De the data in ResultSet. was: Currently, the 'ResultSet' only contains Schema and data for the execution result, which is not enough for the client to display necessary information. So we need to add more fields to improve 'ResultSet'. We can refactor the ResultSet to an interface and hide the implementation details of ResultSet. > Refactor ResultSet to an interface > -- > > Key: FLINK-29950 > URL: https://issues.apache.org/jira/browse/FLINK-29950 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > > Currently, the 'ResultSet' only contains Schema and data for the execution > result, which is not enough for the client to display necessary information. So > we need to add more fields to improve 'ResultSet'. > We can refactor the ResultSet to an interface and hide the implementation details > of ResultSet, such as how to Ser/De the data in ResultSet.
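A sketch of the direction described in this ticket, with illustrative member names only (the final Flink API may differ): the interface exposes what the client needs for display, while serialization of the rows stays behind the implementation.

{code:java}
import java.util.List;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;

/** Illustrative-only sketch of ResultSet as an interface. */
public interface ResultSet {

    /** Schema of the execution result, used by the client for display. */
    ResolvedSchema getResultSchema();

    /** Rows of the current result; how they are Ser/De'd is an implementation detail. */
    List<RowData> getData();
}
{code}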
[jira] [Created] (FLINK-30510) Allow configuring minimum and maximum metrics window size
Maximilian Michels created FLINK-30510: -- Summary: Allow configuring minimum and maximum metrics window size Key: FLINK-30510 URL: https://issues.apache.org/jira/browse/FLINK-30510 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.4.0

It would be more flexible to have a minimum and maximum window size for metric collection instead of a fixed window size. This would allow for faster metrics evaluation after a rescale operation.
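One possible shape of the feature, purely as a sketch (the names and clamping semantics are assumptions, not the implemented configuration): the effective window grows with the time elapsed since stabilization, bounded by the configured minimum and maximum.

{code:java}
import java.time.Duration;

class MetricWindowSizing {
    /** Clamp the effective metrics window between the configured bounds. */
    static Duration effectiveWindow(Duration elapsedSinceStable, Duration min, Duration max) {
        if (elapsedSinceStable.compareTo(min) < 0) {
            return min; // not enough samples yet; wait for the minimum window to fill
        }
        if (elapsedSinceStable.compareTo(max) > 0) {
            return max; // cap the amount of history considered
        }
        return elapsedSinceStable; // evaluate as soon as the minimum is reached
    }
}
{code}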
[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface
[ https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuzelin updated FLINK-29950: Description: Currently, the 'ResultSet' only contains Schema and data for the execution result, which is not enough for the client to display necessary information. So we need to add more fields to improve 'ResultSet'. We can refactor the ResultSet to an interface and hide the implementation details of ResultSet. was: Currently, the 'ResultSet' only contains Schema and data for the execution result, which is not enough for the client to display necessary information. So we need to add more fields to improve 'ResultSet'. > Refactor ResultSet to an interface > -- > > Key: FLINK-29950 > URL: https://issues.apache.org/jira/browse/FLINK-29950 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > > Currently, the 'ResultSet' only contains Schema and data for the execution > result, which is not enough for the client to display necessary information. So > we need to add more fields to improve 'ResultSet'. > We can refactor the ResultSet to an interface and hide the implementation details > of ResultSet.
[jira] [Updated] (FLINK-29950) Refactor ResultSet to a interface
[ https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuzelin updated FLINK-29950: Summary: Refactor ResultSet to a interface (was: ResultSet Refactor) > Refactor ResultSet to a interface > - > > Key: FLINK-29950 > URL: https://issues.apache.org/jira/browse/FLINK-29950 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > > Currently, the 'ResultSet' only contains Schema and data for the execution > result, which is not enough for the client to display necessary information. So > we need to add more fields to improve 'ResultSet'.
[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface
[ https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuzelin updated FLINK-29950: Summary: Refactor ResultSet to an interface (was: Refactor ResultSet to a interface) > Refactor ResultSet to an interface > -- > > Key: FLINK-29950 > URL: https://issues.apache.org/jira/browse/FLINK-29950 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > > Currently, the 'ResultSet' only contains Schema and data for the execution > result, which is not enough for the client to display necessary information. So > we need to add more fields to improve 'ResultSet'.
[GitHub] [flink] MartijnVisser commented on pull request #21542: [FLINK-27640][Connector/Hive] Exclude Pentaho dependency from Hive
MartijnVisser commented on PR #21542: URL: https://github.com/apache/flink/pull/21542#issuecomment-1365743830

> I'm back. I have verified with Hive 2.3.9 & Hive 3.1.3, it still works after we exclude the pentaho dependency.

Thanks for checking!

> `mvn dependency:tree` tells that there is another dependency on `org.pentaho:pentaho-aggdesigner-algorithm` in `flink-sql-client`
> shouldn't it also be excluded there?

Yes, we should. I'll update the PR to include that, thanks for double checking.
[GitHub] [flink] flinkbot commented on pull request #21562: [FLINK-30509] Modify sql-client.sh
flinkbot commented on PR #21562: URL: https://github.com/apache/flink/pull/21562#issuecomment-1365742842

## CI report:

* a0fbb989f0315cf2c0e757f0e53f17b390359a2e UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-30509) Modify sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-30509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30509: --- Labels: pull-request-available (was: ) > Modify sql-client.sh > > > Key: FLINK-30509 > URL: https://issues.apache.org/jira/browse/FLINK-30509 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client >Affects Versions: 1.17.0 >Reporter: yuzelin >Priority: Major > Labels: pull-request-available > > The new design of the SQL client will depend on the sql-gateway module, so add the > jar to the jar path in the starting script.
[GitHub] [flink] yuzelin opened a new pull request, #21562: [FLINK-30509] Modify sql-client.sh
yuzelin opened a new pull request, #21562: URL: https://github.com/apache/flink/pull/21562

## What is the purpose of the change

The new design of the SQL client will depend on the sql-gateway module, so add the jar to the jar path in the starting script.

## Brief change log

- Modify `sql-client.sh`.
- Modify the pom.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[jira] [Created] (FLINK-30509) Modify sql-client.sh
yuzelin created FLINK-30509: --- Summary: Modify sql-client.sh Key: FLINK-30509 URL: https://issues.apache.org/jira/browse/FLINK-30509 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Affects Versions: 1.17.0 Reporter: yuzelin

The new design of the SQL client will depend on the sql-gateway module, so add the jar to the jar path in the starting script.
[jira] [Closed] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
[ https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Godfrey He closed FLINK-30368. -- Resolution: Fixed Fixed in master: dc862dae28a172f674a9b8a2198c603275304550 > Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the > method input domainSize much larger than numSelected > --- > > Key: FLINK-30368 > URL: https://issues.apache.org/jira/browse/FLINK-30368 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Assignee: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the > method input domainSize much larger than numSelected. This wrong zero value > will affect the selection of join type.
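To illustrate how such an estimate can collapse to zero (an editor's demonstration of the symptom; the formula below is the standard distinct-value estimate and only an assumption about RelMdUtil's exact code): once domainSize is so large that (domainSize - 1) / domainSize rounds to 1.0 in double precision, the whole expression evaluates to zero no matter how many rows are selected.

{code:java}
public class NumDistinctValsDemo {

    // Standard estimate of distinct values hit when selecting k out of d values:
    // d * (1 - ((d - 1) / d)^k)
    static double numDistinctVals(double domainSize, double numSelected) {
        double ratio = (domainSize - 1.0) / domainSize; // rounds to 1.0 for huge domainSize
        return domainSize * (1.0 - Math.pow(ratio, numSelected));
    }

    public static void main(String[] args) {
        System.out.println(numDistinctVals(1e4, 100));  // ~99.5, a sane estimate
        System.out.println(numDistinctVals(1e20, 100)); // 0.0, the wrongly returned zero
    }
}
{code}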
[jira] [Assigned] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
[ https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Godfrey He reassigned FLINK-30368: -- Assignee: Yunhong Zheng > Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the > method input domainSize much larger than numSelected > --- > > Key: FLINK-30368 > URL: https://issues.apache.org/jira/browse/FLINK-30368 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Assignee: Yunhong Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the > method input domainSize much larger than numSelected. This wrong zero value > will affect the selection of join type.
[GitHub] [flink] godfreyhe closed pull request #21490: [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than n
godfreyhe closed pull request #21490: [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected URL: https://github.com/apache/flink/pull/21490
[jira] [Commented] (FLINK-30505) Close the connection between TM and JM when task executor failed
[ https://issues.apache.org/jira/browse/FLINK-30505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652188#comment-17652188 ] Yongming Zhang commented on FLINK-30505: [~xtsong] Thanks for participating in this discussion > Close the connection between TM and JM when task executor failed > > > Key: FLINK-30505 > URL: https://issues.apache.org/jira/browse/FLINK-30505 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.16.0 >Reporter: Yongming Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > When the resource manager detects that a task executor has failed, it will close > the connection with the task executor. At this time, jobs running on this TM will fail > for other reasons (no longer reachable or heartbeat timeout). > !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672047809511-a4b8b5d9-f11f-483c-a113-b42290a33250.png|width=1160,id=uc24b1166! > If the connection between the task executor and the job master is closed when the resource > manager detects that a task executor has failed, the real reason for the task executor > failure will appear in "Root Exception". This will make it easier for users to > find problems. > !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672048733572-2b5b7be4-087d-46ae-9c8d-6ad5a1344019.png|width=1141,id=u947d8c4e!
[GitHub] [flink] yuzelin commented on a diff in pull request #21525: [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway
yuzelin commented on code in PR #21525: URL: https://github.com/apache/flink/pull/21525#discussion_r1057502548

## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java:

@@ -58,154 +58,139 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
 import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestClientAndEndpointUtils.TestRestClient.getTestRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

 /** IT cases for {@link SqlGatewayRestEndpoint}. */
 class SqlGatewayRestEndpointITCase {

-    private static final SqlGatewayService service = null;
-
-    private static RestServerEndpoint serverEndpoint;
-    private static RestClient restClient;
+    private static SqlGatewayRestEndpoint serverEndpoint;
+    private static TestRestClient restClient;
     private static InetSocketAddress serverAddress;

-    private static TestBadCaseHandler testHandler;
-    private static TestVersionSelectionHeaders1 header1;
-    private static TestVersionSelectionHeaders2 header2;
     private static TestBadCaseHeaders badCaseHeader;
-    private static TestVersionHandler testVersionHandler1;
-    private static TestVersionHandler testVersionHandler2;
+    private static TestBadCaseHandler testHandler;
+
+    private static TestVersionSelectionHeaders0 header0;
+    private static TestVersionSelectionHeaders12 header12;
+
+    private static TestVersionHandler testVersionHandler0;
+    private static TestVersionHandler testVersionHandler12;

     private static Configuration config;
     private static final Time timeout = Time.seconds(10L);

     @BeforeEach
     void setup() throws Exception {
         // Test version cases
-        header1 = new TestVersionSelectionHeaders1();
-        header2 = new TestVersionSelectionHeaders2();
-        testVersionHandler1 = new TestVersionHandler(service, header1);
-        testVersionHandler2 = new TestVersionHandler(service, header2);
+        header0 = new TestVersionSelectionHeaders0();
+        header12 = new TestVersionSelectionHeaders12();
+        testVersionHandler0 = new TestVersionHandler(header0);
+        testVersionHandler12 = new TestVersionHandler(header12);

         // Test exception cases
         badCaseHeader = new TestBadCaseHeaders();
-        testHandler = new TestBadCaseHandler(service);
+        testHandler = new TestBadCaseHandler();

         // Init
         final String address = InetAddress.getLoopbackAddress().getHostAddress();
         config = getBaseConfig(getFlinkConfig(address, address, "0"));
         serverEndpoint =
-                TestingSqlGatewayRestEndpoint.builder(config, service)
+                TestSqlGatewayRestEndpoint.builder(config)
                         .withHandler(badCaseHeader, testHandler)
-                        .withHandler(header1, testVersionHandler1)
-                        .withHandler(header2, testVersionHandler2)
+                        .withHandler(header0, testVersionHandler0)
+                        .withHandler(header12, testVersionHandler12)
                         .buildAndStart();
-        restClient =
-                new RestClient(
-                        config,
-                        Executors.newFixedThreadPool(
-                                1, new ExecutorThreadFactory("rest-client-thread-pool")));
+        restClient = getTestRestClient();
         serverAddress = serverEndpoint.getServerAddress();
     }
     @AfterEach
     void stop() throws Exception {
         if (restClient != null) {
-            restClient.shutdown(timeout);
+            restClient.shutdown();
             restClient = null;
         }

         if (serverEndpoint != null) {
-            serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
+            serverEndpoint.stop();
             serverEndpoint = null;
         }
     }

     /** Test that {@link SqlGatewayMessageHeaders} can identify the version correctly. */
     @Test
     void testSqlGatewayMessageHeaders() throws Exception {
-        // The header only support V1, but send request by V0
+        // The header can't support V0, but sends request by V0
         assertThatThrownBy(
                 () -> restClient.sendRequest(