[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


beyond1920 commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058118429


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression tryCastOperand = tryCast(operand(0), typeLiteral(getResultType()));
+return new Expression[] {
+/* sum = */ ifThenElse(
+isNull(operand(0)),
+sum,
+ifThenElse(
+isNull(tryCastOperand),
+sum,
+ifThenElse(
+isNull(sum),
+tryCastOperand,
+adjustedPlus(sum, tryCastOperand))))
+};
+}

Review Comment:
   Do we need to cast the final result here?
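   For context, a minimal sketch of what casting the final result could look like in `getValueExpression()`, assuming the `cast` helper from `ExpressionBuilder` (illustrative only, not necessarily the change this PR ends up making):

   ```java
   // Hedged sketch: emit the sum buffer cast to the declared result type.
   // If the buffer type always equals the result type, simply returning `sum` would suffice.
   @Override
   public Expression getValueExpression() {
       return cast(sum, typeLiteral(getResultType()));
   }
   ```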



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


beyond1920 commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058116587


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression tryCastOperand = tryCast(operand(0), typeLiteral(getResultType()));
+return new Expression[] {
+/* sum = */ ifThenElse(
+isNull(operand(0)),
+sum,
+ifThenElse(
+isNull(tryCastOperand),
+sum,

Review Comment:
   Do we really need to use `ifThenElse(isNull(tryCastOperand), sum, ...)` again here, since we have already checked `isNull(operand(0))` before?
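   For illustration, a hedged sketch of what the simplified expression would look like without that guard (not the PR's code), plus the case that may still need it:

   ```java
   // Accumulate without the isNull(tryCastOperand) check:
   /* sum = */ ifThenElse(
           isNull(operand(0)),
           sum,
           ifThenElse(isNull(sum), tryCastOperand, adjustedPlus(sum, tryCastOperand)));
   // Caveat: tryCast can still yield NULL for a non-null operand if the cast itself fails
   // (e.g. precision overflow), in which case the extra guard is what keeps `sum` unchanged.
   ```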



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-27 Thread GitBox


jiangxin369 commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1057228565


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/MinHashLSH.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.io.IOException;
+
+/**
+ * An Estimator that implements the MinHash LSH algorithm, which supports LSH 
for Jaccard distance.
+ *
+ * The input could be dense or sparse vectors. Each input vector must hava 
at least one non-zero

Review Comment:
   ```suggestion
* The input could be dense or sparse vectors. Each input vector must 
have at least one non-zero
   ```
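   For readers following the thread, a hedged usage sketch of the estimator under review (setter names are assumptions based on the PR description and common Flink ML parameter conventions, not confirmed API):

   ```java
   // Fit MinHashLSH on a table that has a vector column "vec", then hash its rows.
   // `inputTable` is assumed to be an existing org.apache.flink.table.api.Table.
   MinHashLSH lsh =
           new MinHashLSH()
                   .setInputCol("vec")
                   .setOutputCol("hashes")
                   .setNumHashTables(5);
   MinHashLSHModel model = lsh.fit(inputTable);
   Table output = model.transform(inputTable)[0];
   ```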



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java:
##
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for LSH model.
+ *
+ * In addition to transforming input feature vectors to multiple hash 
values, it also supports
+ * approximate nearest neighbors search within a dataset regarding a key 
vector and approximate
+ * similarity join between two datasets.
+ *
+ * @param  class type of the LSHModel implementation itself.
+ */
+abstract class LSHModel> implements Model, 
LSHModelParams {
+private static final String MODEL_DATA_BC_KEY = "modelData";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+/** Stores the corresponding model data class of T. */
+private final Class modelDataClass;
+
+protected Table modelDataTable;
+
+public LSHModel(Class modelDataClass) {
+

[jira] [Assigned] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.

2022-12-27 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-29859:
--

Assignee: Lijie Wang

> TPC-DS end-to-end test with adaptive batch scheduler failed due to oo 
> non-empty .out files.
> ---
>
> Key: FLINK-29859
> URL: https://issues.apache.org/jira/browse/FLINK-29859
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Leonard Xu
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.1
>
>
> Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' 
> failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the 
> logs contained errors, exceptions or non-empty .out files 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.

2022-12-27 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang resolved FLINK-29859.

Fix Version/s: 1.17.0
   1.16.1
   Resolution: Fixed

> TPC-DS end-to-end test with adaptive batch scheduler failed due to oo 
> non-empty .out files.
> ---
>
> Key: FLINK-29859
> URL: https://issues.apache.org/jira/browse/FLINK-29859
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Leonard Xu
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0, 1.16.1
>
>
> Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' 
> failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the 
> logs contained errors, exceptions or non-empty .out files 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29859) TPC-DS end-to-end test with adaptive batch scheduler failed due to oo non-empty .out files.

2022-12-27 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652398#comment-17652398
 ] 

Lijie Wang commented on FLINK-29859:


Fixed via

master 14a61f368332320d7e38cc93a04f95bb63c66788

release-1.16 9e51cb8c117fd9a5f887e0a0e8faee4ff11462ea

> TPC-DS end-to-end test with adaptive batch scheduler failed due to oo 
> non-empty .out files.
> ---
>
> Key: FLINK-29859
> URL: https://issues.apache.org/jira/browse/FLINK-29859
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Leonard Xu
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> Nov 03 02:02:12 [FAIL] 'TPC-DS end-to-end test with adaptive batch scheduler' 
> failed after 21 minutes and 44 seconds! Test exited with exit code 0 but the 
> logs contained errors, exceptions or non-empty .out files 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42766=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=af184cdd-c6d8-5084-0b69-7e9c67b35f7a



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30516) Add file count and row count in snapshots table

2022-12-27 Thread Shammon (Jira)
Shammon created FLINK-30516:
---

 Summary: Add file count and row count in snapshots table
 Key: FLINK-30516
 URL: https://issues.apache.org/jira/browse/FLINK-30516
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.3.0, table-store-0.4.0
Reporter: Shammon


Add file count and row count in mytable$snapshots table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong commented on a diff in pull request #21560: [FLINK-29768] Hybrid shuffle supports consume partial finished subtask

2022-12-27 Thread GitBox


xintongsong commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1058108777


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java:
##
@@ -122,4 +122,6 @@ List 
getClusterPartitionShuffleDescriptors(
 IntermediateDataSetID intermediateResultPartition);
 
 MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy();
+
+boolean isHybridEnableConsumePartialFinishedProducer();

Review Comment:
   I'm not entirely sure about introducing a hybrid-specific interface in the execution graph. I haven't thought of an alternative way though.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * {@link PartialFinishedInputConsumableDecider} is a special {@link InputConsumableDecider}. The
+ * input is considered to be consumable:
+ *
+ * 
+ *   for hybrid input: when partial producer partitions are finished.
+ *   for blocking input: when all producer partitions are finished.
+ * 
+ */
+public class PartialFinishedInputConsumableDecider implements InputConsumableDecider {
+public static final int NUM_FINISHED_PARTITIONS_AS_CONSUMABLE = 1;
+
+@Override
+public boolean isInputConsumable(
+SchedulingExecutionVertex executionVertex,
+Set verticesToDeploy,
+Map consumableStatusCache) {
+for (ConsumedPartitionGroup consumedPartitionGroup :
+executionVertex.getConsumedPartitionGroups()) {
+
+if (!consumableStatusCache.computeIfAbsent(
+consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) {
+return false;
+}

Review Comment:
   When there are multiple hybrid partition groups, shall we require all groups to have at least one finished partition, or just one finished partition from any of the groups?



##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -670,6 +670,24 @@ public enum SchedulerType {
 code(SPECULATIVE_ENABLED.key()))
 .build());
 
+@Documentation.Section({
+Documentation.Sections.EXPERT_SCHEDULING,
+Documentation.Sections.ALL_JOB_MANAGER
+})
+public static final ConfigOption CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED =

Review Comment:
   The two configs (`ONLY_CONSUME_FINISHED_PARTITION` and `CONSUME_PARTIAL_FINISHED_PRODUCER_ENABLED`) are quite alike, which may confuse users.
   - It's hard to understand the differences between them.
   - There could be conflicts, e.g., allowing consumption of unfinished partitions while not allowing consumption from a partially finished producer.
   
   I wonder if we can combine them into one config, which takes an enum type of supported values.
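   For illustration, a hedged sketch of what a single enum-typed option could look like (the key, enum name and values below are made up for the example, not an agreed design):

   ```java
   /** Hypothetical constraint on when hybrid shuffle data may be consumed. */
   public enum PartitionDataConsumeConstraint {
       UNFINISHED_PRODUCERS,      // downstream may consume while producers are still running
       ONLY_FINISHED_PRODUCERS,   // downstream may consume data of finished producers only
       ALL_PRODUCERS_FINISHED     // downstream may consume only after all producers finished
   }

   public static final ConfigOption<PartitionDataConsumeConstraint> HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT =
           ConfigOptions.key("jobmanager.partition.hybrid.partition-data-consume-constraint")
                   .enumType(PartitionDataConsumeConstraint.class)
                   .noDefaultValue()
                   .withDescription("Controls when a downstream task can consume hybrid shuffle data.");
   ```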



##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import 

[GitHub] [flink] wanglijie95 closed pull request #21559: [FLINK-29859][e2e] Running TPC-DS with adaptive batch scheduler supports custom exceptions check

2022-12-27 Thread GitBox


wanglijie95 closed pull request #21559: [FLINK-29859][e2e] Running TPC-DS with 
adaptive batch scheduler supports custom exceptions check
URL: https://github.com/apache/flink/pull/21559


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #489: [FLINK-30406] Detect when jobmanager never started

2022-12-27 Thread GitBox


morhidi commented on PR #489:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/489#issuecomment-1366390439

   +1 nice job @gyfora !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on pull request #193: [FLINK-30515] Add benchmark for CountVectorizer, Imputer, RobustScale…

2022-12-27 Thread GitBox


jiangxin369 commented on PR #193:
URL: https://github.com/apache/flink-ml/pull/193#issuecomment-1366389187

   @lindong28 Could you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Mulavar commented on a diff in pull request #21545: [FLINK-30396][table]make alias hint take effect in correlate

2022-12-27 Thread GitBox


Mulavar commented on code in PR #21545:
URL: https://github.com/apache/flink/pull/21545#discussion_r1058069998


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##
@@ -189,18 +191,33 @@ public static RelNode capitalizeJoinHints(RelNode root) {
 
 private static class CapitalizeJoinHintShuttle extends RelShuttleImpl {
 
+@Override
+public RelNode visit(LogicalCorrelate correlate) {
+return visitBiRel(correlate);
+}
+
 @Override
 public RelNode visit(LogicalJoin join) {
-List hints = join.getHints();
+return visitBiRel(join);
+}
+
+private RelNode visitBiRel(BiRel biRel) {
+Hintable hBiRel = (Hintable) biRel;
 AtomicBoolean changed = new AtomicBoolean(false);
 List hintsWithCapitalJoinHints =
-hints.stream()
+hBiRel.getHints().stream()
 .map(
 hint -> {
 String capitalHintName = hint.hintName.toUpperCase(Locale.ROOT);
 if (JoinStrategy.isJoinStrategy(capitalHintName)) {
 changed.set(true);
+if (JoinStrategy.isLookupHint(hint.hintName)) {

Review Comment:
   please refer to `ClearJoinHintWithCapitalizeJoinHintShuttleTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

2022-12-27 Thread GitBox


morhidi commented on code in PR #489:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1058069736


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##
@@ -112,18 +116,23 @@ protected Optional getAvailableUpgradeMode(
 && !flinkVersionChanged(
 ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
 
-if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
-// initial deployment failure, reset to allow for spec change to proceed
-return resetOnMissingStableSpec(deployment, deployConfig);
-}
-} else {
+if (flinkService.isHaMetadataAvailable(deployConfig)) {
 LOG.info(
 "Job is not running but HA metadata is available for last state restore, ready for upgrade");
 return Optional.of(UpgradeMode.LAST_STATE);
 }
 }
 
+if (status.getReconciliationStatus()
+.deserializeLastReconciledSpec()
+.getJob()
+.getUpgradeMode()
+!= UpgradeMode.LAST_STATE
+&& FlinkUtils.jmPodNeverStarted(ctx)) {
+deleteJmThatNeverStarted(deployment, deployConfig);
+return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);

Review Comment:
   Can the process get stuck here in a recursive loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


LadyForest commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1058054716


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
 }
 }
 
+private static class ModifySchemaConverter extends SchemaConverter {
+
+ModifySchemaConverter(
+Schema originalSchema,
+FlinkTypeFactory typeFactory,
+SqlValidator sqlValidator,
+Consumer constraintValidator,
+Function escapeExpressions,
+SchemaResolver schemaResolver) {
+super(
+originalSchema,
+typeFactory,
+sqlValidator,
+constraintValidator,
+escapeExpressions,
+schemaResolver);
+}
+
+@Override
+void checkColumnExists(String columnName) {
+if (!sortedColumnNames.contains(columnName)) {
+throw new ValidationException(
+String.format(
+"%sTry to modify a column `%s` which does not 
exist in the table.",
+EX_MSG_PREFIX, columnName));
+}
+}
+
+@Override
+void checkPrimaryKeyExists() {
+if (primaryKey == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any primary 
key constraint. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+void checkWatermarkExists() {
+if (watermarkSpec == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any 
watermark. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+Optional getColumnPosition(SqlTableColumnPosition columnPosition) {
+if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) {
+sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+return super.getColumnPosition(columnPosition);
+}
+return Optional.empty();
+}
+
+@Override
+Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+SqlTableColumn.SqlRegularColumn physicalColumn) {
+Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn);
+String columnName = newColumn.getName();
+// preserves the primary key's nullability
+if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) {
+newColumn =
+new Schema.UnresolvedPhysicalColumn(
+columnName,
+newColumn.getDataType().notNull(),
+newColumn.getComment().orElse(null));
+}

Review Comment:
   > in the SqlAlterTableSchema#validate we have already mark used column not 
null.
   
   Actually, `SqlAlterTableSchema#validate` is not suitable under this condition. Consider the previous case I mentioned: `SqlAlterTableSchema#validate` will do nothing there because that change does not reveal any primary key info.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


fsk119 commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1058050529


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
 }
 }
 
+private static class ModifySchemaConverter extends SchemaConverter {
+
+ModifySchemaConverter(
+Schema originalSchema,
+FlinkTypeFactory typeFactory,
+SqlValidator sqlValidator,
+Consumer constraintValidator,
+Function escapeExpressions,
+SchemaResolver schemaResolver) {
+super(
+originalSchema,
+typeFactory,
+sqlValidator,
+constraintValidator,
+escapeExpressions,
+schemaResolver);
+}
+
+@Override
+void checkColumnExists(String columnName) {
+if (!sortedColumnNames.contains(columnName)) {
+throw new ValidationException(
+String.format(
+"%sTry to modify a column `%s` which does not 
exist in the table.",
+EX_MSG_PREFIX, columnName));
+}
+}
+
+@Override
+void checkPrimaryKeyExists() {
+if (primaryKey == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any primary 
key constraint. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+void checkWatermarkExists() {
+if (watermarkSpec == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any 
watermark. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+Optional getColumnPosition(SqlTableColumnPosition columnPosition) {
+if (columnPosition.isFirstColumn() || columnPosition.isAfterReferencedColumn()) {
+sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+return super.getColumnPosition(columnPosition);
+}
+return Optional.empty();
+}
+
+@Override
+Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+SqlTableColumn.SqlRegularColumn physicalColumn) {
+Schema.UnresolvedPhysicalColumn newColumn = super.convertPhysicalColumn(physicalColumn);
+String columnName = newColumn.getName();
+// preserves the primary key's nullability
+if (primaryKey != null && primaryKey.getColumnNames().contains(columnName)) {
+newColumn =
+new Schema.UnresolvedPhysicalColumn(
+columnName,
+newColumn.getDataType().notNull(),
+newColumn.getComment().orElse(null));
+}

Review Comment:
   I mean in `SchemaConverter#convert`. BTW, do you think we need to modify the type to not null here? As far as I know, in `SqlAlterTableSchema#validate` we have already marked the used column as not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


fsk119 commented on PR #21504:
URL: https://github.com/apache/flink/pull/21504#issuecomment-1366360753

   +1 to move to another task.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on a diff in pull request #19329: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement

2022-12-27 Thread GitBox


fsk119 commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r1058046923


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##
@@ -144,6 +147,163 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable originTable,
+ResolvedSchema originResolveSchema) {
+Schema originSchema = originTable.getUnresolvedSchema();

Review Comment:
   The current implementation also modifies the pk list if the original column is in the list.
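   For illustration, a hedged sketch of the rename propagation described above (illustrative only, not the PR's code):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Renaming a primary-key column implies rewriting the pk column list with the new name as well.
   static List<String> renameInPrimaryKey(List<String> pkColumns, String from, String to) {
       return pkColumns.stream()
               .map(col -> col.equals(from) ? to : col)
               .collect(Collectors.toList());
   }
   ```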



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30515) Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector and VarianceThresholdSelector

2022-12-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30515:
---
Labels: pull-request-available  (was: )

> Add benchmark for CountVectorizer, Imputer, RobustScaler, 
> UnivariateFeatureSelector and VarianceThresholdSelector
> -
>
> Key: FLINK-30515
> URL: https://issues.apache.org/jira/browse/FLINK-30515
> Project: Flink
>  Issue Type: Bug
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] jiangxin369 opened a new pull request, #193: [FLINK-30515] Add benchmark for CountVectorizer, Imputer, RobustScale…

2022-12-27 Thread GitBox


jiangxin369 opened a new pull request, #193:
URL: https://github.com/apache/flink-ml/pull/193

   …r, UnivariateFeatureSelector and VarianceThresholdSelector
   
   
   
   ## What is the purpose of the change
   
   Add benchmark for CountVectorizer, Imputer, RobustScaler, 
UnivariateFeatureSelector, and VarianceThresholdSelector.
   
   ## Brief change log
   
 - Add benchmark for CountVectorizer, Imputer, RobustScaler, 
UnivariateFeatureSelector, and VarianceThresholdSelector
 - Add `RandomStringArrayGenerator` to help generate a table of string 
arrays.
 - Make `HasNumDistinctValues` a public param.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30515) Add benchmark for CountVectorizer, Imputer, RobustScaler, UnivariateFeatureSelector and VarianceThresholdSelector

2022-12-27 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-30515:
-

 Summary: Add benchmark for CountVectorizer, Imputer, RobustScaler, 
UnivariateFeatureSelector and VarianceThresholdSelector
 Key: FLINK-30515
 URL: https://issues.apache.org/jira/browse/FLINK-30515
 Project: Flink
  Issue Type: Bug
Reporter: Jiang Xin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30314) Unable to read all records from compressed line-delimited JSON files using Table API

2022-12-27 Thread Jiankun Feng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652361#comment-17652361
 ] 

Jiankun Feng edited comment on FLINK-30314 at 12/28/22 4:05 AM:


Hello, everyone. This problem also occurs in 1.14.3 when reading gz files. How is the progress? Thanks

 
CREATE TABLE MyUserTable (
     column_name1 STRING
) WITH (
     'connector'='filesystem',
     'path'='xxx.gz',
     'format'='raw'
);


was (Author: JIRAUSER284851):
Hello, everyone. This problem also occurs in 1.14.3 when reading gz files.How 
is the progress ?  Thanks

> Unable to read all records from compressed line-delimited JSON files using 
> Table API
> 
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Connectors / FileSystem, Table SQL / API
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Dmitry Yaraev
>Priority: Major
> Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped JSON line-delimited files in the batch mode using 
> [FileSystem 
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
>  For reading the files a new table is created with the following 
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-0.json.gz and 
> input-1.json.gz. As it comes from the filenames, the files are compressed 
> with GZIP. Each of the files contains 10 records. The issue is that only 2 
> records from each file are read (4 in total). If decompressed versions of the 
> same data files are used, all 20 records are read.
> As far as I understand, that problem may be related to the fact that split 
> length, which is used when the files are read, is in fact the length of a 
> compressed file. So files are closed before all records are read from them 
> because read position of the decompressed file stream exceeds split length.
> Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we 
> could identify if the file compressed or not. The flag can be set to true in 
> {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file 
> streams. With such a flag it could be possible to differentiate 
> non-splittable compressed files and only rely on the end of the stream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30314) Unable to read all records from compressed line-delimited JSON files using Table API

2022-12-27 Thread Jiankun Feng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652361#comment-17652361
 ] 

Jiankun Feng commented on FLINK-30314:
--

Hello, everyone. This problem also occurs in 1.14.3 when reading gz files. How is the progress? Thanks

> Unable to read all records from compressed line-delimited JSON files using 
> Table API
> 
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Connectors / FileSystem, Table SQL / API
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Dmitry Yaraev
>Priority: Major
> Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped JSON line-delimited files in the batch mode using 
> [FileSystem 
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
>  For reading the files a new table is created with the following 
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-0.json.gz and 
> input-1.json.gz. As it comes from the filenames, the files are compressed 
> with GZIP. Each of the files contains 10 records. The issue is that only 2 
> records from each file are read (4 in total). If decompressed versions of the 
> same data files are used, all 20 records are read.
> As far as I understand, that problem may be related to the fact that split 
> length, which is used when the files are read, is in fact the length of a 
> compressed file. So files are closed before all records are read from them 
> because read position of the decompressed file stream exceeds split length.
> Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we 
> could identify if the file compressed or not. The flag can be set to true in 
> {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file 
> streams. With such a flag it could be possible to differentiate 
> non-splittable compressed files and only rely on the end of the stream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lincoln-lil commented on pull request #20800: [FLINK-28850][table-planner] Support table alias in LOOKUP hint

2022-12-27 Thread GitBox


lincoln-lil commented on PR #20800:
URL: https://github.com/apache/flink/pull/20800#issuecomment-1366346357

   @godfreyhe thanks for reviewing this! I've updated the PR according to your comments: the Snapshot was formatted correctly in the first commit, and I added two more cases that use the table name as the hint option (previously, only one case was kept).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


lsyldliu commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058031265


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {

Review Comment:
   I agree with you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] beyond1920 commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


beyond1920 commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058030752


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {

Review Comment:
   @lsyldliu Thanks for pointing the problem out.
   IMO, it's ok to not support summing over timestamp types.
   Intuitively, sum(timestamp) does not seem to make sense. Besides, the behavior of the Flink Hive dialect is not easy to keep consistent with Hive's behavior here.
   WDYT?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm

2022-12-27 Thread GitBox


swuferhong commented on code in PR #21530:
URL: https://github.com/apache/flink/pull/21530#discussion_r1058027477


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.cost.FlinkCost;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.LoptMultiJoin;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink busy join reorder rule, which will convert {@link MultiJoin} to a 
busy join tree.
+ *
+ * In this busy join reorder strategy, we will first try to reorder all the 
inner join type
+ * inputs in the multiJoin, and then add all outer join type inputs on the top.
+ *
+ * First, reordering all the inner join type inputs in the multiJoin. We 
adopt the concept of
+ * level in dynamic programming, and the latter layer will use the results 
stored in the previous
+ * layers. First, we put all inputs (each input in {@link MultiJoin}) into 
level 0, then we build
+ * all two-inputs join at level 1 based on the {@link FlinkCost} of level 0, 
then we will build
+ * three-inputs join based on the previous two levels, then four-inputs joins 
... etc, until we
+ * reorder all the inner join type inputs in the multiJoin. When building 
m-inputs join, we only
+ * keep the best plan (have the lowest {@link FlinkCost}) for the same set of 
m inputs. E.g., for
+ * three-inputs join, we keep only the best plan for inputs {A, B, C} among 
plans (A J B) J C, (A J
+ * C) J B, (B J C) J A.
+ *
+ * Second, we will add all outer join type inputs in the MultiJoin on the 
top.
+ */
+public class FlinkBusyJoinReorderRule extends 
RelRule
+implements TransformationRule {
+
+public static final LoptOptimizeJoinRule MULTI_JOIN_OPTIMIZE =
+LoptOptimizeJoinRule.Config.DEFAULT.toRule();
+
+/** Creates a FlinkBusyJoinReorderRule. */
+protected FlinkBusyJoinReorderRule(Config config) {
+super(config);
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(RelBuilderFactory relBuilderFactory) {
+
this(Config.DEFAULT.withRelBuilderFactory(relBuilderFactory).as(Config.class));
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(
+RelFactories.JoinFactory joinFactory,
+RelFactories.ProjectFactory projectFactory,
+RelFactories.FilterFactory filterFactory) {
+this(RelBuilder.proto(joinFactory, projectFactory, filterFactory));
+

[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm

2022-12-27 Thread GitBox


swuferhong commented on code in PR #21530:
URL: https://github.com/apache/flink/pull/21530#discussion_r1058025322


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.cost.FlinkCost;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.LoptMultiJoin;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink busy join reorder rule, which will convert {@link MultiJoin} to a 
busy join tree.
+ *
+ * In this busy join reorder strategy, we will first try to reorder all the 
inner join type
+ * inputs in the multiJoin, and then add all outer join type inputs on the top.
+ *
+ * First, reordering all the inner join type inputs in the multiJoin. We 
adopt the concept of
+ * level in dynamic programming, and the latter layer will use the results 
stored in the previous
+ * layers. First, we put all inputs (each input in {@link MultiJoin}) into 
level 0, then we build
+ * all two-inputs join at level 1 based on the {@link FlinkCost} of level 0, 
then we will build
+ * three-inputs join based on the previous two levels, then four-inputs joins 
... etc, until we
+ * reorder all the inner join type inputs in the multiJoin. When building 
m-inputs join, we only
+ * keep the best plan (have the lowest {@link FlinkCost}) for the same set of 
m inputs. E.g., for
+ * three-inputs join, we keep only the best plan for inputs {A, B, C} among 
plans (A J B) J C, (A J
+ * C) J B, (B J C) J A.
+ *
+ * Second, we will add all outer join type inputs in the MultiJoin on the 
top.
+ */
+public class FlinkBusyJoinReorderRule extends 
RelRule
+implements TransformationRule {
+
+public static final LoptOptimizeJoinRule MULTI_JOIN_OPTIMIZE =
+LoptOptimizeJoinRule.Config.DEFAULT.toRule();
+
+/** Creates a FlinkBusyJoinReorderRule. */
+protected FlinkBusyJoinReorderRule(Config config) {
+super(config);
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(RelBuilderFactory relBuilderFactory) {
+
this(Config.DEFAULT.withRelBuilderFactory(relBuilderFactory).as(Config.class));
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(
+RelFactories.JoinFactory joinFactory,
+RelFactories.ProjectFactory projectFactory,
+RelFactories.FilterFactory filterFactory) {
+this(RelBuilder.proto(joinFactory, projectFactory, filterFactory));
+

[GitHub] [flink-table-store] tsreaper merged pull request #445: [FLINK-30506] Add documentation for writing Table Store with Spark3

2022-12-27 Thread GitBox


tsreaper merged PR #445:
URL: https://github.com/apache/flink-table-store/pull/445


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm

2022-12-27 Thread GitBox


swuferhong commented on code in PR #21530:
URL: https://github.com/apache/flink/pull/21530#discussion_r1058020032


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -135,6 +135,24 @@ public class OptimizerConfigOptions {
 .defaultValue(false)
 .withDescription("Enables join reorder in optimizer. 
Default is disabled.");
 
+@Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+public static final ConfigOption 
TABLE_OPTIMIZER_BUSY_JOIN_REORDER =
+key("table.optimizer.busy-join-reorder")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Enables busy join reorder in optimizer. Default 
is disabled.");
+
+@Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+public static final ConfigOption 
TABLE_OPTIMIZER_BUSY_JOIN_REORDER_DP_THRESHOLD =
+key("table.optimizer.busy-join-reorder-dp-threshold")
+.intType()
+.defaultValue(12)

Review Comment:
   > can you explain why we choose `12` as the threshold value ?
   
   Now we set the default value to -1, which means that Flink disables busy join 
reorder by default. 12 is an empirical value provided by the designer of 
the algorithm.
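
   For reference, a minimal sketch of how a user could raise the threshold to 
enable busy join reorder (the option key is taken from the diff above and may 
still change before the PR is merged):

   ```java
   // Sketch only: a value > 0 enables busy join reorder for queries with up to
   // that many join inputs; <= 0 keeps it disabled.
   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
   tEnv.getConfig().getConfiguration()
           .setInteger("table.optimizer.busy-join-reorder-dp-threshold", 12);
   ```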



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm

2022-12-27 Thread GitBox


swuferhong commented on code in PR #21530:
URL: https://github.com/apache/flink/pull/21530#discussion_r1058019619


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkBusyJoinReorderRule.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.cost.FlinkCost;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.LoptMultiJoin;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink busy join reorder rule, which will convert {@link MultiJoin} to a 
busy join tree.
+ *
+ * In this busy join reorder strategy, we will first try to reorder all the 
inner join type
+ * inputs in the multiJoin, and then add all outer join type inputs on the top.
+ *
+ * First, reordering all the inner join type inputs in the multiJoin. We 
adopt the concept of
+ * level in dynamic programming, and the latter layer will use the results 
stored in the previous
+ * layers. First, we put all inputs (each input in {@link MultiJoin}) into 
level 0, then we build
+ * all two-inputs join at level 1 based on the {@link FlinkCost} of level 0, 
then we will build
+ * three-inputs join based on the previous two levels, then four-inputs joins 
... etc, until we
+ * reorder all the inner join type inputs in the multiJoin. When building 
m-inputs join, we only
+ * keep the best plan (have the lowest {@link FlinkCost}) for the same set of 
m inputs. E.g., for
+ * three-inputs join, we keep only the best plan for inputs {A, B, C} among 
plans (A J B) J C, (A J
+ * C) J B, (B J C) J A.
+ *
+ * Second, we will add all outer join type inputs in the MultiJoin on the 
top.
+ */
+public class FlinkBusyJoinReorderRule extends 
RelRule
+implements TransformationRule {
+
+public static final LoptOptimizeJoinRule MULTI_JOIN_OPTIMIZE =
+LoptOptimizeJoinRule.Config.DEFAULT.toRule();
+
+/** Creates a FlinkBusyJoinReorderRule. */
+protected FlinkBusyJoinReorderRule(Config config) {
+super(config);
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(RelBuilderFactory relBuilderFactory) {
+
this(Config.DEFAULT.withRelBuilderFactory(relBuilderFactory).as(Config.class));
+}
+
+@Deprecated // to be removed before 2.0
+public FlinkBusyJoinReorderRule(
+RelFactories.JoinFactory joinFactory,
+RelFactories.ProjectFactory projectFactory,
+RelFactories.FilterFactory filterFactory) {
+this(RelBuilder.proto(joinFactory, projectFactory, filterFactory));
+

[GitHub] [flink] swuferhong commented on a diff in pull request #21530: [FLINK-30376][table-planner] Introduce a new flink busy join reorder rule which based on greedy algorithm

2022-12-27 Thread GitBox


swuferhong commented on code in PR #21530:
URL: https://github.com/apache/flink/pull/21530#discussion_r1058019236


##
docs/layouts/shortcodes/generated/optimizer_config_configuration.html:
##
@@ -89,5 +89,17 @@
 Boolean
 When it is true, the optimizer will collect and use the 
statistics from source connectors if the source extends from 
SupportsStatisticReport and the statistics from catalog is UNKNOWN.Default 
value is true.
 
+
+table.optimizer.busy-join-reorder Batch Streaming

Review Comment:
   > This option can be removed, since we can use 
`table.optimizer.busy-join-reorder-dp-threshold` <= 0 to disable 
busy-join-reorder
   
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


lsyldliu commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058018678


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric 
directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {

Review Comment:
   BTW, the Hive 
[TimestampObjectInspector](https://github.com/apache/hive/blob/989e72a393356c5f91f96d1bab6455a4c75c77a7/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorUtils.java#L874)
 supports returning double precision, but Flink's `UNIX_TIMESTAMP` can only return 
bigint, so precision may be lost here. For example, for `2021-08-04 
16:26:33.4` Flink can only get the `2021-08-04 16:26:33` part; the `.4` is 
lost. For the timestamp type we cannot align the behavior with Hive, what do you 
think about it?
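
   To illustrate the loss, a minimal sketch assuming a hypothetical table `t` 
with a `TIMESTAMP(3)` column `ts` holding `2021-08-04 16:26:33.400` (the exact 
epoch value also depends on the session time zone):

   ```java
   // Sketch only: UNIX_TIMESTAMP returns BIGINT seconds, so the fraction is dropped.
   tableEnv.executeSql(
           "SELECT UNIX_TIMESTAMP(CAST(ts AS STRING)) AS epoch_seconds FROM t").print();
   // The result corresponds to 2021-08-04 16:26:33; the trailing .4 second is lost.
   ```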



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


lsyldliu commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058014713


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric 
directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {

Review Comment:
   
Yes, the [sum](https://github.com/apache/hive/blob/10805bc997d7cd136b85fca9200cf165ffe2eae5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java#L123)
 UDAF supports it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


lsyldliu commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058014428


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric 
directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {
+operand = castTimestampToLong(operand(0));
+} else {
+operand = operand(0);
+}
+Expression tryCastOperand = tryCast(operand, 
typeLiteral(getResultType()));
+return new Expression[] {
+/* sum = */ ifThenElse(
+isNull(operand(0)),
+sum,
+ifThenElse(
+isNull(tryCastOperand),
+sum,
+ifThenElse(
+isNull(sum),
+tryCastOperand,
+adjustedPlus(sum, tryCastOperand))))
+};
+}
+
+@Override
+public Expression[] retractExpressions() {
+throw new TableException("Sum aggregate function does not support 
retraction.");
+}
+
+@Override
+public Expression[] mergeExpressions() {
+return new 

[GitHub] [flink] lsyldliu commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

2022-12-27 Thread GitBox


lsyldliu commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1058014011


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.UNIX_TIMESTAMP;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.call;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private DataType argsType;
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ nullOf(getResultType())};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+Expression operand;
+// TimestampToNumericCastRule can't cast timestamp to numeric 
directly, so here use
+// UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead
+if (argsType.getLogicalType().is(TIMESTAMP_WITHOUT_TIME_ZONE)) {
+operand = castTimestampToLong(operand(0));
+} else {
+operand = operand(0);
+}
+Expression tryCastOperand = tryCast(operand, 
typeLiteral(getResultType()));
+return new Expression[] {
+/* sum = */ ifThenElse(
+isNull(operand(0)),
+sum,
+ifThenElse(
+isNull(tryCastOperand),
+sum,
+ifThenElse(
+isNull(sum),
+tryCastOperand,
+adjustedPlus(sum, tryCastOperand))))
+};
+}
+
+@Override
+public Expression[] retractExpressions() {

Review Comment:
   We just follow Hive's logic, which doesn't support retraction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

[GitHub] [flink-kubernetes-operator] nowke commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

2022-12-27 Thread GitBox


nowke commented on code in PR #487:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057990470


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##
@@ -241,15 +245,35 @@ private Optional validateJobSpec(
 }
 
 private Optional validateJmSpec(JobManagerSpec jmSpec, Map confMap) {
+Configuration conf = Configuration.fromMap(confMap);
+var jmMemoryDefined =
+jmSpec != null
+&& jmSpec.getResource() != null
+&& 
!StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+Optional jmMemoryValidation =
+jmMemoryDefined ? Optional.empty() : 
validateJmMemoryConfig(conf);
+
 if (jmSpec == null) {
-return Optional.empty();
+return jmMemoryValidation;
 }
 
 return firstPresent(
+jmMemoryValidation,
 validateResources("JobManager", jmSpec.getResource()),
 validateJmReplicas(jmSpec.getReplicas(), confMap));
 }
 
+private Optional validateJmMemoryConfig(Configuration conf) {
+try {
+
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(

Review Comment:
   - `processSpecFromConfigWithNewOptionToInterpretLegacyHeap` internally calls 
`processSpecFromConfig` - 
[JobManagerProcessUtils.java#L73-L74](https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L73-L74)
   - Also, `processSpecFromConfig` is not `public`, but package private - 
[JobManagerProcessUtils.java#L82](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L82)
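
   For context, a minimal sketch of what the validation helper in the diff above 
could look like (the chosen config option and exception type are my assumptions, 
not taken from the PR):

   ```java
   private Optional<String> validateJmMemoryConfig(Configuration conf) {
       try {
           // Reuse Flink's own JobManager memory derivation; it throws if the
           // memory options in the effective configuration are missing or inconsistent.
           JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                   conf, JobManagerOptions.TOTAL_FLINK_MEMORY);
           return Optional.empty();
       } catch (IllegalConfigurationException e) {
           return Optional.of(e.getMessage());
       }
   }
   ```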



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-12-27 Thread GitBox


reta commented on PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#issuecomment-1366207021

   @ypark2103 that is correct; the Flink Opensearch connector follows Flink's 
externalization model for connectors, and the necessary scaffolding is only 
available in Flink 1.16 and above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] ypark2103 commented on pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors

2022-12-27 Thread GitBox


ypark2103 commented on PR #1:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/1#issuecomment-1366195952

   @reta @MartijnVisser Is this Flink-Connector-Opensearch only available for 
flink 1.16 version? What about flink 1.11 or 1.12?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.

2022-12-27 Thread GitBox


eskabetxe commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1057924897


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/db2/Db2TestBaseITCase.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.db2;
+
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Db2Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/** Basic class for testing DB2 jdbc. */
+public class Db2TestBaseITCase extends AbstractTestBase {

Review Comment:
   you could annotate the class with `@Testcontainers` and the container with 
`@Container`; with this you could eliminate the startContainers and 
stopContainers methods, as they will be auto-managed by Testcontainers (see the sketch below)
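
   A minimal sketch of that suggestion, assuming JUnit 5 and the Testcontainers 
Db2 module (the image tag and license acceptance call are illustrative):

   ```java
   import org.testcontainers.containers.Db2Container;
   import org.testcontainers.junit.jupiter.Container;
   import org.testcontainers.junit.jupiter.Testcontainers;

   @Testcontainers
   class Db2TestBaseITCase {

       // Started before the first test and stopped after the last one by the
       // Testcontainers JUnit 5 extension; no manual startContainers/stopContainers.
       @Container
       static final Db2Container DB2_CONTAINER =
               new Db2Container("ibmcom/db2:11.5.0.0a").acceptLicense();
   }
   ```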



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.

2022-12-27 Thread GitBox


eskabetxe commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1057922321


##
flink-connector-jdbc/pom.xml:
##
@@ -195,13 +202,13 @@ under the License.
test

 
-   
-
+   

-   org.apache.flink

Review Comment:
   Why are we removing this?



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/db2/Db2TestBaseITCase.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.db2;
+
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Db2Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/** Basic class for testing DB2 jdbc. */
+public class Db2TestBaseITCase extends AbstractTestBase {

Review Comment:
   you could annotate the class with `@Testcontainers` and the container with 
`@Container`; with this you could eliminate the startContainers and 
stopContainers methods, as they will be auto-managed by Testcontainers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuzelin commented on pull request #21525: [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway

2022-12-27 Thread GitBox


yuzelin commented on PR #21525:
URL: https://github.com/apache/flink/pull/21525#issuecomment-1366075740

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21564: [FLINK-30514] HybridSource savepoint recovery sequence fix

2022-12-27 Thread GitBox


flinkbot commented on PR #21564:
URL: https://github.com/apache/flink/pull/21564#issuecomment-1366067992

   
   ## CI report:
   
   * b68145ccb94f51f4ef510cb2246b6c423a064347 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence

2022-12-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30514:
---
Labels: pull-request-available  (was: )

> HybridSource savepoint recovery sequence
> 
>
> Key: FLINK-30514
> URL: https://issues.apache.org/jira/browse/FLINK-30514
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.0, 1.15.2, 1.15.3
>Reporter: Denis Golovachev
>Priority: Major
>  Labels: pull-request-available
>
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} 
> accumulates splits during recovery in
> {{{}org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits{}}}.
> As a next step it creates a reader and pushes all {{recoveredSplits to}} the 
> reader.
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}
> Instantiation sequence of the {{setCurrentReader}} is the following
>  - {{reader.start()}}
>  - {{reader.addSplits()}}
> Seems like it doesn't work if we use {{FileSourceReader}} as an underlying 
> reader.
> {{FileSourceReader#start()}} method checks if any splits are available to 
> read and executes {{sendSplitRequest}} if empty. Current 
> {{HybridSourceReader}} recovery sequence is not aligned with this.
> So, every time we recover we immediately jump to the next splits. 
> Let me show you some logs. In this experiment job should have started with 
> files inside the 100 bucket but jumped to the bucket number 200
> Job Manager
> {code:java}
> 2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to 
> subtask 1 : FileSourceSplit: 
> s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97489087)  hosts=[localhost] ID=32 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to 
> subtask 9 : FileSourceSplit: 
> s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97342071)  hosts=[localhost] ID=33 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to 
> subtask 6 : FileSourceSplit: 
> s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97377047)  hosts=[localhost] ID=31 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 5 : FileSourceSplit: 
> s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97406878)  hosts=[localhost] ID=34 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 2 : FileSourceSplit: 
> s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97536205)  hosts=[localhost] ID=40 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 4 : FileSourceSplit: 
> s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97420601)  hosts=[localhost] ID=35 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 8 : FileSourceSplit: 
> s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97472256)  hosts=[localhost] ID=36 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 3 : FileSourceSplit: 
> s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97495880)  hosts=[localhost] ID=37 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 0 : FileSourceSplit: 
> s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97389425)  hosts=[localhost] ID=38 position=null
> 2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to 
> subtask 7 : FileSourceSplit: 
> s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97428709)  hosts=[localhost] ID=39 position=null
> {code}
> Task Manager
> {code:java}
> 2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79887236)  hosts=[localhost] ID=18 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
> 2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79987191)  hosts=[localhost] ID=11 
> position=CheckpointedPosition: offset=NO_OFFSET, 

[GitHub] [flink] WonderBeat opened a new pull request, #21564: [FLINK-30514] HybridSource savepoint recovery sequence fix

2022-12-27 Thread GitBox


WonderBeat opened a new pull request, #21564:
URL: https://github.com/apache/flink/pull/21564

   
   
   ## What is the purpose of the change
   
   This PR tweaks the `HybridSourceReader` underlying reader initialization 
sequence to make it compatible with `FileSourceReader`.
   
   ## Brief change log
   
 - Underlying reader instantiation sequence changed from `start -> 
addSplits` to `addSplits -> start`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added a test that validates call ordering during initialization
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
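
   A minimal sketch of the reordered initialization described above (field and 
helper names are assumptions based on the issue description, not the actual 
`HybridSourceReader` code):

   ```java
   // Sketch only: hand the recovered splits to the underlying reader before
   // starting it, so FileSourceReader#start() does not immediately request a
   // fresh split from the enumerator and skip past the restored ones.
   SourceReader<T, FileSourceSplit> underlyingReader = createReaderForCurrentSource(); // hypothetical
   if (!restoredSplits.isEmpty()) {
       underlyingReader.addSplits(new ArrayList<>(restoredSplits));
       restoredSplits.clear();
   }
   underlyingReader.start();
   ```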
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence

2022-12-27 Thread Denis Golovachev (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denis Golovachev updated FLINK-30514:
-
Description: 
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} 
accumulates splits during recovery in

{{{}org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits{}}}.

As a next step it creates a reader and pushes all {{recoveredSplits to}} the 
reader.
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}

Instantiation sequence of the {{setCurrentReader}} is the following
 - {{reader.start()}}
 - {{reader.addSplits()}}

Seems like it doesn't work if we use {{FileSourceReader}} as an underlying 
reader.

{{FileSourceReader#start()}} method checks if any splits are available to read 
and executes {{sendSplitRequest}} if empty. Current {{HybridSourceReader}} 
recovery sequence is not aligned with this.

So, every time we recover we immediately jump to the next splits. 
Let me show you some logs. In this experiment job should have started with 
files inside the 100 bucket but jumped to the bucket number 200
Job Manager
{code:java}
2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to subtask 
1 : FileSourceSplit: 
s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97489087)  hosts=[localhost] ID=32 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
9 : FileSourceSplit: 
s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97342071)  hosts=[localhost] ID=33 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
6 : FileSourceSplit: 
s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97377047)  hosts=[localhost] ID=31 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
5 : FileSourceSplit: 
s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97406878)  hosts=[localhost] ID=34 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
2 : FileSourceSplit: 
s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97536205)  hosts=[localhost] ID=40 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
4 : FileSourceSplit: 
s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97420601)  hosts=[localhost] ID=35 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
8 : FileSourceSplit: 
s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97472256)  hosts=[localhost] ID=36 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
3 : FileSourceSplit: 
s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97495880)  hosts=[localhost] ID=37 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
0 : FileSourceSplit: 
s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97389425)  hosts=[localhost] ID=38 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to subtask 
7 : FileSourceSplit: 
s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97428709)  hosts=[localhost] ID=39 position=null
{code}
Task Manager
{code:java}
2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79887236)  hosts=[localhost] ID=18 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79987191)  hosts=[localhost] ID=11 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-9-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 80247830)  hosts=[localhost] ID=20 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-4-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 80055663)  hosts=[localhost] ID=15 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226712]

[jira] [Updated] (FLINK-30514) HybridSource savepoint recovery sequence

2022-12-27 Thread Denis Golovachev (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denis Golovachev updated FLINK-30514:
-
Description: 
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} 
accumulates splits during recovery in

{{{}org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits{}}}.

As a next step it creates a reader and pushes all {{recoveredSplits to}} the 
reader.
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}

Instantiation sequence of the {{setCurrentReader}} is the following
 - {{reader.start()}}
 - {{reader.addSplits()}}

Seems like it doesn't work if we use {{FileSourceReader}} as an underlying 
reader.

{{FileSourceReader#start()}} method checks if any splits are available to read 
and executes {{sendSplitRequest}} if empty. Current {{HybridSourceReader}} 
recovery sequence is not aligned with this.

So every time we recover, we immediately jump to the next splits.
Let me show you some logs. In this experiment the job should have started with 
the files inside the 100 bucket, but it jumped to bucket 200 instead.
Job Manager
{code:java}
2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to subtask 
1 : FileSourceSplit: 
s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97489087)  hosts=[localhost] ID=32 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
9 : FileSourceSplit: 
s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97342071)  hosts=[localhost] ID=33 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
6 : FileSourceSplit: 
s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97377047)  hosts=[localhost] ID=31 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
5 : FileSourceSplit: 
s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97406878)  hosts=[localhost] ID=34 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
2 : FileSourceSplit: 
s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97536205)  hosts=[localhost] ID=40 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
4 : FileSourceSplit: 
s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97420601)  hosts=[localhost] ID=35 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
8 : FileSourceSplit: 
s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97472256)  hosts=[localhost] ID=36 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
3 : FileSourceSplit: 
s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97495880)  hosts=[localhost] ID=37 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
0 : FileSourceSplit: 
s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97389425)  hosts=[localhost] ID=38 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to subtask 
7 : FileSourceSplit: 
s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97428709)  hosts=[localhost] ID=39 position=null
{code}
Task Manager
{code:java}
2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79887236)  hosts=[localhost] ID=18 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79987191)  hosts=[localhost] ID=11 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-9-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 80247830)  hosts=[localhost] ID=20 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-4-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 80055663)  hosts=[localhost] ID=15 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226712]
{code}

[jira] [Created] (FLINK-30514) HybridSource savepoint recovery sequence

2022-12-27 Thread Denis Golovachev (Jira)
Denis Golovachev created FLINK-30514:


 Summary: HybridSource savepoint recovery sequence
 Key: FLINK-30514
 URL: https://issues.apache.org/jira/browse/FLINK-30514
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.3, 1.15.2, 1.16.0
Reporter: Denis Golovachev


{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} 
accumulates splits during recovery in 
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}.

As a next step it creates a reader and pushes all {{restoredSplits}} to the 
reader in 
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}.

The call sequence in {{setCurrentReader}} is as follows:
 - {{reader.start()}}
 - {{reader.addSplits()}}

This does not seem to work if {{FileSourceReader}} is used as the underlying 
reader.

The {{FileSourceReader#start()}} method checks whether any splits are 
available to read and executes {{sendSplitRequest}} if there are none. The 
current {{HybridSourceReader}} recovery sequence is not aligned with this.

So every time we recover, we immediately jump to the next splits.
Let me show you some logs. In this experiment the job should have started with 
the files inside the 100 bucket, but it jumped to bucket 200 instead.
Job Manager
{code:java}
2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to subtask 
1 : FileSourceSplit: 
s3a://bucket/200/part-1-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97489087)  hosts=[localhost] ID=32 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
9 : FileSourceSplit: 
s3a://bucket/200/part-2-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97342071)  hosts=[localhost] ID=33 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 
6 : FileSourceSplit: 
s3a://bucket/200/part-0-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97377047)  hosts=[localhost] ID=31 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
5 : FileSourceSplit: 
s3a://bucket/200/part-3-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97406878)  hosts=[localhost] ID=34 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
2 : FileSourceSplit: 
s3a://bucket/200/part-9-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97536205)  hosts=[localhost] ID=40 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
4 : FileSourceSplit: 
s3a://bucket/200/part-4-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97420601)  hosts=[localhost] ID=35 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
8 : FileSourceSplit: 
s3a://bucket/200/part-5-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97472256)  hosts=[localhost] ID=36 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
3 : FileSourceSplit: 
s3a://bucket/200/part-6-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97495880)  hosts=[localhost] ID=37 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 
0 : FileSourceSplit: 
s3a://bucket/200/part-7-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97389425)  hosts=[localhost] ID=38 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to subtask 
7 : FileSourceSplit: 
s3a://bucket/200/part-8-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
 [0, 97428709)  hosts=[localhost] ID=39 position=null
{code}
Task Manager
{code:java}
2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-7-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79887236)  hosts=[localhost] ID=18 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-0-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 79987191)  hosts=[localhost] ID=11 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 
s3a://bucket/100/part-9-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
 [0, 80247830)  hosts=[localhost] ID=20 position=CheckpointedPosition: 
offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
[FileSourceSplit: 

[GitHub] [flink] LadyForest commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


LadyForest commented on PR #21504:
URL: https://github.com/apache/flink/pull/21504#issuecomment-1366041944

   > BTW, I think we should also take 
`OperationConverterUtils.convertChangeColumn` into consideration. Will you help 
to clean the code about this?
   
   Sure, I'd like to. However, it would be prudent to handle it in a separate 
task. The reason is that the current impl uses the deprecated 
`CatalogTableImpl` while `SchemaConverter` uses `CatalogTable.of`, and I'm not 
sure whether any compatibility issues exist. WDYT?
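   For readers following along, a rough sketch of the two construction styles 
being contrasted. The field name `f0`, the `datagen` option, and the class name 
are made up for illustration; this is not the planner code itself:
   ```java
   import java.util.Collections;
   import java.util.Map;

   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.Schema;
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.catalog.CatalogTable;
   import org.apache.flink.table.catalog.CatalogTableImpl;

   public class CatalogTableStyles {
       public static void main(String[] args) {
           Map<String, String> options = Collections.singletonMap("connector", "datagen");

           // Legacy style (deprecated): TableSchema + CatalogTableImpl.
           TableSchema legacySchema =
                   TableSchema.builder().field("f0", DataTypes.INT()).build();
           CatalogTable legacy = new CatalogTableImpl(legacySchema, options, "legacy style");

           // Style used by SchemaConverter: unresolved Schema + CatalogTable.of.
           Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
           CatalogTable current =
                   CatalogTable.of(schema, "current style", Collections.emptyList(), options);

           System.out.println(legacy.getComment());
           System.out.println(current.getComment());
       }
   }
   ```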


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


LadyForest commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1057779254


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
 }
 }
 
+private static class ModifySchemaConverter extends SchemaConverter {
+
+ModifySchemaConverter(
+Schema originalSchema,
+FlinkTypeFactory typeFactory,
+SqlValidator sqlValidator,
+Consumer constraintValidator,
+Function escapeExpressions,
+SchemaResolver schemaResolver) {
+super(
+originalSchema,
+typeFactory,
+sqlValidator,
+constraintValidator,
+escapeExpressions,
+schemaResolver);
+}
+
+@Override
+void checkColumnExists(String columnName) {
+if (!sortedColumnNames.contains(columnName)) {
+throw new ValidationException(
+String.format(
+"%sTry to modify a column `%s` which does not 
exist in the table.",
+EX_MSG_PREFIX, columnName));
+}
+}
+
+@Override
+void checkPrimaryKeyExists() {
+if (primaryKey == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any primary 
key constraint. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+void checkWatermarkExists() {
+if (watermarkSpec == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any 
watermark. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+Optional getColumnPosition(SqlTableColumnPosition 
columnPosition) {
+if (columnPosition.isFirstColumn() || 
columnPosition.isAfterReferencedColumn()) {
+
sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+return super.getColumnPosition(columnPosition);
+}
+return Optional.empty();
+}
+
+@Override
+Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+SqlTableColumn.SqlRegularColumn physicalColumn) {
+Schema.UnresolvedPhysicalColumn newColumn = 
super.convertPhysicalColumn(physicalColumn);
+String columnName = newColumn.getName();
+// preserves the primary key's nullability
+if (primaryKey != null && 
primaryKey.getColumnNames().contains(columnName)) {
+newColumn =
+new Schema.UnresolvedPhysicalColumn(
+columnName,
+newColumn.getDataType().notNull(),
+newColumn.getComment().orElse(null));
+}

Review Comment:
   > Why not mark the column is not null when building the final schema with 
the final pk?
   
   Because under some conditions, the final pk does not get a chance to be 
updated.
   Consider the following case
   ```sql
   create table T (
 f0 int,
 f1 string primary key not enforced, -- f1 is converted to string not null 
implicitly
 f2 double
   ) with (
 ...
   );
   
   -- then change f1 pos
   alter table T modify f1 string first
   ```
   
   For this case, the schema change only contains a column modification, and 
L#103`alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);`
 does not get executed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-12-27 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652279#comment-17652279
 ] 

Krzysztof Chmielewski commented on FLINK-27246:
---

Hi [~TsReaper]
Thanks for the feedback.

Regarding IfStatementRewriter, it's a little bit tricky for me.
I think my new approach might handle more cases than IfStatementRewriter did. 
Also, IfStatementRewriter and the existing rewriters seem to expect a method 
declaration plus body, whereas my BlocksStatementGrouper and Splitter process 
individual block statements. They are called from 
FunctionSplitter::FunctionSplitVisitor while processing the block statements 
of a method's body.

Now it seems that after my change, FunctionSplitter is also rewriting the 
code, similar to IfStatementRewriter, and maybe this is not the best thing to 
do from a clean code/architecture perspective.

The problem with IfStatementRewriter is that it will not rewrite an if/else 
branch if the branch contains a "while" statement, or when the entire if/else 
statement is inside a while statement, which was the original problem (see the 
toy example below).

So now I wonder: should we extract this from FunctionSplitter into a new 
BlocksStatementRewriter that would handle while/if/else statements in 
combination, or can it stay inside FunctionSplitter as it is now?
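To make the shape concrete, a toy, hand-written illustration of the pattern in 
question: an if/else nested inside a while loop. The class and method names 
are hypothetical, not actual Flink codegen output:
{code:java}
// Hand-written stand-in for generated code; imagine each branch holding
// hundreds of generated lines, pushing the enclosing method past 64 KB.
public class GeneratedShapeExample {

    public void processElement(int[] records) {
        int i = 0;
        while (i < records.length) {      // enclosing while statement
            if (records[i] % 2 == 0) {    // if/else inside the loop body
                handleEven(records[i]);   // -> not extracted by IfStatementRewriter
            } else {
                handleOdd(records[i]);
            }
            i++;
        }
    }

    private void handleEven(int value) {
        // placeholder for generated aggregation code
    }

    private void handleOdd(int value) {
        // placeholder for generated aggregation code
    }
}
{code}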



> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3, 1.16.0, 1.15.3
>Reporter: Maciej Bryński
>Assignee: Krzysztof Chmielewski
>Priority: Major
> Attachments: endInput_falseFilter9123_split9704.txt
>
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:102)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:83)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   ... 11 more
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  

[GitHub] [flink] LadyForest commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


LadyForest commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1057779254


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
 }
 }
 
+private static class ModifySchemaConverter extends SchemaConverter {
+
+ModifySchemaConverter(
+Schema originalSchema,
+FlinkTypeFactory typeFactory,
+SqlValidator sqlValidator,
+Consumer constraintValidator,
+Function escapeExpressions,
+SchemaResolver schemaResolver) {
+super(
+originalSchema,
+typeFactory,
+sqlValidator,
+constraintValidator,
+escapeExpressions,
+schemaResolver);
+}
+
+@Override
+void checkColumnExists(String columnName) {
+if (!sortedColumnNames.contains(columnName)) {
+throw new ValidationException(
+String.format(
+"%sTry to modify a column `%s` which does not 
exist in the table.",
+EX_MSG_PREFIX, columnName));
+}
+}
+
+@Override
+void checkPrimaryKeyExists() {
+if (primaryKey == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any primary 
key constraint. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+void checkWatermarkExists() {
+if (watermarkSpec == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any 
watermark. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+Optional getColumnPosition(SqlTableColumnPosition 
columnPosition) {
+if (columnPosition.isFirstColumn() || 
columnPosition.isAfterReferencedColumn()) {
+
sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+return super.getColumnPosition(columnPosition);
+}
+return Optional.empty();
+}
+
+@Override
+Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+SqlTableColumn.SqlRegularColumn physicalColumn) {
+Schema.UnresolvedPhysicalColumn newColumn = 
super.convertPhysicalColumn(physicalColumn);
+String columnName = newColumn.getName();
+// preserves the primary key's nullability
+if (primaryKey != null && 
primaryKey.getColumnNames().contains(columnName)) {
+newColumn =
+new Schema.UnresolvedPhysicalColumn(
+columnName,
+newColumn.getDataType().notNull(),
+newColumn.getComment().orElse(null));
+}

Review Comment:
   > Why not mark the column is not null when building the final schema with 
the final pk?
   
   Because under some conditions the final pk does not get a chance to be 
updated.
   Consider the following case
   ```sql
   create table T (
 f0 int,
 f1 string primary key not enforced, -- f1 is converted to string not null 
implicitly
 f2 double
   ) with (
 ...
   );
   
   -- then change f1 pos
   alter table T modify f1 string first
   ```
   
   For this case, 
L#103`alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);`
 does not get executed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gyfora commented on pull request #21515: [FLINK-30429][client] Fix IllegalArgumentException when no argument in flink executable

2022-12-27 Thread GitBox


gyfora commented on PR #21515:
URL: https://github.com/apache/flink/pull/21515#issuecomment-1366000815

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30456) OLM Bundle Description Version Problems

2022-12-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-30456.
--
Fix Version/s: kubernetes-operator-1.4.0
   Resolution: Fixed

merged to main 0009746cb3bf96bec0450e99e03c1af20f4038e9

> OLM Bundle Description Version Problems
> ---
>
> Key: FLINK-30456
> URL: https://issues.apache.org/jira/browse/FLINK-30456
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.4.0
>
> Attachments: image-2022-12-20-08-21-02-597.png
>
>
> OLM is working great with OperatorHub, but I noticed a few items that need 
> fixing.
> 1. The basic.yaml example version is release-1.1 instead of the latest 
> release (release-1.3). This needs fixing in two places:
> tools/olm/generate-olm-bundle.sh
> tools/olm/docker-entry.sh
> 2. The label versions in the description are hardcoded to 1.2.0 instead of 
> the latest release (1.3.0).
> 3. The Provider is a blank space " " but soon needs some text in it to avoid 
> CI errors with the latest operator-sdk versions. The person who noticed it 
> recommended "Community" for now, but maybe we can begin making it "The 
> Apache Software Foundation" now? Not sure if we're ready for that yet or 
> not...
>  
> I'm working on a PR to address these items.  Can you assign the issue to me?  
> Thanks!
> fyi [~tedhtchang] [~gyfora] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-27 Thread GitBox


gyfora merged PR #491:
URL: https://github.com/apache/flink-kubernetes-operator/pull/491


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

2022-12-27 Thread GitBox


gyfora commented on code in PR #487:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057748254


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##
@@ -241,15 +245,35 @@ private Optional validateJobSpec(
 }
 
 private Optional validateJmSpec(JobManagerSpec jmSpec, Map confMap) {
+Configuration conf = Configuration.fromMap(confMap);
+var jmMemoryDefined =
+jmSpec != null
+&& jmSpec.getResource() != null
+&& 
!StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+Optional jmMemoryValidation =
+jmMemoryDefined ? Optional.empty() : 
validateJmMemoryConfig(conf);
+
 if (jmSpec == null) {
-return Optional.empty();
+return jmMemoryValidation;
 }
 
 return firstPresent(
+jmMemoryValidation,
 validateResources("JobManager", jmSpec.getResource()),
 validateJmReplicas(jmSpec.getReplicas(), confMap));
 }
 
+private Optional validateJmMemoryConfig(Configuration conf) {
+try {
+
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(

Review Comment:
   Why do we use this method vs using `processSpecFromConfig`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-27571) Recognize "less is better" benchmarks in regression detection script

2022-12-27 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan reassigned FLINK-27571:
-

Assignee: Yanfei Lei  (was: Roman Khachatryan)

> Recognize "less is better" benchmarks in regression detection script
> 
>
> Key: FLINK-27571
> URL: https://issues.apache.org/jira/browse/FLINK-27571
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.16.0
>Reporter: Roman Khachatryan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: Screenshot_2022-05-09_10-33-11.png
>
>
> Example benchmark:
> [http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200]
>  
> [Proposed 
> solution|https://issues.apache.org/jira/browse/FLINK-27555?focusedCommentId=17534423=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17534423]:
> {quote}
> I think #2 is the correct way.
> Maybe we can modify the save_jmh_result.py to correctly set the 'units' and 
> the 'lessisbetter' fields of benchmark results. The 'units' is already 
> contained in the jmh result and the 'lessisbetter' can be derived from the 
> mode(false if it is 'thrpt' mode, otherwise true). An example of the jmh 
> result format can be found at https://i.stack.imgur.com/vB3fV.png.
> This can fix the web UI as well as the REST result, and then the 
> regression_report.py will be able to identify which benchmarks are "less is 
> better" and treat them differently.
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 commented on pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


fsk119 commented on PR #21504:
URL: https://github.com/apache/flink/pull/21504#issuecomment-1365913657

   BTW, I think we should also take 
`OperationConverterUtils.convertChangeColumn` into consideration. Will you help 
to clean the code about this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30513) HA storage dir leaks on cluster termination

2022-12-27 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-30513:
--
Issue Type: Bug  (was: Improvement)

> HA storage dir leaks on cluster termination 
> 
>
> Key: FLINK-30513
> URL: https://issues.apache.org/jira/browse/FLINK-30513
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Zhanghao Chen
>Priority: Major
> Attachments: image-2022-12-27-21-32-17-510.png
>
>
> *Problem*
> We found that the HA storage dir leaks on cluster termination for a Flink 
> job with HA enabled. The following picture shows the HA storage dir (here on 
> HDFS) of the cluster czh-flink-test-offline (in application mode) after 
> cancelling the job with flink-cancel. We are left with an empty dir, and too 
> many empty dirs will greatly hurt the stability of the HDFS NameNode!
> !image-2022-12-27-21-32-17-510.png|width=582,height=158!
>  
> Furthermore, if the user chooses to retain the checkpoints on job 
> termination, the completedCheckpoints files are leaked as well. Note that 
> we no longer need the completedCheckpoints files, as we'll directly recover 
> retained CPs from the CP data dir.
> *Root Cause*
> When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
> store, but didn't clean the HA storage dir.
> *Proposal*
> Clean up the HA storage dir after cleaning up blob store in 
> AbstractHaServices#closeAndCleanupAllData().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30513) HA storage dir leaks on cluster termination

2022-12-27 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-30513:
--
Description: 
*Problem*

We found that the HA storage dir leaks on cluster termination for a Flink job 
with HA enabled. The following picture shows the HA storage dir (here on HDFS) 
of the cluster czh-flink-test-offline (in application mode) after cancelling 
the job with flink-cancel. We are left with an empty dir, and too many empty 
dirs will greatly hurt the stability of the HDFS NameNode!

!image-2022-12-27-21-32-17-510.png|width=582,height=158!

Furthermore, if the user chooses to retain the checkpoints on job termination, 
the completedCheckpoints files are leaked as well. Note that we no longer need 
the completedCheckpoints files, as we'll directly recover retained CPs from 
the CP data dir.

*Root Cause*

When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
store, but didn't clean the HA storage dir.

*Proposal*

Clean up the HA storage dir after cleaning up blob store in 
AbstractHaServices#closeAndCleanupAllData().

  was:
*Problem*

We found that the HA storage dir leaks on cluster termination for a Flink job 
with HA enabled. The following picture shows the HA storage dir (here on HDFS) 
of the cluster czh-flink-test-offline (in application mode) after cancelling 
the job with flink-cancel. We are left with an empty dir, and too many empty 
dirs will greatly hurt the stability of the HDFS NameNode!  
!image-2022-12-27-21-32-17-510.png|width=582,height=158!

Furthermore, if the user chooses to retain the checkpoints on job termination, 
the completedCheckpoints files are leaked as well. Note that we no longer need 
the completedCheckpoints files, as we'll directly recover retained CPs from 
the CP data dir.

*Root Cause*

When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
store, but didn't clean the HA storage dir.

*Proposal*

Clean up the HA storage dir after cleaning up blob store in 
AbstractHaServices#closeAndCleanupAllData().


> HA storage dir leaks on cluster termination 
> 
>
> Key: FLINK-30513
> URL: https://issues.apache.org/jira/browse/FLINK-30513
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Zhanghao Chen
>Priority: Major
> Attachments: image-2022-12-27-21-32-17-510.png
>
>
> *Problem*
> We found that the HA storage dir leaks on cluster termination for a Flink 
> job with HA enabled. The following picture shows the HA storage dir (here on 
> HDFS) of the cluster czh-flink-test-offline (in application mode) after 
> cancelling the job with flink-cancel. We are left with an empty dir, and too 
> many empty dirs will greatly hurt the stability of the HDFS NameNode!
> !image-2022-12-27-21-32-17-510.png|width=582,height=158!
>  
> Furthermore, if the user chooses to retain the checkpoints on job 
> termination, the completedCheckpoints files are leaked as well. Note that 
> we no longer need the completedCheckpoints files, as we'll directly recover 
> retained CPs from the CP data dir.
> *Root Cause*
> When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
> store, but didn't clean the HA storage dir.
> *Proposal*
> Clean up the HA storage dir after cleaning up blob store in 
> AbstractHaServices#closeAndCleanupAllData().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30513) HA storage dir leaks on cluster termination

2022-12-27 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-30513:
-

 Summary: HA storage dir leaks on cluster termination 
 Key: FLINK-30513
 URL: https://issues.apache.org/jira/browse/FLINK-30513
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.16.0, 1.15.0
Reporter: Zhanghao Chen
 Attachments: image-2022-12-27-21-32-17-510.png

*Problem*

We found that the HA storage dir leaks on cluster termination for a Flink job 
with HA enabled. The following picture shows the HA storage dir (here on HDFS) 
of the cluster czh-flink-test-offline (in application mode) after cancelling 
the job with flink-cancel. We are left with an empty dir, and too many empty 
dirs will greatly hurt the stability of the HDFS NameNode!
!image-2022-12-27-21-32-17-510.png|width=582,height=158!

Furthermore, if the user chooses to retain the checkpoints on job termination, 
the completedCheckpoints files are leaked as well. Note that we no longer need 
the completedCheckpoints files, as we'll directly recover retained CPs from 
the CP data dir.

*Root Cause*

When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
store, but didn't clean the HA storage dir.

*Proposal*

Clean up the HA storage dir after cleaning up blob store in 
AbstractHaServices#closeAndCleanupAllData().
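For illustration only, a rough sketch of what such a cleanup step could look 
like. The helper name cleanupHaStorageDir and its exact placement are 
assumptions, not the actual fix:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class HaStorageDirCleanupSketch {

    /** Recursively deletes "high-availability.storageDir"/"cluster-id", if it exists. */
    static void cleanupHaStorageDir(Configuration configuration) throws Exception {
        String storageDir = configuration.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
        String clusterId = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        Path clusterHaDir = new Path(storageDir, clusterId);
        FileSystem fs = clusterHaDir.getFileSystem();
        if (fs.exists(clusterHaDir)) {
            // removes leftover completedCheckpoint files and the (possibly empty) dir itself
            fs.delete(clusterHaDir, true);
        }
    }
}
{code}
The idea would be to invoke such a step right after the blob store cleanup in 
AbstractHaServices#closeAndCleanupAllData().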



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 commented on a diff in pull request #21504: [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement

2022-12-27 Thread GitBox


fsk119 commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1057674344


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##
@@ -607,6 +607,195 @@ class TableEnvironmentTest {
 checkData(expectedResult.iterator(), tableResult.collect())
   }
 
+  @Test
+  def testAlterTableModifyColumn(): Unit = {
+val statement =

Review Comment:
   nit: These negative cases are much the same as in `table.q`. I think we can 
keep just one...



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
 }
 }
 
+private static class ModifySchemaConverter extends SchemaConverter {
+
+ModifySchemaConverter(
+Schema originalSchema,
+FlinkTypeFactory typeFactory,
+SqlValidator sqlValidator,
+Consumer constraintValidator,
+Function escapeExpressions,
+SchemaResolver schemaResolver) {
+super(
+originalSchema,
+typeFactory,
+sqlValidator,
+constraintValidator,
+escapeExpressions,
+schemaResolver);
+}
+
+@Override
+void checkColumnExists(String columnName) {
+if (!sortedColumnNames.contains(columnName)) {
+throw new ValidationException(
+String.format(
+"%sTry to modify a column `%s` which does not 
exist in the table.",
+EX_MSG_PREFIX, columnName));
+}
+}
+
+@Override
+void checkPrimaryKeyExists() {
+if (primaryKey == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any primary 
key constraint. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+void checkWatermarkExists() {
+if (watermarkSpec == null) {
+throw new ValidationException(
+String.format(
+"%sThe base table does not define any 
watermark. You might "
++ "want to add a new one.",
+EX_MSG_PREFIX));
+}
+}
+
+@Override
+Optional getColumnPosition(SqlTableColumnPosition 
columnPosition) {
+if (columnPosition.isFirstColumn() || 
columnPosition.isAfterReferencedColumn()) {
+
sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+return super.getColumnPosition(columnPosition);
+}
+return Optional.empty();
+}
+
+@Override
+Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+SqlTableColumn.SqlRegularColumn physicalColumn) {
+Schema.UnresolvedPhysicalColumn newColumn = 
super.convertPhysicalColumn(physicalColumn);
+String columnName = newColumn.getName();
+// preserves the primary key's nullability
+if (primaryKey != null && 
primaryKey.getColumnNames().contains(columnName)) {
+newColumn =
+new Schema.UnresolvedPhysicalColumn(
+columnName,
+newColumn.getDataType().notNull(),
+newColumn.getComment().orElse(null));
+}

Review Comment:
   Why not mark the column is not null when building the final schema with the 
final pk?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


gyfora commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057676553


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   I think in this early state it’s better to have flexibility with the 
configuration and document recommendations than to restrict the behavior. We 
still have time to simplify this in the future as we gather working knowledge 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2022-12-27 Thread GitBox


luoyuxia commented on PR #21257:
URL: https://github.com/apache/flink/pull/21257#issuecomment-1365896752

   @beyond1920 Thanks for reviewing. I prefer to append a commit with tests that 
mock the speculative execution to validate these operators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


gyfora commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057664132


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   At any point the data rate might suddenly increase (during any scale 
operation); the minimum window allows us to react to it. This is not only 
about the first time we start the job.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


gyfora commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057663391


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   I think the min/max is actually simpler to understand and configure. If 
oscillation occurs, then your stabilization period may be too small?
   
   The default for min/max could be the same duration, but I would not make 
this more complicated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


mxm commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057660487


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   After thinking about this change more, it might actually be tricky for users 
to set the minimum metric window size correctly. There could be unexpected side 
effects if the delta between the two is too high, e.g. oscillation where the 
minimum window scales up, the maximum window scales down, and so on. It seems 
we only want to permit the minimum window for the initial scaling but not for 
subsequent scaling decisions. We could keep `metrics.window` and introduce 
`metrics.window.initial` which is only respected as long as the last scaling 
decision didn't come from the regular window.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] beyond1920 commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2022-12-27 Thread GitBox


beyond1920 commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1057658616


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/CompactFileUtils.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactContext;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for compacting files. */
+public class CompactFileUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CompactFileUtils.class);
+
+public static final String UNCOMPACTED_PREFIX = ".uncompacted-";
+
+public static final String COMPACTED_PREFIX = "compacted-";
+
+/** The function interface for move single file. */
+@FunctionalInterface
+public interface SingleFileMvFunction {
+R apply(T t, U u) throws IOException;
+}
+
+/**
+ * Do Compaction: - Target file exists, do nothing. - Can do compaction: - 
Single file, do
+ * atomic renaming, there are optimizations for FileSystem. - Multiple 
file, do reading and
+ * writing.
+ */
+public static  Path doCompact(
+FileSystem fileSystem,
+String partition,
+List paths,
+Configuration config,
+SingleFileMvFunction singleFileMvFunc,
+CompactReader.Factory readerFactory,
+CompactWriter.Factory writerFactory)
+throws IOException {
+if (paths.size() == 0) {
+return null;
+}
+
+Map inputMap = new HashMap<>();
+for (Path path : paths) {
+inputMap.put(path, fileSystem.getFileStatus(path).getLen());
+}
+
+Path target = createCompactedFile(paths);
+if (fileSystem.exists(target)) {
+return target;

Review Comment:
   I'm not sure if this behavior is correct when speculative execution is 
enabled.
   If one compact operator is slow, the scheduler might deploy a new attempt 
for it.
   The compact operator in the new attempt would return directly here because 
the target path already exists, even though it has not actually done anything 
yet.



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/CompactFileUtils.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactContext;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 

[jira] [Closed] (FLINK-30494) Introduce TableChange to represents SET/RESET change

2022-12-27 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-30494.
-
Resolution: Implemented

Merged into master: 75a92efd7b35501698e5de253e5231d680830c16

> Introduce TableChange to represents SET/RESET change
> 
>
> Key: FLINK-30494
> URL: https://issues.apache.org/jira/browse/FLINK-30494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.17.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 merged pull request #21554: [FLINK-30493][table-api] Introduce TableChange to SET/RESET options

2022-12-27 Thread GitBox


fsk119 merged PR #21554:
URL: https://github.com/apache/flink/pull/21554


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] beyond1920 commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2022-12-27 Thread GitBox


beyond1920 commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1057645105


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.OutputFormatFactory;
+import org.apache.flink.connector.file.table.PartitionComputer;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.PartitionWriter;
+import org.apache.flink.connector.file.table.PartitionWriterFactory;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+
+import java.util.LinkedHashMap;
+
+/**
+ * An operator for writing files in batch mode. Once creating a new file to 
write, the writing
+ * operator will emit the written file to upstream.
+ */
+public class BatchFileWriter extends 
AbstractStreamOperator
+implements OneInputStreamOperator, 
BoundedOneInput {
+
+private final Path tmpPath;
+private final String[] partitionColumns;
+private final boolean dynamicGrouped;
+private final LinkedHashMap staticPartitions;
+private final PartitionComputer computer;
+private final OutputFormatFactory formatFactory;
+private final OutputFileConfig outputFileConfig;
+
+private transient PartitionWriter writer;
+
+public BatchFileWriter(
+Path tmpPath,
+String[] partitionColumns,
+boolean dynamicGrouped,
+LinkedHashMap staticPartitions,
+OutputFormatFactory formatFactory,
+PartitionComputer computer,
+OutputFileConfig outputFileConfig) {
+this.tmpPath = tmpPath;
+this.partitionColumns = partitionColumns;
+this.dynamicGrouped = dynamicGrouped;
+this.staticPartitions = staticPartitions;
+this.formatFactory = formatFactory;
+this.computer = computer;
+this.outputFileConfig = outputFileConfig;
+}
+
+@Override
+public void open() throws Exception {
+try {
+PartitionTempFileManager fileManager =
+new PartitionTempFileManager(
+tmpPath,
+getRuntimeContext().getIndexOfThisSubtask(),
+getRuntimeContext().getAttemptNumber(),
+outputFileConfig);

Review Comment:
   It's not safe if the job enables speculative execution because multiple 
attempts would write to the same file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30512) Flink SQL state TTL has no effect when using Interval Join

2022-12-27 Thread wangkang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangkang updated FLINK-30512:
-
Description: 
Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';
select 
...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3 
ON t1.mid = t3.mid and t1.sr = t3.sr 
and t1.time_local = t3.time_local 
and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE  AND t3.log_ltz + 
INTERVAL '2' MINUTE {code}
!flink1.16.png|width=906,height=278!

The state size is getting bigger and bigger.

We also tested the same SQL with Flink SQL 1.13; there the state size is stable.

  was:
Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';
select 
...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3 
ON t1.mid = t3.mid and t1.sr = t3.sr 
and t1.time_local = t3.time_local 
and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE  AND t3.log_ltz + 
INTERVAL '2' MINUTE {code}
!flink1.16.png!

The state size keeps growing.

We also tested the same SQL with Flink SQL 1.13, and the state size stayed stable.


> Flink SQL state TTL has no effect when using Interval Join
> --
>
> Key: FLINK-30512
> URL: https://issues.apache.org/jira/browse/FLINK-30512
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.15.1, 1.16.1
>Reporter: wangkang
>Priority: Major
> Attachments: flink1.16.png
>
>
> Take the following join SQL program as an example:
> {code:java}
> SET 'table.exec.state.ttl' = '90 ms';
> select 
> ...
> from kafka_source_dwdexpose as t1
> left join kafka_source_expose_attr_click t3 
> ON t1.mid = t3.mid and t1.sr = t3.sr 
> and t1.time_local = t3.time_local 
> and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE  AND t3.log_ltz + 
> INTERVAL '2' MINUTE {code}
> !flink1.16.png|width=906,height=278!
> The state size keeps growing.
> We also tested the same SQL with Flink SQL 1.13, and the state size stayed stable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30512) Flink SQL state TTL has no effect when using Interval Join

2022-12-27 Thread wangkang (Jira)
wangkang created FLINK-30512:


 Summary: Flink SQL state TTL has no effect when using Interval Join
 Key: FLINK-30512
 URL: https://issues.apache.org/jira/browse/FLINK-30512
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.1, 1.16.1
Reporter: wangkang
 Attachments: flink1.16.png

Take the following join SQL program as an example:
{code:java}
SET 'table.exec.state.ttl' = '90 ms';
select 
...
from kafka_source_dwdexpose as t1
left join kafka_source_expose_attr_click t3 
ON t1.mid = t3.mid and t1.sr = t3.sr 
and t1.time_local = t3.time_local 
and t1.log_ltz BETWEEN t3.log_ltz - INTERVAL '2' MINUTE  AND t3.log_ltz + 
INTERVAL '2' MINUTE {code}
!flink1.16.png!

The state size keeps growing.

We also tested the same SQL with Flink SQL 1.13, and the state size stayed stable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25421) Port JDBC Sink to new Unified Sink API (FLIP-143)

2022-12-27 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652232#comment-17652232
 ] 

RocMarshal commented on FLINK-25421:


Get it. Thanks a lot~ [~eskabetxe] 

> Port JDBC Sink to new Unified Sink API (FLIP-143)
> -
>
> Key: FLINK-25421
> URL: https://issues.apache.org/jira/browse/FLINK-25421
> Project: Flink
>  Issue Type: Improvement
>Reporter: Martijn Visser
>Priority: Major
>
> The current JDBC connector is using the old SinkFunction interface, which is 
> going to be deprecated. We should port/refactor the JDBC Sink to use the new 
> Unified Sink API, based on FLIP-143 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.

2022-12-27 Thread RocMarshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-30511:
---
Description: 
* Code segment:

{code:java}
public class OnTimerDemo {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("taskmanager.numberOfTaskSlots", "4");
conf.setString("state.checkpoint-storage", "filesystem");
conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
conf.setString("execution.checkpointing.interval", "30s");

//conf.setString("execution.savepoint.path", 
"file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(1);

EnvironmentSettings envSetting = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, 
envSetting);

String sourceDDL = "CREATE TABLE orders (\n" +
"  id   INT,\n" +
"  app  INT,\n" +
"  user_id  STRING" +
") WITH (\n" +
"   'connector' = 'datagen',\n" +
"   'rows-per-second'='1',\n" +
"   'fields.app.min'='1',\n" +
"   'fields.app.max'='10',\n" +
"   'fields.user_id.length'='10'\n" +
")";

tableEnv.executeSql(sourceDDL);

Table query = tableEnv.sqlQuery("select * from orders");
DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, 
Row.class);

TypeInformation[] returnTypes = new TypeInformation[4];
returnTypes[0] = Types.INT;

returnTypes[1] = Types.INT; // Anchor-B:

returnTypes[2] = Types.INT;
returnTypes[3] = Types.INT;


rowDataStream.keyBy(new KeySelector<Row, String>() {
@Override
public String getKey(Row value) throws Exception {
return value.getFieldAs(2);
}
}).process(new KeyedProcessFunction<String, Row, Row>() {

private Row firstRow;

@Override
public void processElement(Row value, Context ctx, 
Collector<Row> out) throws Exception {
if (firstRow == null) {
firstRow = value;
}

ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 
3000);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Row> out) throws Exception {
Row colRow = new Row(4);
colRow.setField(0, 0);
colRow.setField(1, 1);
colRow.setField(2, 2);
colRow.setField(3, 3);

out.collect(colRow); // Anchor-C

}
}).name("TargetTestUDF")
.returns(new RowTypeInfo(returnTypes))
.print();

env.execute(OnTimerDemo.class.getSimpleName());
}

}
 {code}
 * Recurrence steps
 ** Run the job without state.
 ** Collect the latest available checkpoint path as 'checkpoint-path-a'.
 ** Stop the job.
 ** Fill the real value of 'checkpoint-path-a' into the 'Anchor-A' line and un-comment the line.
 ** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line.
 ** Then add a breakpoint in the 'StreamTask#handleAsyncException' method.
 ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception raised at the 'Anchor-C' line will be swallowed in the 'StreamTask#handleAsyncException' method.
 ** So the framework cannot catch this exception in this case.
 * Root cause:
 ** !截屏2022-12-27 18.51.12.png!
 ** When the job is started from state data, Task#restoreAndInvoke is called. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception is swallowed in the above 'handleAsyncException' method instead of being caught in the catch block of 'Task#restoreAndInvoke'.

                       !截屏2022-12-27 19.20.00.png!

Could this be seen as missing exception handling in the framework?
If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException', which matches the intention of 'Task#restoreAndInvoke'.

Thank you.
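As a side note, a self-contained illustration (plain Java, not the Flink code path) of the type mismatch behind the exception: onTimer emits Integer values while returnTypes[1] declares LONG, so the framework-side cast fails.
{code:java}
// Minimal illustration of the failure mode only; this is not Flink's serialization code.
public class ClassCastIllustration {
    public static void main(String[] args) {
        Object[] rowFields = new Object[] {0, 1, 2, 3}; // onTimer fills the row with Integer values
        Long field1 = (Long) rowFields[1];              // declared as LONG -> java.lang.ClassCastException
        System.out.println(field1);
    }
}
{code}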
 
 
 

 

  was:
* Code segment:

{code:java}
public class OnTimerDemo {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("taskmanager.numberOfTaskSlots", "4");

[jira] [Created] (FLINK-30511) Ignore the Exception in user-timer Triggerable when recovering from state.

2022-12-27 Thread RocMarshal (Jira)
RocMarshal created FLINK-30511:
--

 Summary: Ignore the Exception in user-timer Triggerable when 
recovering from state.
 Key: FLINK-30511
 URL: https://issues.apache.org/jira/browse/FLINK-30511
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.16.0
 Environment: Flink 1.16.0

java8

deployment Mode: miniCluster in IDC; standalone, yarn-application.
Reporter: RocMarshal
 Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png

* Code segment:

{code:java}
public class OnTimerDemo {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("taskmanager.numberOfTaskSlots", "4");
conf.setString("state.checkpoint-storage", "filesystem");
conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
conf.setString("execution.checkpointing.interval", "30s");

//conf.setString("execution.savepoint.path", 
"file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(1);

EnvironmentSettings envSetting = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, 
envSetting);

String sourceDDL = "CREATE TABLE orders (\n" +
"  id   INT,\n" +
"  app  INT,\n" +
"  user_id  STRING" +
") WITH (\n" +
"   'connector' = 'datagen',\n" +
"   'rows-per-second'='1',\n" +
"   'fields.app.min'='1',\n" +
"   'fields.app.max'='10',\n" +
"   'fields.user_id.length'='10'\n" +
")";

tableEnv.executeSql(sourceDDL);

Table query = tableEnv.sqlQuery("select * from orders");
DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, 
Row.class);

TypeInformation[] returnTypes = new TypeInformation[4];
returnTypes[0] = Types.INT;

returnTypes[1] = Types.INT; // Anchor-B:

returnTypes[2] = Types.INT;
returnTypes[3] = Types.INT;


rowDataStream.keyBy(new KeySelector<Row, String>() {
@Override
public String getKey(Row value) throws Exception {
return value.getFieldAs(2);
}
}).process(new KeyedProcessFunction<String, Row, Row>() {

private Row firstRow;

@Override
public void processElement(Row value, Context ctx, 
Collector<Row> out) throws Exception {
if (firstRow == null) {
firstRow = value;
}

ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 
3000);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Row> out) throws Exception {
Row colRow = new Row(4);
colRow.setField(0, 0);
colRow.setField(1, 1);
colRow.setField(2, 2);
colRow.setField(3, 3);

out.collect(colRow); // Anchor-C

}
}).name("TargetTestUDF")
.returns(new RowTypeInfo(returnTypes))
.print();

env.execute(OnTimerDemo.class.getSimpleName());
}

}
 {code}
 * Recurrence steps
 ** Run the job without state.
 ** Collect the latest available checkpoint path as 'checkpoint-path-a'.
 ** Stop the job.
 ** Fill the real value of 'checkpoint-path-a' into the 'Anchor-A' line and un-comment the line.
 ** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line.
 ** Then add a breakpoint in the 'StreamTask#handleAsyncException' method.
 ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception raised at the 'Anchor-C' line will be swallowed in the 'StreamTask#handleAsyncException' method.
 ** So the framework cannot catch this exception in this case.
 * Root cause:
 ** !截屏2022-12-27 18.51.12.png!
 ** When the job is started from state data, Task#restoreAndInvoke is called. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception is swallowed in the above 'handleAsyncException' method instead of being caught in the catch block of 'Task#restoreAndInvoke'.

                      !截屏2022-12-27 19.20.00.png!

Could this be seen as missing exception handling in the framework?
If so, I would prefer to re-throw the 
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


gyfora commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057607945


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   Looks good! Please also update the autoscaler doc page section about the 
improved configuration 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#stabilization-and-metrics-collection-intervals)
 and the example configs in docs and yamls.
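   
   For readers following the diff above, a rough sketch of the timing rule it introduces (java.time is real; the surrounding names are assumptions based on the diff): the metric window only counts as full once a whole metricsWindowDuration has elapsed after the stabilization period that follows the last job update.
   ```java
   // Sketch only; mirrors the condition in the diff, not the operator's actual class.
   import java.time.Clock;
   import java.time.Duration;
   import java.time.Instant;

   final class MetricWindowCheck {
       static boolean fullWindowAvailable(
               Clock clock, Instant jobUpdateTs, Duration stabilization, Duration metricsWindow) {
           Instant stabilizationEnd = jobUpdateTs.plus(stabilization);
           // Not full yet if the stabilization end still lies inside the trailing metrics window.
           return !stabilizationEnd.isAfter(clock.instant().minus(metricsWindow));
       }
   }
   ```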



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of columnar format

2022-12-27 Thread GitBox


flinkbot commented on PR #21563:
URL: https://github.com/apache/flink/pull/21563#issuecomment-1365806084

   
   ## CI report:
   
   * 0f3cbc42fcab5f2f351290cae304ed6ea2cca78d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-19889) Supports nested projection pushdown for filesystem connector of columnar format

2022-12-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19889:
---
Labels: pull-request-available  (was: )

> Supports nested projection pushdown for filesystem connector of columnar 
> format
> ---
>
> Key: FLINK-19889
> URL: https://issues.apache.org/jira/browse/FLINK-19889
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem, Table SQL / Ecosystem
>Reporter: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>
> Once FLINK-19639 is supported, we can support nested projection pushdown for 
> filesystem connector, but only for columnar format first. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest commented on a diff in pull request #19329: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement

2022-12-27 Thread GitBox


LadyForest commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r1057594885


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##
@@ -144,6 +147,163 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable originTable,
+ResolvedSchema originResolveSchema) {
+Schema originSchema = originTable.getUnresolvedSchema();

Review Comment:
   Can we reuse `SchemaResolver` to simplify this check just like ALTER TABLE 
ADD?
   ```java
   Schema originSchema = originTable.getUnresolvedSchema();
   List<Schema.UnresolvedColumn> newColumns =
   originSchema.getColumns().stream()
   .map(
   column -> {
   if 
(column.getName().equals(originColumnName)) {
   return convertColumn(column, 
newColumnName);
   } else {
   return column;
   }
   })
   .collect(Collectors.toList());
   
   Schema updatedSchema =
   
Schema.newBuilder().fromSchema(originSchema).replaceColumns(newColumns).build();
   // try {
   //schemaResolver.resolve(updatedSchema);
   //return updatedSchema;
   //} catch (Exception e) {
   //throw new ValidationException(
   //String.format("%s%s", EX_MSG_PREFIX, 
e.getMessage()), e);
   //}
   return new AlterTableSchemaOperation(
   tableIdentifier,
   CatalogTable.of(
   updatedSchema,
   originTable.getComment(),
   originTable.getPartitionKeys(),
   originTable.getOptions()));
   ```



##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -608,6 +612,18 @@ SqlAlterTable SqlAlterTable() :
 tableIdentifier,
 newTableIdentifier);
 }
+|
+
+originColumnIdentifier = SimpleIdentifier()

Review Comment:
   Do we plan to support the nested column at the syntax level, just like 
`ALTER TABLE ADD/MODIFY` did?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##
@@ -1258,6 +1259,119 @@ public void testAlterTable() throws Exception {
 .hasMessageContaining("ALTER TABLE RESET does not support 
empty key");
 }
 
+@Test
+public void testAlterTableRenameColumn() throws Exception {

Review Comment:
   Can we reuse `prepareTable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuchuanchen opened a new pull request, #21563: [FLINK-19889][connectors/hive/filesystem][format/parquet] Supports nested projection pushdown for filesystem connector of columnar forma

2022-12-27 Thread GitBox


yuchuanchen opened a new pull request, #21563:
URL: https://github.com/apache/flink/pull/21563

   …sted projection pushdown for filesystem connector of columnar format
   
   
   
   ## What is the purpose of the change
   Supports nested projection pushdown for filesystem connector of columnar 
format
   
   ## Brief change log
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   HiveTableSourceITCase.testReadParquetNestedPushdown()
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #493: [FLINK-30464] Exclude metrics during stabilization and ensure a full metric window

2022-12-27 Thread GitBox


mxm commented on code in PR #493:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/493#discussion_r1057588877


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -122,7 +130,15 @@ public CollectedMetrics getMetricsHistory(
 
 // Add scaling metrics to history if they were computed successfully
 scalingMetricHistory.put(clock.instant(), scalingMetrics);
-scalingInformation.updateMetricHistory(currentJobStartTs, 
scalingMetricHistory);
+scalingInformation.updateMetricHistory(currentJobUpdateTs, 
scalingMetricHistory);
+
+if (currentJobUpdateTs
+.plus(stabilizationDuration)
+.isAfter(clock.instant().minus(metricsWindowDuration))) {

Review Comment:
   Created [FLINK-30510](https://issues.apache.org/jira/browse/FLINK-30510) and 
tagged the commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface

2022-12-27 Thread yuzelin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzelin updated FLINK-29950:

Description: 
Currently, the 'ResultSet' only contains the schema and data of the execution result, 
which is not enough for the Client to display the necessary information. So we need to 
add more fields to improve 'ResultSet'.

We can refactor the ResultSet to an interface and hide the implementation details of 
ResultSet, such as how to serialize/deserialize the data in ResultSet.
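
As a rough illustration of that direction (a hypothetical shape, not the actual flink-sql-gateway API), the interface could expose the schema, the rows, and the extra metadata the client needs, while keeping Ser/De an implementation detail:
{code:java}
// Hypothetical sketch only; the real interface in flink-sql-gateway may differ.
import java.util.List;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;

public interface ResultSet {
    ResolvedSchema getResultSchema();
    List<RowData> getData();
    ResultKind getResultKind();   // e.g. SUCCESS vs. SUCCESS_WITH_CONTENT
    boolean isQueryResult();      // extra information the client needs for display
    // How the rows are serialized/deserialized stays inside the concrete implementation.
}
{code}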

  was:
Currently, the 'ResultSet' only contains the schema and data of the execution result, 
which is not enough for the Client to display the necessary information. So we need to 
add more fields to improve 'ResultSet'.

We can refactor the ResultSet to an interface and hide the implementation details of 
ResultSet.


> Refactor ResultSet to an interface
> --
>
> Key: FLINK-29950
> URL: https://issues.apache.org/jira/browse/FLINK-29950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>
> Currently, the 'ResultSet' only contains the schema and data of the execution 
> result, which is not enough for the Client to display the necessary information. So 
> we need to add more fields to improve 'ResultSet'.
> We can refactor the ResultSet to an interface and hide the implementation details 
> of ResultSet, such as how to serialize/deserialize the data in ResultSet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30510) Allow configuring minimum and maximum metrics window size

2022-12-27 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-30510:
--

 Summary: Allow configuring minimum and maximum metrics window size
 Key: FLINK-30510
 URL: https://issues.apache.org/jira/browse/FLINK-30510
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.4.0


It would be more flexible to have a minimum and maximum window size for metric 
collection instead of a fixed window size. This would allow for faster metrics 
evaluation after a rescale operation.
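
One way to read the proposal (a sketch under assumed names, not the operator's implementation): the effective window grows with the time elapsed since the last rescale, clamped between a configured minimum and maximum.
{code:java}
// Sketch only; field and option names are assumptions.
import java.time.Duration;
import java.time.Instant;

final class AdaptiveMetricsWindow {
    static Duration effectiveWindow(Instant lastRescaleTs, Instant now, Duration minWindow, Duration maxWindow) {
        Duration elapsed = Duration.between(lastRescaleTs, now);
        if (elapsed.compareTo(minWindow) < 0) {
            return minWindow; // wait until at least the minimum window of metrics exists
        }
        if (elapsed.compareTo(maxWindow) > 0) {
            return maxWindow; // cap the history considered after long stable periods
        }
        return elapsed;       // evaluate as soon as the minimum is met, without waiting for the maximum
    }
}
{code}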



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface

2022-12-27 Thread yuzelin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzelin updated FLINK-29950:

Description: 
Currently, the 'ResultSet' only contains the schema and data of the execution result, 
which is not enough for the Client to display the necessary information. So we need to 
add more fields to improve 'ResultSet'.

We can refactor the ResultSet to an interface and hide the implementation details of 
ResultSet.

  was:Currently, the 'ResultSet' only contains the schema and data of the execution 
result, which is not enough for the Client to display the necessary information. So we 
need to add more fields to improve 'ResultSet'.


> Refactor ResultSet to an interface
> --
>
> Key: FLINK-29950
> URL: https://issues.apache.org/jira/browse/FLINK-29950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>
> Currently, the 'ResultSet' only contains the schema and data of the execution 
> result, which is not enough for the Client to display the necessary information. So 
> we need to add more fields to improve 'ResultSet'.
> We can refactor the ResultSet to an interface and hide the implementation details 
> of ResultSet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29950) Refactor ResultSet to a interface

2022-12-27 Thread yuzelin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzelin updated FLINK-29950:

Summary: Refactor ResultSet to a interface  (was: ResultSet Refactor)

> Refactor ResultSet to a interface
> -
>
> Key: FLINK-29950
> URL: https://issues.apache.org/jira/browse/FLINK-29950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>
> Currently, the 'ResultSet' only contains the schema and data of the execution 
> result, which is not enough for the Client to display the necessary information. So 
> we need to add more fields to improve 'ResultSet'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29950) Refactor ResultSet to an interface

2022-12-27 Thread yuzelin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuzelin updated FLINK-29950:

Summary: Refactor ResultSet to an interface  (was: Refactor ResultSet to a 
interface)

> Refactor ResultSet to an interface
> --
>
> Key: FLINK-29950
> URL: https://issues.apache.org/jira/browse/FLINK-29950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>
> Currently, the 'ResultSet' only contains the schema and data of the execution 
> result, which is not enough for the Client to display the necessary information. So 
> we need to add more fields to improve 'ResultSet'.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #21542: [FLINK-27640][Connector/Hive] Exclude Pentaho dependency from Hive

2022-12-27 Thread GitBox


MartijnVisser commented on PR #21542:
URL: https://github.com/apache/flink/pull/21542#issuecomment-1365743830

   > I'm back. I have verified with Hive 2.3.9 & Hive 3.1.3, it still works 
after we exclude pentaho dependency.
   
   Thanks for checking! 
   
   > `mvn dependency:tree` tells that there is another dependency on 
`org.pentaho:pentaho-aggdesigner-algorithm` in `flink-sql-client`
   > shouldn't it also be excluded there?
   
   Yes we should. I'll update the PR to include that, thanks for double 
checking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21562: [FLINK-30509] Modify sql-client.sh

2022-12-27 Thread GitBox


flinkbot commented on PR #21562:
URL: https://github.com/apache/flink/pull/21562#issuecomment-1365742842

   
   ## CI report:
   
   * a0fbb989f0315cf2c0e757f0e53f17b390359a2e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30509) Modify sql-client.sh

2022-12-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30509:
---
Labels: pull-request-available  (was: )

> Modify sql-client.sh
> 
>
> Key: FLINK-30509
> URL: https://issues.apache.org/jira/browse/FLINK-30509
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Priority: Major
>  Labels: pull-request-available
>
> The new design of the SQL client will depend on the sql-gateway module, so add the 
> sql-gateway jar to the jar path in the startup script.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuzelin opened a new pull request, #21562: [FLINK-30509] Modify sql-client.sh

2022-12-27 Thread GitBox


yuzelin opened a new pull request, #21562:
URL: https://github.com/apache/flink/pull/21562

   
   
   ## What is the purpose of the change
   The new design of the SQL client will depend on the sql-gateway module, so add the 
sql-gateway jar to the jar path in the startup script.
   
   
   ## Brief change log
   
 - Modify `sql-client.sh`.
 - Modify pom.
   
   
   ## Verifying this change
   
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-30509) Modify sql-client.sh

2022-12-27 Thread yuzelin (Jira)
yuzelin created FLINK-30509:
---

 Summary: Modify sql-client.sh
 Key: FLINK-30509
 URL: https://issues.apache.org/jira/browse/FLINK-30509
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Affects Versions: 1.17.0
Reporter: yuzelin


The new design of the SQL client will depend on the sql-gateway module, so add the 
sql-gateway jar to the jar path in the startup script.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected

2022-12-27 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30368.
--
Resolution: Fixed

Fixed in master: dc862dae28a172f674a9b8a2198c603275304550

> Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the 
> method input domainSize much larger than numSelected 
> ---
>
> Key: FLINK-30368
> URL: https://issues.apache.org/jira/browse/FLINK-30368
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Fix the Calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the 
> method input domainSize is much larger than numSelected. This wrong zero value 
> will affect the selection of the join type.
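
A self-contained illustration of how such an estimate collapses to zero in double precision when domainSize is huge (it mirrors the kind of expression involved; it is not the Calcite source):
{code:java}
// Illustration only: shows the floating-point collapse, not Calcite's actual implementation.
public class NumDistinctValsDemo {
    public static void main(String[] args) {
        double domainSize = 1e20; // "much larger than numSelected"
        double numSelected = 100;
        double estimate = domainSize * (1.0 - Math.pow(1.0 - 1.0 / domainSize, numSelected));
        System.out.println(estimate); // prints 0.0 because (1 - 1/1e20) rounds to exactly 1.0
    }
}
{code}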



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected

2022-12-27 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30368:
--

Assignee: Yunhong Zheng

> Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the 
> method input domainSize much larger than numSelected 
> ---
>
> Key: FLINK-30368
> URL: https://issues.apache.org/jira/browse/FLINK-30368
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Fix the Calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the 
> method input domainSize is much larger than numSelected. This wrong zero value 
> will affect the selection of the join type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] godfreyhe closed pull request #21490: [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than n

2022-12-27 Thread GitBox


godfreyhe closed pull request #21490: [FLINK-30368][table-planner] Fix calcite 
method RelMdUtil$numDistinctVals() wrongly return zero if the method input 
domainSize much larger than numSelected
URL: https://github.com/apache/flink/pull/21490


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30505) Close the connection between TM and JM when task executor failed

2022-12-27 Thread Yongming Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652188#comment-17652188
 ] 

Yongming Zhang commented on FLINK-30505:


[~xtsong] Thanks for participating in this discussion

> Close the connection between TM and JM when task executor failed
> 
>
> Key: FLINK-30505
> URL: https://issues.apache.org/jira/browse/FLINK-30505
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.16.0
>Reporter: Yongming Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When the resource manager detects that a task executor has failed, it closes the 
> connection with that task executor. At this time, jobs running on this TM will fail 
> for other reasons (no longer reachable or heartbeat timeout).
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672047809511-a4b8b5d9-f11f-483c-a113-b42290a33250.png|width=1160,id=uc24b1166!
> If the connection between the task executor and the job master is also closed when the 
> resource manager detects that a task executor has failed, the real reason for the task 
> executor failure will appear in the "Root Exception". This will make it easier for 
> users to find problems.
> !https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672048733572-2b5b7be4-087d-46ae-9c8d-6ad5a1344019.png|width=1141,id=u947d8c4e!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuzelin commented on a diff in pull request #21525: [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway

2022-12-27 Thread GitBox


yuzelin commented on code in PR #21525:
URL: https://github.com/apache/flink/pull/21525#discussion_r1057502548


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java:
##
@@ -58,154 +58,139 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
 import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestClientAndEndpointUtils.TestRestClient.getTestRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT cases for {@link SqlGatewayRestEndpoint}. */
 class SqlGatewayRestEndpointITCase {
 
-private static final SqlGatewayService service = null;
-
-private static RestServerEndpoint serverEndpoint;
-private static RestClient restClient;
+private static SqlGatewayRestEndpoint serverEndpoint;
+private static TestRestClient restClient;
 private static InetSocketAddress serverAddress;
 
-private static TestBadCaseHandler testHandler;
-private static TestVersionSelectionHeaders1 header1;
-private static TestVersionSelectionHeaders2 header2;
 private static TestBadCaseHeaders badCaseHeader;
-private static TestVersionHandler testVersionHandler1;
-private static TestVersionHandler testVersionHandler2;
+private static TestBadCaseHandler testHandler;
+
+private static TestVersionSelectionHeaders0 header0;
+private static TestVersionSelectionHeaders12 header12;
+
+private static TestVersionHandler testVersionHandler0;
+private static TestVersionHandler testVersionHandler12;
 
 private static Configuration config;
 private static final Time timeout = Time.seconds(10L);
 
 @BeforeEach
 void setup() throws Exception {
 // Test version cases
-header1 = new TestVersionSelectionHeaders1();
-header2 = new TestVersionSelectionHeaders2();
-testVersionHandler1 = new TestVersionHandler(service, header1);
-testVersionHandler2 = new TestVersionHandler(service, header2);
+header0 = new TestVersionSelectionHeaders0();
+header12 = new TestVersionSelectionHeaders12();
+testVersionHandler0 = new TestVersionHandler(header0);
+testVersionHandler12 = new TestVersionHandler(header12);
 
 // Test exception cases
 badCaseHeader = new TestBadCaseHeaders();
-testHandler = new TestBadCaseHandler(service);
+testHandler = new TestBadCaseHandler();
 
 // Init
 final String address = 
InetAddress.getLoopbackAddress().getHostAddress();
 config = getBaseConfig(getFlinkConfig(address, address, "0"));
 serverEndpoint =
-TestingSqlGatewayRestEndpoint.builder(config, service)
+TestSqlGatewayRestEndpoint.builder(config)
 .withHandler(badCaseHeader, testHandler)
-.withHandler(header1, testVersionHandler1)
-.withHandler(header2, testVersionHandler2)
+.withHandler(header0, testVersionHandler0)
+.withHandler(header12, testVersionHandler12)
 .buildAndStart();
 
-restClient =
-new RestClient(
-config,
-Executors.newFixedThreadPool(
-1, new 
ExecutorThreadFactory("rest-client-thread-pool")));
+restClient = getTestRestClient();
 serverAddress = serverEndpoint.getServerAddress();
 }
 
 @AfterEach
 void stop() throws Exception {
-
 if (restClient != null) {
-restClient.shutdown(timeout);
+restClient.shutdown();
 restClient = null;
 }
 
 if (serverEndpoint != null) {
-serverEndpoint.closeAsync().get(timeout.getSize(), 
timeout.getUnit());
+serverEndpoint.stop();
 serverEndpoint = null;
 }
 }
 
 /** Test that {@link SqlGatewayMessageHeaders} can identify the version 
correctly. */
 @Test
 void testSqlGatewayMessageHeaders() throws Exception {
-// The header only support V1, but send request by V0
+// The header can't support V0, but sends request by V0
 assertThatThrownBy(
 () ->
 restClient.sendRequest(