[GitHub] [flink] luoyuxia commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070925261


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+    @Test
+    public void testAvgAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, avg(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testAvgAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)");
+        tableEnv.executeSql(
+                        "insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), "
+                                + "(1, NULL, 'b', 2, NULL, 3.33, 3.4), "
+                                + "(2, NULL, '4', 1, 2, 4.55, 4.5), "
+                                + "(2, NULL, NULL, 4, 3, 5.66, 5.2)")
+                .await();
+
+        // test avg where all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(x) from test_avg").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test avg where some string elements can't be converted to double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(y) from test_avg").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[3.0]]");

Review Comment:
   Sorry for that. Seems there's some issue in my EMR Hive cluster; it may not be compatible with the community version.
   Testing with Hive 2 and Hive 3 using Hive's own implementation for avg, the result should be `[+I[2.0]]`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30603) CompactActionITCase in table store is unstable

2023-01-15 Thread Shammon (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677208#comment-17677208 ]

Shammon edited comment on FLINK-30603 at 1/16/23 7:45 AM:
--

This test case is still unstable cc [~lzljs3620320]   [~TsReaper]


was (Author: zjureel):
This test case is still unstable cc [~lzljs3620320][~TsReaper]

> CompactActionITCase in table store is unstable
> --
>
> Key: FLINK-30603
> URL: https://issues.apache.org/jira/browse/FLINK-30603
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0, table-store-0.3.1
>
>
> https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149
> Error:  Failures: 
> Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I 
> 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]>
> [INFO] 
> Error:  Tests run: 221, Failures: 1, Errors: 0, Skipped: 4



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30603) CompactActionITCase in table store is unstable

2023-01-15 Thread Shammon (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677208#comment-17677208 ]

Shammon edited comment on FLINK-30603 at 1/16/23 7:45 AM:
--

This test case is still unstable cc [~lzljs3620320][~TsReaper]


was (Author: zjureel):
This test case is still unstable

> CompactActionITCase in table store is unstable
> --
>
> Key: FLINK-30603
> URL: https://issues.apache.org/jira/browse/FLINK-30603
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0, table-store-0.3.1
>
>
> https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149
> Error:  Failures: 
> Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I 
> 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]>
> [INFO] 
> Error:  Tests run: 221, Failures: 1, Errors: 0, Skipped: 4



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30603) CompactActionITCase in table store is unstable

2023-01-15 Thread Shammon (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shammon updated FLINK-30603:

Description: 
https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149

Error:  Failures: 
Error:  CompactActionITCase.testStreamingCompact:193 expected:<[+I 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]>
[INFO] 
Error:  Tests run: 221, Failures: 1, Errors: 0, Skipped: 4

  was:
https://github.com/apache/flink-table-store/actions/runs/3871130631/jobs/6598625030

[INFO] Results:
[INFO] 
Error:  Failures: 
Error:  CompactActionITCase.testStreamingCompact:187 expected:<[+I 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221209]>



> CompactActionITCase in table store is unstable
> --
>
> Key: FLINK-30603
> URL: https://issues.apache.org/jira/browse/FLINK-30603
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0, table-store-0.3.1
>
>
> https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149
> Error:  Failures: 
> Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I 
> 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]>
> [INFO] 
> Error:  Tests run: 221, Failures: 1, Errors: 0, Skipped: 4



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-30603) CompactActionITCase in table store is unstable

2023-01-15 Thread Shammon (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shammon reopened FLINK-30603:
-

This test case is still unstable

> CompactActionITCase in table store is unstable
> --
>
> Key: FLINK-30603
> URL: https://issues.apache.org/jira/browse/FLINK-30603
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0, table-store-0.3.1
>
>
> https://github.com/apache/flink-table-store/actions/runs/3871130631/jobs/6598625030
> [INFO] Results:
> [INFO] 
> Error:  Failures: 
> Error:CompactActionITCase.testStreamingCompact:187 expected:<[+I 
> 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221209]>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk commented on a diff in pull request #21621: [FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance

2023-01-15 Thread GitBox


zhuzhurk commented on code in PR #21621:
URL: https://github.com/apache/flink/pull/21621#discussion_r107091


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
 }
 }
 
+    private static boolean canOmitSetKeyContext(
+            AbstractStreamOperator<?> streamOperator, int input) {
+        // Since AbstractStreamOperator is @PublicEvolving, we need to check whether the
+        // "SetKeyContextElement" is overridden by the (user-implemented) subclass. If it is
+        // overridden, we cannot omit it because the subclass may maintain different key
+        // selectors on its own.
+        return !hasKeyContext(streamOperator, input)
+                && !methodSetKeyContextIsOverride(streamOperator, input);
+    }
+
+    private static boolean hasKeyContext(AbstractStreamOperator<?> operator, int input) {
+        if (input == 0) {
+            return operator.hasKeyContext1();
+        } else {
+            return operator.hasKeyContext2();
+        }
+    }
+
+    private static boolean methodSetKeyContextIsOverride(
+            AbstractStreamOperator<?> operator, int input) {
+        if (input == 0) {
+            if (operator instanceof OneInputStreamOperator) {
+                return methodIsOverride(
+                                operator,
+                                OneInputStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT,
+                                StreamRecord.class)
+                        || methodIsOverride(
+                                operator,
+                                AbstractStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                                StreamRecord.class);
+            } else {
+                return methodIsOverride(
+                        operator,
+                        AbstractStreamOperator.class,
+                        METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                        StreamRecord.class);
+            }
+        } else {
+            return methodIsOverride(
+                    operator,
+                    AbstractStreamOperator.class,
+                    METHOD_SET_KEY_CONTEXT_ELEMENT2,
+                    StreamRecord.class);
+        }
+    }
+
+    private static boolean methodIsOverride(

Review Comment:
   methodIsOverride -> methodIsOverridden
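   As an aside for readers following the thread: the override check discussed here is typically implemented by resolving the method reflectively and comparing its declaring class. A minimal self-contained sketch of that technique (hypothetical names, not the PR's actual code):

   import java.lang.reflect.Method;

   final class OverrideCheckSketch {
       // Returns true if the runtime class of "instance" overrides "methodName"
       // as declared in "baseClass". getMethod resolves the most specific public
       // declaration, so a declaring class other than baseClass means some
       // subclass has overridden the method.
       static boolean isOverridden(
               Object instance, Class<?> baseClass, String methodName, Class<?>... paramTypes) {
           try {
               Method m = instance.getClass().getMethod(methodName, paramTypes);
               return m.getDeclaringClass() != baseClass;
           } catch (NoSuchMethodException e) {
               throw new IllegalStateException(methodName + " not found", e);
           }
       }
   }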



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
 }
 }
 
+    private static boolean canOmitSetKeyContext(
+            AbstractStreamOperator<?> streamOperator, int input) {
+        // Since AbstractStreamOperator is @PublicEvolving, we need to check whether the
+        // "SetKeyContextElement" is overridden by the (user-implemented) subclass. If it is
+        // overridden, we cannot omit it because the subclass may maintain different key
+        // selectors on its own.
+        return !hasKeyContext(streamOperator, input)
+                && !methodSetKeyContextIsOverride(streamOperator, input);
+    }
+
+    private static boolean hasKeyContext(AbstractStreamOperator<?> operator, int input) {
+        if (input == 0) {
+            return operator.hasKeyContext1();
+        } else {
+            return operator.hasKeyContext2();
+        }
+    }
+
+    private static boolean methodSetKeyContextIsOverride(

Review Comment:
   methodSetKeyContextIsOverride -> methodSetKeyContextElementIsOverridden



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java:
##
@@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception {
 assertThat(operator3.processElement2Called).isTrue();
 }
 
+    @Test
+    void testOverrideSetKeyContextElementForOneInputStreamOperator() throws Exception {
+        // test no override
+        NoOverrideOneInputStreamOperator noOverride = new NoOverrideOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(noOverride).accept(new StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement"
+        OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+                new OverrideSetKeyContextOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+        // test override "SetKeyContextElement1"
+        OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+                new OverrideSetKeyContext1OneInputStreamOperator();
+

[GitHub] [flink] xintongsong commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


xintongsong commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070900657


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+
+/**
+ * Calculate the buffers specs (the exclusive buffers per channel, min/max optional buffers per
+ * gate) based on the configurations, result partition type, and the number of channels.
+ *
+ * <p>The threshold is configured by {@link
+ * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE}. If the option is not
+ * configured, the threshold for Batch jobs is {@link
+ * InputGateSpecUitls#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for
+ * Streaming jobs is {@link InputGateSpecUitls#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM}.
+ */

Review Comment:
   JavaDoc needs to be updated.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateSpecUitls.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils to manage the specs of the {@link InputGate}, for example, {@link GateBuffersSpec}. */
+public class InputGateSpecUitls {
+
+    public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH = 1000;
+
+    public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM = Integer.MAX_VALUE;
+
+    public static GateBuffersSpec createGateBuffersSpec(
+            Optional<Integer> configuredMaxRequiredBuffersPerGate,
+            int configuredNetworkBuffersPerChannel,
+            int configuredFloatingNetworkBuffersPerGate,
+            ResultPartitionType partitionType,
+            int numInputChannels) {
+        int maxRequiredBuffersThresholdPerGate =
+                getEffectiveMaxRequiredBuffersPerGate(
+                        partitionType, configuredMaxRequiredBuffersPerGate);
+        int targetRequiredBuffersPerGate =
+                getRequiredBuffersTargetPerGate(
+                        numInputChannels, configuredNetworkBuffersPerChannel);
+        int targetTotalBuffersPerGate =
+                getTotalBuffersTargetPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        configuredFloatingNetworkBuffersPerGate);
+        int requiredBuffersPerGate =
+                Math.min(maxRequiredBuffersThresholdPerGate, targetRequiredBuffersPerGate);
+
+        int effectiveExclusiveBuffersPerChannel =
+                getExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        requiredBuffersPerGate);
+        int effectiveExclusiveBuffersPerGate =
+                getEffectiveExclusiveBuffersPerGate(
+                        numInputChannels, effectiveExclusiveBuffersPerChannel);
+
+        int

[GitHub] [flink] luoyuxia commented on a diff in pull request #21676: [FLINK-30662][table] Planner supports delete

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21676:
URL: https://github.com/apache/flink/pull/21676#discussion_r1070910506


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+    /**
+     * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it
+     * can't get the {@link DynamicTableSink}.
+     */
+    public static Optional<DynamicTableSink> getDynamicTableSink(

Review Comment:
   Just a note: most of it is from PlannerBase#getTableSink.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import 

[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070910191


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+    @Test
+    public void testAvgAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, avg(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testAvgAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)");
+        tableEnv.executeSql(
+                        "insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), "
+                                + "(1, NULL, 'b', 2, NULL, 3.33, 3.4), "
+                                + "(2, NULL, '4', 1, 2, 4.55, 4.5), "
+                                + "(2, NULL, NULL, 4, 3, 5.66, 5.2)")
+                .await();
+
+        // test avg where all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(x) from test_avg").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test avg where some string elements can't be converted to double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(y) from test_avg").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[3.0]]");
+
+        // test avg bigint with null element
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(f) from test_avg").collect());
+        assertThat(result3.toString()).isEqualTo("[+I[2.3335]]");
+
+        // test avg decimal
+        List<Row> result4 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(d) from test_avg").collect());
+        assertThat(result4.toString()).isEqualTo("[+I[3.94000]]");

Review Comment:
   The decimal parameter has precision 20 and scale 5. According to Hive's decimal result type inference rule, the scale is increased by 4, so the result type is decimal(19, 9) and the test result is correct. Moreover, I've run this test using Hive's `GenericUDAFAverage`.
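   For reference, the expected value itself is plain arithmetic over the four non-null decimals in the test data above; a quick standalone sanity check (not Flink code):

   import java.math.BigDecimal;

   public class AvgDecimalCheck {
       public static void main(String[] args) {
           // (2.22 + 3.33 + 4.55 + 5.66) / 4 = 15.76 / 4 = 3.94
           BigDecimal sum =
                   new BigDecimal("2.22")
                           .add(new BigDecimal("3.33"))
                           .add(new BigDecimal("4.55"))
                           .add(new BigDecimal("5.66"));
           System.out.println(sum.divide(BigDecimal.valueOf(4))); // prints 3.94
       }
   }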



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2023-01-15 Thread GitBox


luoyuxia commented on PR #21257:
URL: https://github.com/apache/flink/pull/21257#issuecomment-1383600766

   @lsyldliu Thanks! I have addressed your comments.
   For the parallelism of `CompactOperator` in AQE, I have created FLINK-29635 to track it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30508) CliClientITCase.testSqlStatements failed with output not matched with expected

2023-01-15 Thread Matthias Pohl (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677199#comment-17677199 ]

Matthias Pohl commented on FLINK-30508:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44824=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=15722

> CliClientITCase.testSqlStatements failed with output not matched with expected
> --
>
> Key: FLINK-30508
> URL: https://issues.apache.org/jira/browse/FLINK-30508
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Qingsheng Ren
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44246=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=14992



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30640) Unstable test in CliClientITCase

2023-01-15 Thread Matthias Pohl (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl closed FLINK-30640.
-
Resolution: Duplicate

I'm closing this issue in favor of FLINK-30508.

> Unstable test in CliClientITCase
> 
>
> Key: FLINK-30640
> URL: https://issues.apache.org/jira/browse/FLINK-30640
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: yuzelin
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44743=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
>  
> The failed test can work normally in my local environment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30695) Support to set parallelism for compact operator according to the number of files in AQE.

2023-01-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-30695:


 Summary: Support to set parallelism for compact operator according 
to the number of files in AQE.
 Key: FLINK-30695
 URL: https://issues.apache.org/jira/browse/FLINK-30695
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070904577


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+    @Test
+    public void testAvgAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, avg(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testAvgAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)");
+        tableEnv.executeSql(
+                        "insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), "
+                                + "(1, NULL, 'b', 2, NULL, 3.33, 3.4), "
+                                + "(2, NULL, '4', 1, 2, 4.55, 4.5), "
+                                + "(2, NULL, NULL, 4, 3, 5.66, 5.2)")
+                .await();
+
+        // test avg where all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(x) from test_avg").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test avg where some string elements can't be converted to double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(y) from test_avg").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[3.0]]");
+
+        // test avg bigint with null element
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(f) from test_avg").collect());
+        assertThat(result3.toString()).isEqualTo("[+I[2.3335]]");
+
+        // test avg decimal
+        List<Row> result4 =

Review Comment:
   Regarding "some string elements can't convert to decimal": the string is converted to double type, so there is no case where a string is converted to decimal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070898438


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+    @Test
+    public void testAvgAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, avg(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testAvgAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)");
+        tableEnv.executeSql(
+                        "insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), "
+                                + "(1, NULL, 'b', 2, NULL, 3.33, 3.4), "
+                                + "(2, NULL, '4', 1, 2, 4.55, 4.5), "
+                                + "(2, NULL, NULL, 4, 3, 5.66, 5.2)")
+                .await();
+
+        // test avg where all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(x) from test_avg").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test avg where some string elements can't be converted to double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select avg(y) from test_avg").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[3.0]]");

Review Comment:
   As the following comment explains, it should be `[+I[3.0]]`: the null element doesn't count.
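   A worked example may make the two readings in this thread concrete; plain arithmetic under the assumption that the native avg skips both NULL and the non-convertible string 'b' entirely:

   public class AvgStringCheck {
       public static void main(String[] args) {
           // Column y holds: '2', 'b', '4', NULL.
           // Assumption: 'b' and NULL are excluded from both sum and count.
           System.out.println((2.0 + 4.0) / 2); // 3.0, matching [+I[3.0]]
           // If 'b' still incremented the count (one reading of the plain-Hive
           // result quoted elsewhere in this thread), it would be:
           System.out.println((2.0 + 4.0) / 3); // 2.0
       }
   }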



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070897568


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveAverageAggFunction.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.math.BigDecimal;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** Built-in Hive average aggregate function. */
+public class HiveAverageAggFunction extends HiveDeclarativeAggregateFunction {
+
+    private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+    private final UnresolvedReferenceExpression count = unresolvedRef("count");
+    private DataType resultType;
+    private DataType sumResultType;
+
+    @Override
+    public int operandCount() {
+        return 1;
+    }
+
+    @Override
+    public UnresolvedReferenceExpression[] aggBufferAttributes() {
+        return new UnresolvedReferenceExpression[] {sum, count};
+    }
+
+    @Override
+    public DataType[] getAggBufferTypes() {
+        return new DataType[] {getSumResultType(), DataTypes.BIGINT()};
+    }
+
+    @Override
+    public DataType getResultType() {
+        return resultType;
+    }
+
+    @Override
+    public Expression[] initialValuesExpressions() {
+        return new Expression[] {/* sum = */ sumInitialValue(), /* count = */ literal(0L)};
+    }
+
+    @Override
+    public Expression[] accumulateExpressions() {
+        // cast the operand to the type needed by sum
+        Expression tryCastOperand = tryCast(operand(0), typeLiteral(getSumResultType()));
+        return new Expression[] {
+            /* sum = */ ifThenElse(isNull(tryCastOperand), sum, adjustedPlus(sum, tryCastOperand)),
+            /* count = */ ifThenElse(isNull(tryCastOperand), count, plus(count, literal(1L))),

Review Comment:
   After a deep dive into the [hive](https://github.com/apache/hive/blob/7b3ecf617a6d46f48a3b6f77e0339fd4ad95a420/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java#L536) implementation: if a parameter is null, neither sum nor count is updated.
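   In other words, the accumulate step is a no-op for null inputs. A minimal sketch of that semantics, simplified to double (not Hive's actual GenericUDAFAverage code):

   final class AvgAccSketch {
       private double sum;
       private long count;

       void accumulate(Double value) {
           if (value == null) {
               return; // null parameter: neither sum nor count is updated
           }
           sum += value;
           count++;
       }

       Double result() {
           // if every input was null, count stays 0 and the result is NULL
           return count == 0 ? null : sum / count;
       }
   }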



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [flink] JunRuiLee commented on pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler

2023-01-15 Thread GitBox


JunRuiLee commented on PR #21672:
URL: https://github.com/apache/flink/pull/21672#issuecomment-1383573928

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on PR #21620:
URL: https://github.com/apache/flink/pull/21620#issuecomment-1383572580

   Thanks for helping review the change. I have resolved the conflicts and 
updated the PR. 
   Could you please help review it again? @xintongsong 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30516) Support files table in table store

2023-01-15 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-30516:
---
Labels: pull-request-available  (was: )

> Support files table in table store
> --
>
> Key: FLINK-30516
> URL: https://issues.apache.org/jira/browse/FLINK-30516
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.3.0, table-store-0.4.0
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
>
> Add files table in Table Store and user can query row count from 
> `mytable$files`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1070892251


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemCommitter;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionCommitPolicy;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Committer operator for partitions in batch mode. This is the single (non-parallel) task. It
+ * collects all the partition information, including partitions and written files, sent from
+ * upstream.
+ */
+public class BatchPartitionCommitterSink extends RichSinkFunction<CompactOutput> {

Review Comment:
   No, Flink expects a `sink` to be returned, so here we make it extend RichSinkFunction.
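   For context, a minimal sketch of the shape the batch translation expects at the end of the pipeline (hypothetical, simplified element type; the real BatchPartitionCommitterSink consumes CompactMessages.CompactOutput and carries committer state):

   import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

   public class CommitterSinkSketch extends RichSinkFunction<String> {
       @Override
       public void invoke(String compactOutput, Context context) {
           // collect the partition and written-file info sent from upstream
       }
   }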



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] zjureel opened a new pull request, #484: [FLINK-30516] Introduce files table in table store

2023-01-15 Thread GitBox


zjureel opened a new pull request, #484:
URL: https://github.com/apache/flink-table-store/pull/484

   Introduce files table in Table Store


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070891601


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffers specs (the exclusive buffers per channel, min/max optional buffers per
+ * gate) based on the configurations, result partition type, and the number of channels.
+ *
+ * <p>The threshold is configured by {@link
+ * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option
+ * is not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for
+ * Streaming jobs is {@link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+ */
+public class GateBuffersSpec {
+
+    private final int effectiveExclusiveBuffersPerChannel;
+
+    private final int minOptionalBuffers;
+
+    private final int maxOptionalBuffers;
+
+    private final int maxEffectiveTotalBuffersPerGate;
+
+    private GateBuffersSpec(
+            int effectiveExclusiveBuffersPerChannel,
+            int minOptionalBuffers,
+            int maxOptionalBuffers,
+            int maxEffectiveTotalBuffersPerGate) {
+        this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel;
+        this.minOptionalBuffers = minOptionalBuffers;
+        this.maxOptionalBuffers = maxOptionalBuffers;
+        this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate;
+    }
+
+    int minOptionalBuffers() {
+        return minOptionalBuffers;
+    }
+
+    int maxOptionalBuffers() {
+        return maxOptionalBuffers;
+    }
+
+    int getEffectiveExclusiveBuffersPerChannel() {
+        return effectiveExclusiveBuffersPerChannel;
+    }
+
+    public int maxEffectiveTotalGateBuffers() {
+        return maxEffectiveTotalBuffersPerGate;
+    }
+
+    public static GateBuffersSpec from(
+            Optional<Integer> configuredMaxRequiredBuffersPerGate,
+            int configuredNetworkBuffersPerChannel,
+            int configuredFloatingNetworkBuffersPerGate,
+            ResultPartitionType partitionType,
+            int numInputChannels) {
+        int maxRequiredBuffersThresholdPerGate =
+                getEffectiveMaxRequiredBuffersPerGate(
+                        partitionType, configuredMaxRequiredBuffersPerGate);
+        int minBuffersTargetPerGate =
+                getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel);
+        int maxBuffersTargetPerGate =
+                getMaxBuffersTargetPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        configuredFloatingNetworkBuffersPerGate);
+        int minRequiredBuffersPerGate =
+                Math.min(maxRequiredBuffersThresholdPerGate, minBuffersTargetPerGate);
+
+        int effectiveExclusiveBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        minRequiredBuffersPerGate);
+        int effectiveExclusiveBuffersPerGate =
+

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070891141


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffers specs (the exclusive buffers per channel, min/max optional buffers per
+ * gate) based on the configurations, result partition type, and the number of channels.
+ *
+ * <p>The threshold is configured by {@link
+ * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option
+ * is not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for
+ * Streaming jobs is {@link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+ */
+public class GateBuffersSpec {
+
+    private final int effectiveExclusiveBuffersPerChannel;
+
+    private final int minOptionalBuffers;
+
+    private final int maxOptionalBuffers;
+
+    private final int maxEffectiveTotalBuffersPerGate;
+
+    private GateBuffersSpec(
+            int effectiveExclusiveBuffersPerChannel,
+            int minOptionalBuffers,
+            int maxOptionalBuffers,
+            int maxEffectiveTotalBuffersPerGate) {
+        this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel;
+        this.minOptionalBuffers = minOptionalBuffers;
+        this.maxOptionalBuffers = maxOptionalBuffers;
+        this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate;
+    }
+
+    int minOptionalBuffers() {
+        return minOptionalBuffers;
+    }
+
+    int maxOptionalBuffers() {
+        return maxOptionalBuffers;
+    }
+
+    int getEffectiveExclusiveBuffersPerChannel() {
+        return effectiveExclusiveBuffersPerChannel;
+    }
+
+    public int maxEffectiveTotalGateBuffers() {
+        return maxEffectiveTotalBuffersPerGate;
+    }
+
+    public static GateBuffersSpec from(
+            Optional<Integer> configuredMaxRequiredBuffersPerGate,
+            int configuredNetworkBuffersPerChannel,
+            int configuredFloatingNetworkBuffersPerGate,
+            ResultPartitionType partitionType,
+            int numInputChannels) {
+        int maxRequiredBuffersThresholdPerGate =
+                getEffectiveMaxRequiredBuffersPerGate(
+                        partitionType, configuredMaxRequiredBuffersPerGate);
+        int minBuffersTargetPerGate =
+                getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel);
+        int maxBuffersTargetPerGate =
+                getMaxBuffersTargetPerGate(
+                        numInputChannels,
+                        configuredNetworkBuffersPerChannel,
+                        configuredFloatingNetworkBuffersPerGate);
+        int minRequiredBuffersPerGate =
+                Math.min(maxRequiredBuffersThresholdPerGate, minBuffersTargetPerGate);
+
+        int effectiveExclusiveBuffersPerChannel =
+                adjustExclusiveBuffersPerChannel(
+                        configuredNetworkBuffersPerChannel,
+                        numInputChannels,
+                        minRequiredBuffersPerGate);
+        int effectiveExclusiveBuffersPerGate =
+

[GitHub] [flink] X-czh commented on pull request #21673: [FLINK-30513] Cleanup HA storage path on cluster termination

2023-01-15 Thread GitBox


X-czh commented on PR #21673:
URL: https://github.com/apache/flink/pull/21673#issuecomment-1383570619

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070890795


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##
@@ -119,6 +139,82 @@ public static int computeNetworkBuffersForAnnouncing(
 return requirementForInputs + requirementForOutputs;
 }
 
+    public static int getEffectiveMaxRequiredBuffersPerGate(
+            ResultPartitionType partitionType,
+            Optional<Integer> configuredMaxRequiredBuffersPerGate) {
+        return configuredMaxRequiredBuffersPerGate.orElseGet(
+                () ->
+                        isPipelineResultPartition(partitionType)
+                                ? DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM
+                                : DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH);
+    }
+
+    private static int getNumBuffersToAnnounceForInputGate(
+            ResultPartitionType type,
+            int configuredNetworkBuffersPerChannel,
+            int floatingNetworkBuffersPerGate,
+            Optional<Integer> maxRequiredBuffersPerGate,
+            int numInputChannels) {
+        GateBuffersSpec gateBuffersSpec =
+                GateBuffersSpec.from(
+                        maxRequiredBuffersPerGate,
+                        configuredNetworkBuffersPerChannel,
+                        floatingNetworkBuffersPerGate,
+                        type,
+                        numInputChannels);
+        return gateBuffersSpec.maxEffectiveTotalGateBuffers();
+    }
+
+    public static boolean isPipelineResultPartition(ResultPartitionType partitionType) {
+        return partitionType.isPipelinedOrPipelinedBoundedResultPartition()
+                || partitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
+    }
+
+    /**
+     * Since at least one floating buffer is required, the number of min required buffers is
+     * reduced by 1, and then the average number of buffers per channel is calculated. Take the
+     * minimum value to ensure that the number of required buffers per gate is not more than the
+     * given {@code minRequiredBuffersPerGate}.
+     */
+    public static int adjustExclusiveBuffersPerChannel(
+            int configuredNetworkBuffersPerChannel,
+            int numInputChannels,
+            int minRequiredBuffersPerGate) {
+        checkArgument(numInputChannels > 0, "Must be positive.");
+        return Math.min(
+                configuredNetworkBuffersPerChannel,
+                (minRequiredBuffersPerGate - 1) / numInputChannels);

Review Comment:
   Added a check about it.
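   To see the adjustment from the JavaDoc above in numbers, a quick worked example (illustrative values, not from the PR):

   public class GateBufferMath {
       public static void main(String[] args) {
           // Formula from the diff:
           // exclusive = min(configuredPerChannel, (minRequiredPerGate - 1) / numChannels)
           int configuredNetworkBuffersPerChannel = 2;
           int numInputChannels = 1000;
           int minRequiredBuffersPerGate = 1000; // the per-gate cap took effect
           int exclusivePerChannel =
                   Math.min(
                           configuredNetworkBuffersPerChannel,
                           (minRequiredBuffersPerGate - 1) / numInputChannels);
           // 999 / 1000 = 0: with a very large gate, every channel gets 0 exclusive
           // buffers and the gate relies on floating buffers instead.
           System.out.println(exclusivePerChannel);
           // A small gate keeps its configured exclusive buffers: min(2, 999 / 10) = 2.
           System.out.println(Math.min(2, (1000 - 1) / 10));
       }
   }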



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffers specs (the exclusive buffers per channel, min/max optional buffers per
+ * gate) based on the configurations, result partition type, and the number of channels.
+ *
+ * <p>The threshold is configured by {@link
+ * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option
+ * is not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for
+ * Streaming jobs is {@link

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070890360


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffer specs (the exclusive buffers per channel, min/max optional buffers per gate)
+ * based on the configurations, result partition type, and the number of 
channels.
+ *
+ * The threshold is configured by {@link
+ * 
NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. 
If the option is
+ * not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for
+ * Streaming jobs is {@link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+ */
+public class GateBuffersSpec {
+
+private final int effectiveExclusiveBuffersPerChannel;
+
+private final int minOptionalBuffers;
+
+private final int maxOptionalBuffers;
+
+private final int maxEffectiveTotalBuffersPerGate;
+
+private GateBuffersSpec(
+int effectiveExclusiveBuffersPerChannel,
+int minOptionalBuffers,
+int maxOptionalBuffers,
+int maxEffectiveTotalBuffersPerGate) {
+this.effectiveExclusiveBuffersPerChannel = 
effectiveExclusiveBuffersPerChannel;
+this.minOptionalBuffers = minOptionalBuffers;
+this.maxOptionalBuffers = maxOptionalBuffers;
+this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate;
+}
+
+int minOptionalBuffers() {
+return minOptionalBuffers;
+}
+
+int maxOptionalBuffers() {
+return maxOptionalBuffers;
+}
+
+int getEffectiveExclusiveBuffersPerChannel() {
+return effectiveExclusiveBuffersPerChannel;
+}
+
+public int maxEffectiveTotalGateBuffers() {
+return maxEffectiveTotalBuffersPerGate;
+}
+
+public static GateBuffersSpec from(
+Optional<Integer> configuredMaxRequiredBuffersPerGate,
+int configuredNetworkBuffersPerChannel,
+int configuredFloatingNetworkBuffersPerGate,
+ResultPartitionType partitionType,
+int numInputChannels) {
+int maxRequiredBuffersThresholdPerGate =
+getEffectiveMaxRequiredBuffersPerGate(
+partitionType, configuredMaxRequiredBuffersPerGate);
+int minBuffersTargetPerGate =
+getMinBuffersTargetPerGate(numInputChannels, 
configuredNetworkBuffersPerChannel);
+int maxBuffersTargetPerGate =

Review Comment:
   Renamed it.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070890224


##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -225,6 +225,35 @@ public class NettyShuffleEnvironmentOptions {
 + " help relieve back-pressure caused by 
unbalanced data distribution among the subpartitions. This value should be"
 + " increased in case of higher round trip 
times between nodes and/or larger number of machines in the cluster.");
 
+/**
+ * Maximum number of network buffers to use for each outgoing/incoming 
gate (result
+ * partition/input gate), which contains all exclusive network buffers for 
all subpartitions and
+ * all floating buffers for the gate. The exclusive network buffers for 
one channel is
+ * configured by {@link #NETWORK_BUFFERS_PER_CHANNEL} and the floating 
buffers for one gate is
+ * configured by {@link #NETWORK_EXTRA_BUFFERS_PER_GATE}.
+ */
+@Experimental
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+public static final ConfigOption<Integer> NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX =

Review Comment:
   Fixed.
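A sketch of how the new option would be set programmatically; the Integer value type is an assumption here, since the digest strips generic parameters:

```java
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Cap the number of required buffers per input gate / result partition.
conf.set(NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX, 1000);
```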



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


TanYuxin-tyx commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070890650


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffer specs (the exclusive buffers per channel, min/max optional buffers per gate)
+ * based on the configurations, result partition type, and the number of 
channels.
+ *
+ * The threshold is configured by {@link
+ * 
NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. 
If the option is
+ * not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for
+ * Streaming jobs is {@link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+ */
+public class GateBuffersSpec {
+
+private final int effectiveExclusiveBuffersPerChannel;
+
+private final int minOptionalBuffers;
+
+private final int maxOptionalBuffers;
+
+private final int maxEffectiveTotalBuffersPerGate;
+
+private GateBuffersSpec(
+int effectiveExclusiveBuffersPerChannel,
+int minOptionalBuffers,
+int maxOptionalBuffers,
+int maxEffectiveTotalBuffersPerGate) {
+this.effectiveExclusiveBuffersPerChannel = 
effectiveExclusiveBuffersPerChannel;
+this.minOptionalBuffers = minOptionalBuffers;
+this.maxOptionalBuffers = maxOptionalBuffers;
+this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate;
+}
+
+int minOptionalBuffers() {
+return minOptionalBuffers;
+}
+
+int maxOptionalBuffers() {
+return maxOptionalBuffers;
+}
+
+int getEffectiveExclusiveBuffersPerChannel() {
+return effectiveExclusiveBuffersPerChannel;
+}
+
+public int maxEffectiveTotalGateBuffers() {
+return maxEffectiveTotalBuffersPerGate;
+}
+
+public static GateBuffersSpec from(
+Optional<Integer> configuredMaxRequiredBuffersPerGate,
+int configuredNetworkBuffersPerChannel,
+int configuredFloatingNetworkBuffersPerGate,
+ResultPartitionType partitionType,
+int numInputChannels) {
+int maxRequiredBuffersThresholdPerGate =
+getEffectiveMaxRequiredBuffersPerGate(
+partitionType, configuredMaxRequiredBuffersPerGate);
+int minBuffersTargetPerGate =
+getMinBuffersTargetPerGate(numInputChannels, 
configuredNetworkBuffersPerChannel);
+int maxBuffersTargetPerGate =
+getMaxBuffersTargetPerGate(
+numInputChannels,
+configuredNetworkBuffersPerChannel,
+configuredFloatingNetworkBuffersPerGate);
+int minRequiredBuffersPerGate =
+Math.min(maxRequiredBuffersThresholdPerGate, 
minBuffersTargetPerGate);
+
+int effectiveExclusiveBuffersPerChannel =
+adjustExclusiveBuffersPerChannel(

Review Comment:
   Renamed it.
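For illustration, a hypothetical caller of the factory under discussion; the parameter values and the partition type are made up, only the signatures come from this diff:

```java
GateBuffersSpec spec =
        GateBuffersSpec.from(
                Optional.of(1000),            // configured max required buffers per gate
                2,                            // configured exclusive buffers per channel
                8,                            // configured floating buffers per gate
                ResultPartitionType.BLOCKING, // a batch-style partition
                500);                         // number of input channels

// The spec then drives buffer assignment for the gate:
int exclusivePerChannel = spec.getEffectiveExclusiveBuffersPerChannel();
int minFloating = spec.minOptionalBuffers();
int maxFloating = spec.maxOptionalBuffers();
int maxTotal = spec.maxEffectiveTotalGateBuffers();
```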



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, 

[GitHub] [flink] JunRuiLee commented on pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler

2023-01-15 Thread GitBox


JunRuiLee commented on PR #21672:
URL: https://github.com/apache/flink/pull/21672#issuecomment-1383568933

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1070885603


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##
@@ -76,7 +76,8 @@ public PartitionLoader(
 }
 
 /** Load a single partition. */
-public void loadPartition(LinkedHashMap<String, String> partSpec, List<Path> srcDirs)
+public void loadPartition(

Review Comment:
   Yes, the paths must either all be directories or all be files. There shouldn't be any
case where some paths are directories and some are files; if there were, they wouldn't
have been written by Flink.
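A minimal sketch of that invariant as a precondition check; the helper name is hypothetical and not part of the PR:

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.List;

static void checkAllDirsOrAllFiles(FileSystem fs, List<Path> srcPaths) throws IOException {
    long dirCount = 0;
    for (Path path : srcPaths) {
        if (fs.getFileStatus(path).isDir()) {
            dirCount++;
        }
    }
    // Either every path is a directory or none of them is.
    Preconditions.checkState(
            dirCount == 0 || dirCount == srcPaths.size(),
            "Source paths must be all directories or all files.");
}
```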



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1070884810


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.OutputFormatFactory;
+import org.apache.flink.connector.file.table.PartitionComputer;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.PartitionWriter;
+import org.apache.flink.connector.file.table.PartitionWriterFactory;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+
+import java.util.LinkedHashMap;
+
+/**
+ * An operator for writing files in batch mode. Once a new file is created for writing, the
+ * operator will emit the written file downstream.
+ */
+public class BatchFileWriter extends 
AbstractStreamOperator

Review Comment:
   Yes, it aligns the function with `FileSystemOutputFormat`.
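A condensed sketch of the emit pattern described in the Javadoc; the `InputFile` constructor arguments are assumed from the imports above, and the actual operator wiring is simplified away:

```java
// Each newly created file is forwarded downstream as a CoordinatorInput so a
// coordinator can later compact and commit it.
private void emitCreatedFile(String partition, Path file) {
    output.collect(new StreamRecord<>(new InputFile(partition, file)));
}
```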



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] thexiay commented on a diff in pull request #21676: [FLINK-30662][table] Planner supports delete

2023-01-15 Thread GitBox


thexiay commented on code in PR #21676:
URL: https://github.com/apache/flink/pull/21676#discussion_r1070884554


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java:
##
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import 
org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+/**
+ * Get the {@link DynamicTableSink} for the table to be modified. Return 
Optional.empty() if it
+ * can't get the {@link DynamicTableSink}.
+ */
+public static Optional<DynamicTableSink> getDynamicTableSink(

Review Comment:
   Is it copied from PlannerBase#getTableSink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21257:
URL: https://github.com/apache/flink/pull/21257#discussion_r1070883134


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java:
##
@@ -62,4 +62,24 @@ OutputFormat createNewOutputFormat(Path path) throws 
IOException {
 return format;
 }
 }
+
+/** Listener for partition writer. */
+interface PartitionWriterListener {
+/**
+ * Notifies that a new file has been created.
+ *
+ * @param partition The partition for the newly created file.
+ * @param file The newly created file.
+ */
+void onFileOpen(String partition, Path file);

Review Comment:
   Actually it should be `onFileOpen`, but I changed the doc for the method:
   ```java
   Note that this does not mean that the file has been created in the file system.
   It is only created logically and the actual file will be generated after it is committed.
   ```
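For context, a minimal listener implementation consistent with the interface quoted above; it assumes `onFileOpen` is the only abstract method and is purely illustrative:

```java
import org.apache.flink.core.fs.Path;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Collects logically created files so they can be emitted or committed later. */
static class CollectingListener implements PartitionWriter.PartitionWriterListener {
    private final Map<String, List<Path>> pendingFiles = new HashMap<>();

    @Override
    public void onFileOpen(String partition, Path file) {
        // The file exists only logically here; the physical file appears once
        // the write is committed.
        pendingFiles.computeIfAbsent(partition, k -> new ArrayList<>()).add(file);
    }
}
```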
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-01-15 Thread Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677179#comment-17677179
 ] 

Liu commented on FLINK-30629:
-

[~xtsong] Could you have a look at the code? Thanks.

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0
>Reporter: Xintong Song
>Assignee: Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1070878161


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveAverageAggFunction.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.math.BigDecimal;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/** Built-in Hive average aggregate function. */
+public class HiveAverageAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+private final UnresolvedReferenceExpression count = unresolvedRef("count");
+private DataType resultType;
+private DataType sumResultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {sum, count};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getSumResultType(), DataTypes.BIGINT()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {/* sum = */ sumInitialValue(), /* count = */ 
literal(0L)};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+// cast the operand to sum needed type
+Expression tryCastOperand = tryCast(operand(0), 
typeLiteral(getSumResultType()));

Review Comment:
   They are not equivalent: `PrimitiveObjectInspectorUtils.getDouble` will throw a
`NumberFormatException` when parsing fails, but `tryCast` is equivalent to
   ```
   try {
       PrimitiveObjectInspectorUtils.getDouble
   } catch (NumberFormatException e) {
   }
   ```
   Hive's avg UDAF will catch the exception. Please see the following code in Hive's
`GenericUDAFAverage` for more detail:
   ```
   try {
 // Skip the same value if avgDistinct is true
 if (isWindowingDistinct()) {
   ObjectInspectorObject obj = new ObjectInspectorObject(
   ObjectInspectorUtils.copyToStandardObject(parameter, 
inputOI, ObjectInspectorCopyOption.JAVA),
   copiedOI);
   if 

[jira] [Commented] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced

2023-01-15 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677172#comment-17677172
 ] 

luoyuxia commented on FLINK-30679:
--

[~hehuiyuan] Thanks for the contribution. I will have a look.

> Can not load the data of hive dim table when project-push-down is introduced
> 
>
> Key: FLINK-30679
> URL: https://issues.apache.org/jira/browse/FLINK-30679
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.6
>Reporter: hehuiyuan
>Priority: Critical
>  Labels: pull-request-available
>
>  
> Can not load the data of hive dim table when project-push-down is introduced.
> The project push down optimize:[https://github.com/apache/flink/pull/21311]
> hive-exec  version: 2.3.4
> flink version: 1.14.6
> flink-hive-connector: the latest code for release-1.14 branch
>  
> vectorize read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
>  
>  
> mapreduce read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
> ~[?:1.8.0_301]
>     at 
> java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
>  ~[?:1.8.0_301]
>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>  ~[?:1.8.0_301]
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
> ~[?:1.8.0_301]
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
>  
>  
> The sql :
>  
> {code:java}
> CREATE TABLE kafkaTableSource (
> name string,
> age int,
> sex string,
> address string,
> ptime AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'hehuiyuan1',
> 'scan.startup.mode' = 'latest-offset',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.client.id' = 'test-consumer-group',
> 'properties.group.id' = 'test-consumer-group',
> 'format' = 'csv'
> );
> CREATE TABLE printsink (
> name string,
> age int,
> sex string,
> 

[GitHub] [flink] luoyuxia commented on pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update

2023-01-15 Thread GitBox


luoyuxia commented on PR #21658:
URL: https://github.com/apache/flink/pull/21658#issuecomment-1383550093

   @LadyForest @lincoln-lil Thanks for your reviews. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update

2023-01-15 Thread GitBox


luoyuxia commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070871519


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support deleting existing data according to
+ * row-level changes. The table sink is responsible for telling the planner how to produce the
+ * row changes, and for consuming them to delete the corresponding rows.
+ *
+ * The planner will call {@link #applyRowLevelDelete(RowLevelModificationScanContext)} to get
+ * the {@link RowLevelDeleteInfo} returned by the sink, and rewrite the delete statement based
+ * on that {@link RowLevelDeleteInfo} to produce the rows for the {@link DynamicTableSink}.
+ *
+ * Note: If the sink also implements {@link SupportsDeletePushDown}, the planner will always
+ * prefer {@link SupportsDeletePushDown}; only if the filters aren't available or {@link
+ * SupportsDeletePushDown#applyDeleteFilters(List)} returns false will this interface be
+ * considered.

Review Comment:
   Thanks. That's more concise and clear.
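A hypothetical sink wiring based only on the Javadoc above; `RowLevelDeleteInfo`'s members are not shown in this diff, so the empty implementation is an assumption:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;

import javax.annotation.Nullable;

public class MyDeleteAwareSink implements DynamicTableSink, SupportsRowLevelDelete {

    @Override
    public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
        // Tell the planner how the delete rows should be produced; the sink
        // later consumes those rows to delete the matching data.
        return new RowLevelDeleteInfo() {};
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSink copy() {
        return this;
    }

    @Override
    public String asSummaryString() {
        return "MyDeleteAwareSink";
    }
}
```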



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30257) SqlClientITCase#testMatchRecognize failed

2023-01-15 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677166#comment-17677166
 ] 

Lijie Wang commented on FLINK-30257:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44871=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a

> SqlClientITCase#testMatchRecognize failed
> -
>
> Key: FLINK-30257
> URL: https://issues.apache.org/jira/browse/FLINK-30257
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: Martijn Visser
>Priority: Major
>  Labels: test-stability
> Attachments: image-2022-12-29-21-47-31-606.png
>
>
> {code:java}
> Nov 30 21:54:41 [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 224.683 s <<< FAILURE! - in SqlClientITCase
> Nov 30 21:54:41 [ERROR] SqlClientITCase.testMatchRecognize  Time elapsed: 
> 50.164 s  <<< FAILURE!
> Nov 30 21:54:41 org.opentest4j.AssertionFailedError: 
> Nov 30 21:54:41 
> Nov 30 21:54:41 expected: 1
> Nov 30 21:54:41  but was: 0
> Nov 30 21:54:41   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Nov 30 21:54:41   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Nov 30 21:54:41   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Nov 30 21:54:41   at 
> SqlClientITCase.verifyNumberOfResultRecords(SqlClientITCase.java:297)
> Nov 30 21:54:41   at 
> SqlClientITCase.testMatchRecognize(SqlClientITCase.java:255)
> Nov 30 21:54:41   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 30 21:54:41   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 30 21:54:41   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 30 21:54:41   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 30 21:54:41   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMetho
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43635=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=14817



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on pull request #21646: [FLINK-30631][runtime] Limit the max number of subpartitons consumed by each downstream task

2023-01-15 Thread GitBox


wanglijie95 commented on PR #21646:
URL: https://github.com/apache/flink/pull/21646#issuecomment-1383531832

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-28526) Fail to lateral join with UDTF from Table with timstamp column

2023-01-15 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-28526:
---

Assignee: Dian Fu  (was: Xingbo Huang)

> Fail to lateral join with UDTF from Table with timstamp column
> --
>
> Key: FLINK-28526
> URL: https://issues.apache.org/jira/browse/FLINK-28526
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Dian Fu
>Priority: Major
>
> The bug can be reproduced with the following test
> {code:python}
> def test_flink(self):
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_descriptor(
> TableDescriptor.for_connector("filesystem")
> .schema(
> Schema.new_builder()
> .column("name", DataTypes.STRING())
> .column("cost", DataTypes.INT())
> .column("distance", DataTypes.INT())
> .column("time", DataTypes.TIMESTAMP(3))
> .watermark("time", "`time` - INTERVAL '60' SECOND")
> .build()
> )
> .format("csv")
> .option("path", "./input.csv")
> .build()
> )
> @udtf(result_types=DataTypes.INT())
> def table_func(row: Row):
> return row.cost + row.distance
> table = table.join_lateral(table_func.alias("cost_times_distance"))
> table.execute().print()
> {code}
> It causes the following exception
> {code:none}
> E   pyflink.util.exceptions.TableException: 
> org.apache.flink.table.api.TableException: Unsupported Python SqlFunction 
> CAST.
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
> E at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> E at scala.collection.Iterator.foreach(Iterator.scala:937)
> E at scala.collection.Iterator.foreach$(Iterator.scala:937)
> E at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> E at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> E at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> E at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> E at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233)
> E at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> E at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
> E at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> E at 
> 

[jira] [Updated] (FLINK-30676) Introduce Data Structures for table store

2023-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30676:
---
Labels: pull-request-available  (was: )

> Introduce Data Structures for table store
> -
>
> Key: FLINK-30676
> URL: https://issues.apache.org/jira/browse/FLINK-30676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Copy data structures to table store from Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi opened a new pull request, #483: [FLINK-30676] Introduce Data Structures for table store

2023-01-15 Thread GitBox


JingsongLi opened a new pull request, #483:
URL: https://github.com/apache/flink-table-store/pull/483

   - Introduce new Data Structures
   - Use new Data Types and Data Structures


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30651) Move utility methods to CatalogTest and remove CatalogTestUtils class

2023-01-15 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-30651:
---
Summary: Move utility methods to CatalogTest and remove CatalogTestUtils 
class   (was: Move util methods to CatalogTest and remove CatalogTestUtils 
class )

> Move utility methods to CatalogTest and remove CatalogTestUtils class 
> --
>
> Key: FLINK-30651
> URL: https://issues.apache.org/jira/browse/FLINK-30651
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Tests
>Reporter: Samrat Deb
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> [CatalogTestUtils|https://github.com/apache/flink/blame/master/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java#L43]
>  class contains static utility functions. These functions/methods can be 
> moved to the CatalogTest class to make the code flow easier to understand.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21684: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf

2023-01-15 Thread GitBox


flinkbot commented on PR #21684:
URL: https://github.com/apache/flink/pull/21684#issuecomment-1383494024

   
   ## CI report:
   
   * 366aa425f0b85e78796941e5d7c3c3aa28ff7002 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677137#comment-17677137
 ] 

chenhaiyang edited comment on FLINK-30694 at 1/16/23 4:58 AM:
--

Hi [~jark], the PR has been committed, please take a look here: 
[https://github.com/apache/flink/pull/21684|https://github.com/apache/flink/pull/21684]
 
 


was (Author: JIRAUSER298680):
Hi [~jark], the PR has been commited, please take a look here: 
[https://github.com/apache/flink/pull/21682|https://github.com/apache/flink/pull/21682]
 
 

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.16.0
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.1
>
>
> The page url is[ 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] NightRunner opened a new pull request, #21684: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf

2023-01-15 Thread GitBox


NightRunner opened a new pull request, #21684:
URL: https://github.com/apache/flink/pull/21684

   Page "Application Development"/"Table API"/"SQL"/"Queries"/Windows TVF 
translate to chinese.
   
   ## What is the purpose of the change
   
   Translate /dev/table/sql/queries/window-tvf.md
   
   ## Brief change log
   
   Translate /dev/table/sql/queries/window-tvf.md
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] NightRunner closed pull request #21682: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf.md

2023-01-15 Thread GitBox


NightRunner closed pull request #21682: [FLINK-30694][docs]Translate 
/dev/table/sql/queries/window-tvf.md
URL: https://github.com/apache/flink/pull/21682


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21608:
URL: https://github.com/apache/flink/pull/21608#discussion_r1070834501


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+@Test
+public void testMinAggFunctionPlan() {

Review Comment:
   The current Hive ITCase covers all the plan-related tests; we can improve this 
in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] godfreyhe commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect

2023-01-15 Thread GitBox


godfreyhe commented on code in PR #21608:
URL: https://github.com/apache/flink/pull/21608#discussion_r1070833459


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+@Test
+public void testMinAggFunctionPlan() {
+// test explain
+String actualPlan = explainSql("select x, min(y) from foo group by x");
+
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out"));
+}
+
+@Test
+public void testMinAggFunction() throws Exception {

Review Comment:
   All types should be covered, including unsupported types



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] godfreyhe commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect

2023-01-15 Thread GitBox


godfreyhe commented on code in PR #21608:
URL: https://github.com/apache/flink/pull/21608#discussion_r1070833142


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+@Test
+public void testMinAggFunctionPlan() {

Review Comment:
   It's better that the plan test not be part of an ITCase.



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+
+/** built-in hive min aggregate function. */
+public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
+
+private final UnresolvedReferenceExpression min = unresolvedRef("min");
+private DataType resultType;
+
+@Override
+public int operandCount() {
+return 1;
+}
+
+@Override
+public UnresolvedReferenceExpression[] aggBufferAttributes() {
+return new UnresolvedReferenceExpression[] {min};
+}
+
+@Override
+public DataType[] getAggBufferTypes() {
+return new DataType[] {getResultType()};
+}
+
+@Override
+public DataType getResultType() {
+return resultType;
+}
+
+@Override
+public Expression[] initialValuesExpressions() {
+return new Expression[] {
+/* min */
+nullOf(getResultType())
+};
+}
+
+@Override
+public Expression[] accumulateExpressions() {
+return new Expression[] {
+/* min = */ ifThenElse(
+isNull(operand(0)),
+min,
+ifThenElse(
+isNull(min),
+operand(0),
+ifThenElse(lessThan(operand(0), min), operand(0), 
min)))
+};
+}
+
+@Override
+public Expression[] retractExpressions() {
+throw new TableException("Min aggregate function does not support 
retraction.");
+}
+
+@Override
+public Expression[] mergeExpressions() {
+return new Expression[] {
+/* max = */ ifThenElse(

Review Comment:
   Typo: the comment should read `/* min = */`, not `/* max = */`.
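
   Read as plain Java, the accumulate expression quoted above amounts to the 
following null-aware minimum (illustrative only — the real class builds Flink 
expression trees with `ifThenElse`/`isNull`/`lessThan` rather than Java control flow):

   ```java
   // Null-aware MIN accumulation mirroring the nested ifThenElse above:
   // ignore null inputs, adopt the first non-null value, then keep the smaller.
   static Integer accumulateMin(Integer min, Integer input) {
       if (input == null) {
           return min;                   // isNull(operand(0)) -> keep current min
       }
       if (min == null) {
           return input;                 // isNull(min) -> first value becomes min
       }
       return input < min ? input : min; // lessThan(operand(0), min)
   }
   ```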



##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##
@@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception {
 tableEnv.executeSql("drop table test_sum_group");
 }
 
+@Test
+public void testMinAggFunctionPlan() {
+// test explain
+String actualPlan = explainSql("select x, min(y) from foo group by x");
+
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out"));
+}
+
+@Test
+public void testMinAggFunction() throws Exception {

Review Comment:
   All types should be covered, including unsupported types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Commented] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677137#comment-17677137
 ] 

chenhaiyang commented on FLINK-30694:
-

Hi [~jark], the PR has been committed, please take a look here: 
[https://github.com/apache/flink/pull/21682|https://github.com/apache/flink/pull/21682]
 
 

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.16.0
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.1
>
>
> The page url is[ 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21683: [FLINK-30672][table] Support 'EXPLAIN PLAN_ADVICE' statement

2023-01-15 Thread GitBox


flinkbot commented on PR #21683:
URL: https://github.com/apache/flink/pull/21683#issuecomment-1383463668

   
   ## CI report:
   
   * e0f770c958d667db2d38d0528b9fc63d75aced96 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30672) Support 'EXPLAIN PLAN_ADVICE' statement

2023-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30672:
---
Labels: pull-request-available  (was: )

> Support 'EXPLAIN PLAN_ADVICE' statement
> ---
>
> Key: FLINK-30672
> URL: https://issues.apache.org/jira/browse/FLINK-30672
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Jane Chan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest opened a new pull request, #21683: [FLINK-30672][sql-parser] Support 'EXPLAIN PLAN_ADVICE' statement

2023-01-15 Thread GitBox


LadyForest opened a new pull request, #21683:
URL: https://github.com/apache/flink/pull/21683

   ## What is the purpose of the change
   
   This pull request supports the `EXPLAIN PLAN_ADVICE` statement.
   
   
   ## Brief change log
   
   - e0f770c9 supports SQL syntax
   - TODO
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `FlinkSqlParserImplTest` verifies the syntax change.
   - TODO
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? FLINK-30673
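
   A hedged sketch of exercising the new statement once it lands — the statement 
form is inferred from the PR title, and the `orders` table and printed output 
shape are assumptions:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ExplainPlanAdviceDemo {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           tEnv.executeSql(
                   "CREATE TABLE orders (amount INT, user_id BIGINT) "
                           + "WITH ('connector' = 'datagen')");
           // The PR adds EXPLAIN PLAN_ADVICE <query> to the SQL parser; the
           // shape of the printed advice is not shown in this PR yet.
           tEnv.executeSql(
                           "EXPLAIN PLAN_ADVICE "
                                   + "SELECT user_id, SUM(amount) FROM orders GROUP BY user_id")
                   .print();
       }
   }
   ```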
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lincoln-lil commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update

2023-01-15 Thread GitBox


lincoln-lil commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070792730


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level 
modification. The table source
+ * is responsible to return the information described by {@link 
RowLevelModificationScanContext}.

Review Comment:
   'responsible for returning'



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level 
modification. The table source
+ * is responsible to return the information described by {@link 
RowLevelModificationScanContext}.
+ * The context will be propagated to the sink which implements {@link 
SupportsRowLevelUpdate} or
+ * {@link SupportsRowLevelDelete}.
+ *
+ * Note: This interface is optional for table sources to implement. For 
cases where the table
+ * source neither need to know the type of row-level modification nor 
propagate information to sink,
+ * the table source don't need to implement this interface. See more details 
at {@link

Review Comment:
   'For cases where the table source neither needs to know the type of 
row-level modification nor propagate information to the sink, the table source 
does not need to implement this interface'



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

[GitHub] [flink] flinkbot commented on pull request #21682: Translate /dev/table/sql/queries/window-tvf.md

2023-01-15 Thread GitBox


flinkbot commented on PR #21682:
URL: https://github.com/apache/flink/pull/21682#issuecomment-1383452082

   
   ## CI report:
   
   * 1b19856e2279fc2119314ffe23cc8f2dae3432df UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenhaiyang updated FLINK-30694:

Fix Version/s: 1.16.1
Affects Version/s: 1.16.0

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.16.0
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.1
>
>
> The page url is[ 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] NightRunner opened a new pull request, #21682: Translate /dev/table/sql/queries/window-tvf.md

2023-01-15 Thread GitBox


NightRunner opened a new pull request, #21682:
URL: https://github.com/apache/flink/pull/21682

   Page "Application Development"/"Table API"/"SQL"/"Queries"/jions 
translate to chinese.
   
   ## What is the purpose of the change
   
   Translate /dev/table/sql/queries/window-tvf.md
   
   ## Brief change log
   
   Translate /dev/table/sql/queries/window-tvf.md
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #196: [FLINK-30541] Add Transformer and Estimator for OnlineStandardScaler

2023-01-15 Thread GitBox


lindong28 commented on code in PR #196:
URL: https://github.com/apache/flink-ml/pull/196#discussion_r1070769249


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/metrics/MLMetrics.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A collection class for handling metrics in Flink ML.
+ *
+ * All metrics of Flink ML are registered under group "ml", which is a 
child group of {@link
+ * org.apache.flink.metrics.groups.OperatorMetricGroup}. Metrics related to 
model data will be
+ * registered in the group "ml.model".
+ *
+ * For example, the timestamp of the current model data will be reported in 
metric:
+ * "{some_parent_groups}.operator.ml.model.timestamp". And the version of the 
current model data
+ * will be reported in metric: 
"{some_parent_groups}.operator.ml.model.version".
+ */
+@PublicEvolving

Review Comment:
   Since we have not opened FLIP for these metrics, should we mark it 
`@Experimental` for now?
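
   For illustration, a hedged sketch of registering metrics under the "ml.model" 
group with the standard Flink metrics API — the class and field names are 
invented, not code from this PR:

   ```java
   import org.apache.flink.metrics.Gauge;
   import org.apache.flink.metrics.MetricGroup;

   final class ModelMetricsRegistrar {
       private volatile long currentModelTimestamp;
       private volatile long currentModelVersion;

       // Registers gauges under "<operator scope>.ml.model.*", matching the
       // group layout in the javadoc above; the caller supplies the operator's
       // metric group, e.g. from getRuntimeContext().getMetricGroup().
       void register(MetricGroup operatorGroup) {
           MetricGroup mlModel = operatorGroup.addGroup("ml").addGroup("model");
           mlModel.gauge("timestamp", (Gauge<Long>) () -> currentModelTimestamp);
           mlModel.gauge("version", (Gauge<Long>) () -> currentModelVersion);
       }
   }
   ```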



##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasModelVersionCol.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared model version column param. */
+public interface HasModelVersionCol<T> extends WithParams<T> {
+Param<String> MODEL_VERSION_COL =
+new StringParam(
+"modelVersionCol",
+"The version of the model data that the input data is 
predicted with.",

Review Comment:
   Would it be useful to also specify the value type (e.g. long) of this column?



##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasMaxAllowedModelDelayMs.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared max allowed model delay in milliseconds param. */
+public interface HasMaxAllowedModelDelayMs<T> extends WithParams<T> {
+Param<Long> MAX_ALLOWED_MODEL_DELAY_MS =
+new LongParam(
+"maxAllowedModelDelayMs",
+"The maximum difference between the timestamps of the 
input record and model data when "

Review Comment:
   It seems 

[GitHub] [flink] flinkbot commented on pull request #21681: [FLINK-18397][docs-zh] Translate "Table & SQL Connectors Overview" pa…

2023-01-15 Thread GitBox


flinkbot commented on PR #21681:
URL: https://github.com/apache/flink/pull/21681#issuecomment-1383439136

   
   ## CI report:
   
   * 2c8f06d74c3c48c3ba567e8dc21ce0f1ba3b2e5b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Jrebel-i opened a new pull request, #21681: [FLINK-18397][docs-zh] Translate "Table & SQL Connectors Overview" pa…

2023-01-15 Thread GitBox


Jrebel-i opened a new pull request, #21681:
URL: https://github.com/apache/flink/pull/21681

   
   
   ## What is the purpose of the change
   [FLINK-18397](https://issues.apache.org/jira/browse/FLINK-18397)
   Translate "Table & SQL Connectors Overview" page into Chinese.
   The page url is 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connectors/
   
   ## Brief change log
   The markdown file is located in 
*docs/content.zh/docs/connectors/table/overview.md*
   
   ## Verifying this change
   
   This change is a trivial rework/code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  **no**
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive):  **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented?   **not documented**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on pull request #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7

2023-01-15 Thread GitBox


yunfengzhou-hub commented on PR #21680:
URL: https://github.com/apache/flink/pull/21680#issuecomment-1383431886

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7

2023-01-15 Thread GitBox


flinkbot commented on PR #21680:
URL: https://github.com/apache/flink/pull/21680#issuecomment-1383431236

   
   ## CI report:
   
   * a78b7a8e95f7a0ad8148c14dd94dd495b8229f0a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30130) Table.select lose watermark

2023-01-15 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677130#comment-17677130
 ] 

Jark Wu commented on FLINK-30130:
-

Sorry [~yunfengzhou] [~kamalesh0420], I don't think this is a problem or a bug. 
It is by design that the watermark **definition** can't be propagated, as the 
referenced columns might not be selected in the current view/table. But the 
printed schema shows the "ROWTIME" attribute, which users can use for 
windowing. You can also use "TUMBLE_ROWTIME" to generate a new rowtime 
attribute (and, implicitly, a watermark) for subsequent rowtime-based 
operations, as sketched below. Therefore, from the users' perspective, there is 
enough information in the printed schema, and this doesn't affect rowtime-based operations. 
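
A minimal sketch, reusing `tEnv` and `table` from the snippet quoted below (the 
view name and window size are arbitrary):

{code:java}
// Group-window aggregation whose TUMBLE_ROWTIME result is again a ROWTIME
// attribute, so downstream rowtime-based operations can keep using it.
tEnv.createTemporaryView("t", table);
Table windowed = tEnv.sqlQuery(
        "SELECT TUMBLE_ROWTIME(time_ltz, INTERVAL '10' SECOND) AS w_rowtime, "
                + "COUNT(*) AS cnt "
                + "FROM t "
                + "GROUP BY TUMBLE(time_ltz, INTERVAL '10' SECOND)");
windowed.printSchema(); // w_rowtime is TIMESTAMP_LTZ(3) *ROWTIME*
{code}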

Please reopen the issue if you still have questions. 

> Table.select lose watermark
> ---
>
> Key: FLINK-30130
> URL: https://issues.apache.org/jira/browse/FLINK-30130
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Yunfeng Zhou
>Priority: Major
>
> Trying to execute the following program
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream<Long> stream = env.fromSequence(0, 1000);
> Schema schema = Schema.newBuilder()
> .column("f0", DataTypes.BIGINT())
> .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)")
> .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
> .build();
> Table table = tEnv.fromDataStream(stream, schema);
> table.printSchema();
> table = table.select($("*"));
> table.printSchema();{code}
> Would get the following result
> {code:java}
> (
>   `f0` BIGINT,
>   `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3),
>   WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND
> )
> (
>   `f0` BIGINT,
>   `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME*
> )
> {code}
> This result shows that the watermark property of a Table is lost during 
> select operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30130) Table.select lose watermark

2023-01-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-30130.
---
Resolution: Won't Fix

> Table.select lose watermark
> ---
>
> Key: FLINK-30130
> URL: https://issues.apache.org/jira/browse/FLINK-30130
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Yunfeng Zhou
>Priority: Major
>
> Trying to execute the following program
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream<Long> stream = env.fromSequence(0, 1000);
> Schema schema = Schema.newBuilder()
> .column("f0", DataTypes.BIGINT())
> .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)")
> .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
> .build();
> Table table = tEnv.fromDataStream(stream, schema);
> table.printSchema();
> table = table.select($("*"));
> table.printSchema();{code}
> Would get the following result
> {code:java}
> (
>   `f0` BIGINT,
>   `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3),
>   WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND
> )
> (
>   `f0` BIGINT,
>   `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME*
> )
> {code}
> This result shows that the watermark property of a Table is lost during 
> select operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30191) Update py4j from 0.10.9.3 to 0.10.9.7

2023-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30191:
---
Labels: pull-request-available  (was: )

> Update py4j from 0.10.9.3 to 0.10.9.7
> -
>
> Key: FLINK-30191
> URL: https://issues.apache.org/jira/browse/FLINK-30191
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21679: [FLINK-30561][state/changelog] fix changelog local cache file not found

2023-01-15 Thread GitBox


flinkbot commented on PR #21679:
URL: https://github.com/apache/flink/pull/21679#issuecomment-1383427385

   
   ## CI report:
   
   * 7cd37c90646b3713f4e734741b9074223c762932 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub opened a new pull request, #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7

2023-01-15 Thread GitBox


yunfengzhou-hub opened a new pull request, #21680:
URL: https://github.com/apache/flink/pull/21680

   ## What is the purpose of the change
   
   - Updating net.sf.py4j:py4j from 0.10.9.3 to 0.10.9.7
   
   ## Brief change log
   
   - Updated POM, Python dev requirements & setup, NOTICE file
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): yes
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: no
   - The runtime per-record code paths (performance sensitive): no
   - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
   - The S3 file system connector: no
   
   ## Documentation
   
   - Does this pull request introduce a new feature? no
   - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager

2023-01-15 Thread GitBox


xintongsong commented on code in PR #21620:
URL: https://github.com/apache/flink/pull/21620#discussion_r1070766952


##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -225,6 +225,35 @@ public class NettyShuffleEnvironmentOptions {
 + " help relieve back-pressure caused by 
unbalanced data distribution among the subpartitions. This value should be"
 + " increased in case of higher round trip 
times between nodes and/or larger number of machines in the cluster.");
 
+/**
+ * Maximum number of network buffers to use for each outgoing/incoming 
gate (result
+ * partition/input gate), which contains all exclusive network buffers for 
all subpartitions and
+ * all floating buffers for the gate. The exclusive network buffers for 
one channel is
+ * configured by {@link #NETWORK_BUFFERS_PER_CHANNEL} and the floating 
buffers for one gate is
+ * configured by {@link #NETWORK_EXTRA_BUFFERS_PER_GATE}.
+ */
+@Experimental
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+public static final ConfigOption<Integer> 
NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX =

Review Comment:
   ```suggestion
   public static final ConfigOption<Integer> 
NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX =
   ```
   ```suggestion
   public static final ConfigOption<Integer> 
NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE =
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate;
+import static 
org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Calculate the buffer specs (the exclusive buffers per channel, min/max 
optional buffers per gate)
+ * based on the configurations, result partition type, and the number of 
channels.
+ *
+ * The threshold is configured by {@link
+ * 
NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. 
If the option is
+ * not configured, the threshold for Batch jobs is {@link
+ * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the 
threshold for
+ * Streaming jobs is {@link 
NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}.
+ */
+public class GateBuffersSpec {
+
+private final int effectiveExclusiveBuffersPerChannel;
+
+private final int minOptionalBuffers;
+
+private final int maxOptionalBuffers;
+
+private final int maxEffectiveTotalBuffersPerGate;
+
+private GateBuffersSpec(
+int effectiveExclusiveBuffersPerChannel,
+int minOptionalBuffers,
+int maxOptionalBuffers,
+int maxEffectiveTotalBuffersPerGate) {
+this.effectiveExclusiveBuffersPerChannel = 
effectiveExclusiveBuffersPerChannel;
+this.minOptionalBuffers = minOptionalBuffers;
+this.maxOptionalBuffers = maxOptionalBuffers;
+this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate;
+}
+
+int minOptionalBuffers() {
+return minOptionalBuffers;
+}
+
+int maxOptionalBuffers() {
+return maxOptionalBuffers;
+}
+
+int getEffectiveExclusiveBuffersPerChannel() {
+return effectiveExclusiveBuffersPerChannel;
+}
+
+public int 
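
As a hedged sketch, the threshold selection that the `GateBuffersSpec` javadoc 
above describes amounts to the following (the constant values below are 
placeholders, not Flink's actual defaults):

```java
import java.util.Optional;

final class GateBufferThresholds {
    // Placeholder values; the real defaults live in NettyShuffleUtils.
    static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH = 1000;
    static final int DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING = 1000;

    // If the new per-gate option is set it wins; otherwise fall back to the
    // per-partition-type default, as the class javadoc above describes.
    static int maxRequiredBuffersPerGate(Optional<Integer> configuredMax, boolean isBatch) {
        return configuredMax.orElse(
                isBatch
                        ? DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH
                        : DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING);
    }
}
```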

[jira] [Updated] (FLINK-30561) ChangelogStreamHandleReaderWithCache cause FileNotFoundException

2023-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30561:
---
Labels: pull-request-available  (was: )

> ChangelogStreamHandleReaderWithCache cause FileNotFoundException
> 
>
> Key: FLINK-30561
> URL: https://issues.apache.org/jira/browse/FLINK-30561
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> When a job with state changelog enabled restarts repeatedly, the following 
> exceptions may occur:
> {code:java}
> java.lang.RuntimeException: java.io.FileNotFoundException: 
> /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_192/dstl-cache-file/dstl6215344559415829831.tmp
>  (No such file or directory)
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>     at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
>     at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
>     at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: 
> /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_192/dstl-cache-file/dstl6215344559415829831.tmp
>  (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at 
> org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
>     at 
> org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
>     at 
> org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>     ... 21 more {code}
> *Problem causes:*
> # *_ChangelogStreamHandleReaderWithCache_* uses RefCountedFile to manage the 
> local cache file. The reference count is incremented when an input stream is 
> opened from the cache file, and decremented by one when the input stream is 
> closed. So the input stream must not be closed more than once; otherwise the 
> reference count is decremented twice and the cache file can be deleted while 
> it is still in use.
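
A minimal sketch of the ref-counting hazard described above, assuming a generic 
scheme rather than Flink's actual classes — closing the same stream twice 
releases the reference twice, so the cache file can be deleted while another 
reader still needs it:

```java
import java.io.File;

final class RefCountedCacheFile {
    private final File file;
    private int refCount = 1; // the cache itself holds one reference

    RefCountedCacheFile(File file) {
        this.file = file;
    }

    synchronized void retain() {   // called when a stream opens the file
        refCount++;
    }

    synchronized void release() {  // called when a stream is closed
        if (--refCount == 0) {
            file.delete();         // premature if release() runs twice per stream
        }
    }
}
```

This matches the fix in the PR below, which prevents the iterator from being 
closed twice.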

[GitHub] [flink] zoltar9264 opened a new pull request, #21679: [FLINK-30561][state/changelog] fix changelog local cache file not found

2023-01-15 Thread GitBox


zoltar9264 opened a new pull request, #21679:
URL: https://github.com/apache/flink/pull/21679

   ## What is the purpose of the change
Fix the changelog local cache file not found error while restarting, as described in 
[FLINK-30561](https://issues.apache.org/jira/browse/FLINK-30561).
   
   
   ## Brief change log
   
 - prevent  _StateChangelogHandleStreamHandleReader_  from closing the current 
iterator twice.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese

2023-01-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-30694:
---

Assignee: chenhaiyang  (was: zhule)

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
>
> The page url is[ 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update

2023-01-15 Thread GitBox


LadyForest commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070788300


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support deleting existing data 
according to row-level
+ * changes. The table sink is responsible for telling the planner how to produce 
the row changes, and
+ * for consuming them to delete the rows.
+ *
+ * The planner will call the method {@link 
#applyRowLevelDelete(RowLevelModificationScanContext)}
+ * to get the {@link RowLevelDeleteInfo} returned by the sink, and rewrite the 
delete statement based on
+ * that {@link RowLevelDeleteInfo} to produce rows for the {@link 
DynamicTableSink}.
+ *
+ * Note: If the sink also implements {@link SupportsDeletePushDown}, the 
planner will always
+ * prefer {@link SupportsDeletePushDown}, and only the filters aren't 
available or {@link
+ * SupportsDeletePushDown#applyDeleteFilters(List)} returns false, this 
interface will be
+ * considered.

Review Comment:
   This statement is slightly confusing because the sentence's subject is the 
**planner**. How could a planner consider an interface? So what about
   
   ```java
   The planner always prefers {@link SupportsDeletePushDown} 
   over {@link SupportsRowLevelDelete} on condition that
{@link SupportsDeletePushDown#applyDeleteFilters(List)} returns true.
   ```
   From my perspective, this is relatively concise and clear. If you are worried it 
is not detailed enough, we can add the following description at the end.
   ```java
   The sink needs to implement this interface iff {@link 
SupportsDeletePushDown#applyDeleteFilters(List)} returns false.
   ```
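
   Putting that precedence rule into code form — a hedged sketch from the 
planner's point of view; the `ResolvedExpression` element type and the helper 
shape are assumptions, not the actual planner implementation:

   ```java
   import java.util.List;

   import javax.annotation.Nullable;

   import org.apache.flink.table.connector.RowLevelModificationScanContext;
   import org.apache.flink.table.connector.sink.DynamicTableSink;
   import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
   import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
   import org.apache.flink.table.expressions.ResolvedExpression;

   final class DeletePrecedenceSketch {
       // Try filter push-down first; fall back to row-level delete only when
       // push-down declines. Mirrors the javadoc discussion above.
       static void planDelete(
               DynamicTableSink sink,
               List<ResolvedExpression> filters,
               @Nullable RowLevelModificationScanContext scanContext) {
           if (sink instanceof SupportsDeletePushDown
                   && ((SupportsDeletePushDown) sink).applyDeleteFilters(filters)) {
               return; // the sink deletes by filters alone; no row changes needed
           }
           if (sink instanceof SupportsRowLevelDelete) {
               SupportsRowLevelDelete.RowLevelDeleteInfo info =
                       ((SupportsRowLevelDelete) sink).applyRowLevelDelete(scanContext);
               // rewrite the DELETE into a query producing row changes for the sink
           }
       }
   }
   ```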
   



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support deleting existing data 
according to row-level
+ * changes. The table sink is responsible for telling the planner how to produce 
the row changes, and
+ * for consuming them to delete the rows.
+ *
+ * The planner will call the method {@link 
#applyRowLevelDelete(RowLevelModificationScanContext)}
+ * to get the {@link RowLevelDeleteInfo} returned by the sink, and rewrite the 
delete statement based on
+ * that {@link RowLevelDeleteInfo} to produce rows for the {@link 
DynamicTableSink}.

[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2023-01-15 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1070782561


##
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link Swing}. */
+public class SwingTest {
+@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+private StreamExecutionEnvironment env;
+private StreamTableEnvironment tEnv;

Review Comment:
   `env`, `tEnv` and `trainData` are global variables accessed by several test 
functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30689) Use auto produce bytes schema in Pulsar sink

2023-01-15 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-30689:
--

Assignee: Yufan Sheng

> Use auto produce bytes schema in Pulsar sink
> 
>
> Key: FLINK-30689
> URL: https://issues.apache.org/jira/browse/FLINK-30689
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Major
> Fix For: pulsar-4.0.0
>
>
> Pulsar has a {{Schema.AUTO_PRODUCE_BYTES()}} for sending byte array messages 
> to the broker with extra schema validation. This would be better than 
> directly using {{Schema.BYTES}}, which bypasses the validation.
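
For illustration, here is a minimal, self-contained sketch of the intended usage with the plain Pulsar client (the broker URL and topic name are hypothetical placeholders; this is not the Flink connector code):

{code:java}
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class AutoProduceBytesSketch {
    public static void main(String[] args) throws Exception {
        // Assumes a broker at pulsar://localhost:6650 and a topic that already
        // has a registered schema which the raw bytes must conform to.
        PulsarClient client =
                PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        // Unlike Schema.BYTES, AUTO_PRODUCE_BYTES validates the raw payload
        // against the topic's registered schema before publishing.
        Producer<byte[]> producer =
                client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                        .topic("persistent://public/default/example-topic")
                        .create();

        byte[] payload = new byte[] {0x01, 0x02}; // placeholder payload
        producer.send(payload);

        producer.close();
        client.close();
    }
}
{code}

If the topic has no schema, such a producer should behave like a plain bytes producer; with a schema registered, non-conforming payloads are rejected at publish time.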



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30687) FILTER has no effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122
 ] 

tanjialiang edited comment on FLINK-30687 at 1/16/23 2:58 AM:
--

I tried to find out how to fix this. I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT( * ) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can't filterArg be 0? Are there some scenarios where it 
cannot be used? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indices start from 0.
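
To make the indexing point concrete, here is a minimal, self-contained sketch (hypothetical code, not Flink's generated aggregate handler) of why a 'filterArg > 0' guard silently drops a filter on field index 0, while -1 is the natural sentinel for "no filter":

{code:java}
public class FilterArgSketch {
    private static final int NO_FILTER = -1;

    // COUNT(*) FILTER (WHERE row[filterArg] == 1); a negative filterArg means
    // "no filter", so field index 0 must be accepted by the guard.
    static long count(int[][] rows, int filterArg) {
        long count = 0;
        for (int[] row : rows) {
            // Correct guard: filterArg >= 0, since 0 is a legal field index.
            if (filterArg < 0 || row[filterArg] == 1) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {2, 20}, {1, 30}};
        System.out.println(count(rows, 0));         // filter on field 0 -> 2
        System.out.println(count(rows, NO_FILTER)); // plain COUNT(*)    -> 3
    }
}
{code}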


was (Author: JIRAUSER279823):
I tried to find out how to fix this. I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT(*) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can't filterArg be 0? Are there some scenarios where it 
cannot be used? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indices start from 0.

> FILTER has no effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I tried using Flink SQL like this, I found that 'FILTER (WHERE class_id = 1)' 
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If you 
> want to try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30687) FILTER has no effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122
 ] 

tanjialiang commented on FLINK-30687:
-

I tried to find out how to fix this. I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT(*) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can't filterArg be 0? Are there some scenarios where it 
cannot be used? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indices start from 0.

> FILTER has no effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I tried using Flink SQL like this, I found that 'FILTER (WHERE class_id = 1)' 
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If you 
> want to try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu commented on pull request #21605: [FLINK-29722][hive] Supports native hive max function for hive dialect

2023-01-15 Thread GitBox


lsyldliu commented on PR #21605:
URL: https://github.com/apache/flink/pull/21605#issuecomment-1383400146

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30687) FILTER has no effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Attachment: image-2023-01-16-10-54-04-673.png

> FILTER has no effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I tried using Flink SQL like this, I found that 'FILTER (WHERE class_id = 1)' 
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If you 
> want to try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu commented on a diff in pull request #21629: [FLINK-30579][hive] Introducing configurable option to enable hive native function

2023-01-15 Thread GitBox


lsyldliu commented on code in PR #21629:
URL: https://github.com/apache/flink/pull/21629#discussion_r1070778492


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##
@@ -310,7 +311,11 @@ public TOpenSessionResp OpenSession(TOpenSessionReq 
tOpenSessionReq) throws TExc
 // all the alive PersistenceManager in the ObjectStore, which may 
get error like
 // "Persistence Manager has been closed" in the later connection.
 hiveCatalog.open();
-Module hiveModule = new HiveModule();
+// create hive module lazily
+FunctionCreator hiveModuleCreator =
+(options, readableConfig, classLoader) ->
+FactoryUtil.createModule(

Review Comment:
   It uses the classloader from the Context, so it is the user classloader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30694) Translate "Windowing TVF" page of "Queries" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677119#comment-17677119
 ] 

chenhaiyang commented on FLINK-30694:
-

Hi [~jark], can you assign this issue to me?
I'd like to work on this ticket.
Thank you very much!

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: chenhaiyang
>Assignee: zhule
>Priority: Major
>  Labels: pull-request-available
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Queries" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenhaiyang updated FLINK-30694:

Description: 
The page URL is 
[https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]

 

The markdown file is located in 
docs/content.zh/docs/dev/table/sql/queries/window-tvf.md

  was:
The page URL is 
[https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]

 

The markdown file is located in 
docs/content.zh/docs/dev/table/sql/queries/window-tvf.md


> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: chenhaiyang
>Assignee: zhule
>Priority: Major
>  Labels: pull-request-available
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Queries" into Chinese

2023-01-15 Thread chenhaiyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenhaiyang updated FLINK-30694:

Fix Version/s: (was: 1.11.0)
  Description: 
The page URL is 
[https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]

 

The markdown file is located in 
docs/content.zh/docs/dev/table/sql/queries/window-tvf.md

  was:
The page url is 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/functions/

The markdown file is located in {{flink/docs/dev/table/functions/index.zh.md}}

  Summary: Translate "Windowing TVF" page of "Queries" into Chinese   
(was: Translate "Joins" page of "Queries" into Chinese )

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: chenhaiyang
>Assignee: zhule
>Priority: Major
>  Labels: pull-request-available
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30694) Translate "Joins" page of "Queries" into Chinese

2023-01-15 Thread chenhaiyang (Jira)
chenhaiyang created FLINK-30694:
---

 Summary: Translate "Joins" page of "Querys" into Chinese 
 Key: FLINK-30694
 URL: https://issues.apache.org/jira/browse/FLINK-30694
 Project: Flink
  Issue Type: Sub-task
  Components: chinese-translation, Documentation
Reporter: chenhaiyang
Assignee: zhule
 Fix For: 1.11.0


The page url is 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/functions/

The markdown file is located in {{flink/docs/dev/table/functions/index.zh.md}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30332) HsFileDataIndex should support spilling to file

2023-01-15 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-30332.

Fix Version/s: 1.17.0
   Resolution: Done

master (1.17): 8e50e24797fbb154cdf7c484775f0fafa94cf34c

> HsFileDataIndex should support spilling to file
> 
>
> Key: FLINK-30332
> URL: https://issues.apache.org/jira/browse/FLINK-30332
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> `HsFileDataIndex` is maintained as a list of TreeMaps (i.e. in memory); this 
> caused no problem in our 10T TPC-DS test. However, as a job's data volume 
> grows, the index will continue to grow, and there is a risk of OOM. We 
> should support spilling and loading index data to avoid this problem.
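
As a rough illustration of the direction, here is a much-simplified, self-contained sketch of an index that spills to a file (hypothetical code using JDK serialization; the actual HsFileDataIndex has its own on-disk format and region structures):

{code:java}
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.TreeMap;

/** Keeps recent index entries in memory and spills the full map to disk. */
public class SpillableIndexSketch {
    private final int inMemoryLimit;
    private final File spillFile;
    // Maps a buffer index to its offset in the shuffle data file.
    private final TreeMap<Long, Long> hot = new TreeMap<>();

    public SpillableIndexSketch(int inMemoryLimit, File spillFile) {
        this.inMemoryLimit = inMemoryLimit;
        this.spillFile = spillFile;
    }

    public void put(long bufferIndex, long fileOffset) throws IOException {
        hot.put(bufferIndex, fileOffset);
        if (hot.size() > inMemoryLimit) {
            spill(); // bound the in-memory footprint
        }
    }

    public Long lookup(long bufferIndex) throws IOException {
        Long offset = hot.get(bufferIndex);
        // On a miss, fall back to the entries spilled to disk.
        return offset != null ? offset : readSpilled().get(bufferIndex);
    }

    // Merge the in-memory entries into the spilled map, rewrite the file,
    // then drop the in-memory entries.
    private void spill() throws IOException {
        TreeMap<Long, Long> merged = readSpilled();
        merged.putAll(hot);
        try (ObjectOutputStream out =
                new ObjectOutputStream(new FileOutputStream(spillFile))) {
            out.writeObject(merged);
        }
        hot.clear();
    }

    @SuppressWarnings("unchecked")
    private TreeMap<Long, Long> readSpilled() throws IOException {
        if (!spillFile.exists() || spillFile.length() == 0) {
            return new TreeMap<>();
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new FileInputStream(spillFile))) {
            return (TreeMap<Long, Long>) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}
{code}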



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong closed pull request #21603: [FLINK-30332] HsFileDataIndex supports caching and spilling index entry

2023-01-15 Thread GitBox


xintongsong closed pull request #21603: [FLINK-30332] HsFileDataIndex supports 
caching and spilling index entry
URL: https://github.com/apache/flink/pull/21603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-15 Thread GitBox


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1383361255

   @gaoyunhaii @dawidwys 
   Could you help review this? 
   I'd like to make it usable in 1.17. Thanks a lot!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30613) Improve resolving schema compatibility -- Milestone one

2023-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30613:
---
Labels: pull-request-available  (was: )

> Improve resolving schema compatibility -- Milestone one
> ---
>
> Key: FLINK-30613
> URL: https://issues.apache.org/jira/browse/FLINK-30613
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In milestone one, we should:
>  # Add an extra method 
> (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot 
> oldSerializerSnapshot)) in TypeSerializerSnapshot.java as above, and return 
> INCOMPATIBLE as default.
>  # Mark the original method as deprecated; it will delegate to the new method 
> by default.
>  # Implement the new method for all built-in TypeSerializerSnapshots.
> See FLIP-263 for more details.
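
For orientation, here is a rough, self-contained sketch of the proposed shape (simplified, hypothetical names; the real interface is org.apache.flink.api.common.typeutils.TypeSerializerSnapshot):

{code:java}
interface SerializerSnapshot<T> {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // New method from milestone one: the NEW snapshot resolves compatibility
    // against the OLD snapshot, returning INCOMPATIBLE by default.
    default Compatibility resolveSchemaCompatibility(SerializerSnapshot<T> oldSnapshot) {
        return Compatibility.INCOMPATIBLE;
    }

    // Original method, to be deprecated: by default it now delegates to the
    // new method, so existing call sites keep working during the migration.
    @Deprecated
    default Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}

interface Serializer<T> {
    SerializerSnapshot<T> snapshotConfiguration();
}
{code}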



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-15 Thread GitBox


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1383357691

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myracle commented on pull request #21661: [FLINK-30629][Client/Job Submission] Fix the unstable test ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat

2023-01-15 Thread GitBox


Myracle commented on PR #21661:
URL: https://github.com/apache/flink/pull/21661#issuecomment-1383357574

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 merged pull request #200: [FLINK-30578][build] Publish SBOM artifacts

2023-01-15 Thread GitBox


lindong28 merged PR #200:
URL: https://github.com/apache/flink-ml/pull/200


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30168) PyFlink Deserialization Error with Object Array

2023-01-15 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-30168.
---
Fix Version/s: 1.17.0
   1.16.1
   1.15.4
 Assignee: Dian Fu  (was: Xingbo Huang)
   Resolution: Fixed

Fixed in:
- master via 46757739cf50c1e7b7305a4bc9cf779bb1945a1f
- release-1.16 via 46c91ed4bc22e2de3a662d52b11ade8ed64dba0b
- release-1.15 via cdecc21cad9f78b1555a0e2f5d7f1398949e7193

> PyFlink Deserialization Error with Object Array
> ---
>
> Key: FLINK-30168
> URL: https://issues.apache.org/jira/browse/FLINK-30168
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Yunfeng Zhou
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
>
> When attempting to collect object array records from a DataStream in 
> PyFlink, an exception like the following is thrown:
> {code:java}
> data = 0, field_type = DenseVectorTypeInfo
> 
>     def pickled_bytes_to_python_converter(data, field_type):
>         if isinstance(field_type, RowTypeInfo):
>             row_kind = RowKind(int.from_bytes(data[0], 'little'))
>             data = zip(list(data[1:]), field_type.get_field_types())
>             fields = []
>             for d, d_type in data:
>                 fields.append(pickled_bytes_to_python_converter(d, d_type))
>             row = Row.of_kind(row_kind, *fields)
>             return row
>         else:
> >           data = pickle.loads(data)
> E           TypeError: a bytes-like object is required, not 'int'{code}
> I found that this error occurs because PyFlink handles object arrays 
> differently on the Java side and the Python side. 
>  
> On the Java side 
> (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):
> {code:java}
> ...
> else if (dataType instanceof BasicArrayTypeInfo
>         || dataType instanceof PrimitiveArrayTypeInfo) {
>     // recursively deal with array elements
> } ...
> else {
>     // ObjectArrayTypeInfo falls into this branch
>     TypeSerializer serializer = dataType.createSerializer(null);
>     ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
>     DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
>     serializer.serialize(obj, baosWrapper);
>     return pickler.dumps(baos.toByteArray());
> }
> {code}
>  
> On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):
> {code:python}
> ...
> elif isinstance(field_type,
>                 (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
>     element_type = field_type._element_type
>     elements = []
>     for element_bytes in data:
>         elements.append(
>             pickled_bytes_to_python_converter(element_bytes, element_type))
>     return elements{code}
>  
> Thus a possible fix for this bug is to align PyFlink's behavior on the Java 
> side and the Python side.
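
To make the mismatch concrete, here is a toy, self-contained illustration that uses plain JDK serialization in place of pickle (hypothetical code, not from PyFlink):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class ArrayBlobMismatchSketch {
    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        Double[] vector = {1.0, 2.0};

        // What the Java side does today for ObjectArrayTypeInfo (per the
        // ticket): one blob for the whole array.
        byte[] wholeArrayBlob = toBytes(vector);

        // What the Python side expects: one blob per element, so it can
        // deserialize element by element.
        List<byte[]> perElementBlobs = new ArrayList<>();
        for (Double d : vector) {
            perElementBlobs.add(toBytes(d));
        }

        // A reader that iterates "elements" over the single blob ends up
        // iterating over raw bytes (ints in Python), the analogue of the
        // "a bytes-like object is required, not 'int'" error above.
        System.out.println("whole-array blob bytes: " + wholeArrayBlob.length);
        System.out.println("per-element blobs: " + perElementBlobs.size());
    }
}
{code}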



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 commented on pull request #200: [FLINK-30578][build] Publish SBOM artifacts

2023-01-15 Thread GitBox


lindong28 commented on PR #200:
URL: https://github.com/apache/flink-ml/pull/200#issuecomment-1383342110

   Thanks for the PR. LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dianfu closed pull request #21664: [FLINK-30168][python] Fix DataStream.execute_and_collect to support None data and ObjectArray

2023-01-15 Thread GitBox


dianfu closed pull request #21664: [FLINK-30168][python] Fix 
DataStream.execute_and_collect to support None data and ObjectArray
URL: https://github.com/apache/flink/pull/21664


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29231) PyFlink UDAF produces different results in the same sliding window

2023-01-15 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-29231.
---
Fix Version/s: 1.17.0
   1.16.1
   1.15.4
   Resolution: Fixed

Fixed in:
- master via addca4e18aa1806984070b8c450d3d0df5e473d3
- release-1.16 via 6d3d66ca71a87c50f9d160b5c05aa6cbe49ae1b0
- release-1.15 via d9115d8c2aae75c0b17471c1ab8a0769e606def1

> PyFlink UDAF produces different results in the same sliding window
> --
>
> Key: FLINK-29231
> URL: https://issues.apache.org/jira/browse/FLINK-29231
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.3
>Reporter: Xuannan Su
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1, 1.15.4
>
> Attachments: image-2022-09-08-17-20-06-296.png, input, test_agg.py
>
>
> It seems that a PyFlink UDAF produces different results in the same sliding 
> window. It can be reproduced with the given code and input. It does not always 
> happen, but the probability is relatively high.
> The incorrect output is the following:
> !image-2022-09-08-17-20-06-296.png!
>  
> We can see that the output contains different `val_sum` at `window_time` 
> 2022-01-01 00:01:59.999.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dianfu closed pull request #21655: [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window

2023-01-15 Thread GitBox


dianfu closed pull request #21655: [FLINK-29231][python] Fix the issue that 
Python UDAF may produce multiple results for the same window
URL: https://github.com/apache/flink/pull/21655


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30679) Cannot load the data of hive dim table when project-push-down is introduced

2023-01-15 Thread hehuiyuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677107#comment-17677107
 ] 

hehuiyuan commented on FLINK-30679:
---

Hi [~luoyuxia],

Please take a look when you have time.

> Cannot load the data of hive dim table when project-push-down is introduced
> 
>
> Key: FLINK-30679
> URL: https://issues.apache.org/jira/browse/FLINK-30679
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.6
>Reporter: hehuiyuan
>Priority: Critical
>  Labels: pull-request-available
>
>  
> Cannot load the data of the hive dim table when project push-down is introduced.
> The project push-down optimization: [https://github.com/apache/flink/pull/21311]
> hive-exec  version: 2.3.4
> flink version: 1.14.6
> flink-hive-connector: the latest code for release-1.14 branch
>  
> vectorized read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
>  
>  
> MapReduce read:
>  
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: 3
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
> ~[?:1.8.0_301]
>     at 
> java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
>  ~[?:1.8.0_301]
>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_301]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
> ~[?:1.8.0_301]
>     at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>  ~[?:1.8.0_301]
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
> ~[?:1.8.0_301]
>     at 
> org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
>  ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
>  ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
>  
>  
> The sql :
>  
> {code:java}
> CREATE TABLE kafkaTableSource (
> name string,
> age int,
> sex string,
> address string,
> ptime AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'hehuiyuan1',
> 'scan.startup.mode' = 'latest-offset',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.client.id' = 'test-consumer-group',
> 'properties.group.id' = 'test-consumer-group',
> 'format' = 'csv'
> );
> CREATE TABLE printsink (
> name string,
> age int,
> sex string,
> address 
