[GitHub] [flink] luoyuxia commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
luoyuxia commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070925261 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testAvgAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, avg(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out")); +} + +@Test +public void testAvgAggFunction() throws Exception { +tableEnv.executeSql( +"create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)"); +tableEnv.executeSql( +"insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), " ++ "(1, NULL, 'b', 2, NULL, 3.33, 3.4), " ++ "(2, NULL, '4', 1, 2, 4.55, 4.5), " ++ "(2, NULL, NULL, 4, 3, 5.66, 5.2)") +.await(); + +// test avg all element is null +List result = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(x) from test_avg").collect()); +assertThat(result.toString()).isEqualTo("[+I[null]]"); + +// test avg that some string elements can't convert to double +List result2 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(y) from test_avg").collect()); +assertThat(result2.toString()).isEqualTo("[+I[3.0]]"); Review Comment: Sorry about that. It seems there is an issue with my EMR Hive cluster, which may not be compatible with the community version. Testing with Hive 2 and Hive 3 using Hive's own avg implementation, the result should be `[+I[2.0]]`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-30603) CompactActionITCase in table store is unstable
[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677208#comment-17677208 ] Shammon edited comment on FLINK-30603 at 1/16/23 7:45 AM: -- This test case is still unstable cc [~lzljs3620320] [~TsReaper] was (Author: zjureel): This test case is still unstable cc [~lzljs3620320][~TsReaper] > CompactActionITCase in table store is unstable > -- > > Key: FLINK-30603 > URL: https://issues.apache.org/jira/browse/FLINK-30603 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0, table-store-0.3.1 > > > https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149 > Error: Failures: > Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I > 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]> > [INFO] > Error: Tests run: 221, Failures: 1, Errors: 0, Skipped: 4 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30603) CompactActionITCase in table store is unstable
[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677208#comment-17677208 ] Shammon edited comment on FLINK-30603 at 1/16/23 7:45 AM: -- This test case is still unstable cc [~lzljs3620320][~TsReaper] was (Author: zjureel): This test case is still unstable > CompactActionITCase in table store is unstable > -- > > Key: FLINK-30603 > URL: https://issues.apache.org/jira/browse/FLINK-30603 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0, table-store-0.3.1 > > > https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149 > Error: Failures: > Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I > 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]> > [INFO] > Error: Tests run: 221, Failures: 1, Errors: 0, Skipped: 4
[jira] [Updated] (FLINK-30603) CompactActionITCase in table store is unstable
[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shammon updated FLINK-30603: Description: https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149 Error: Failures: Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]> [INFO] Error: Tests run: 221, Failures: 1, Errors: 0, Skipped: 4 was: https://github.com/apache/flink-table-store/actions/runs/3871130631/jobs/6598625030 [INFO] Results: [INFO] Error: Failures: Error:CompactActionITCase.testStreamingCompact:187 expected:<[+I 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221209]> > CompactActionITCase in table store is unstable > -- > > Key: FLINK-30603 > URL: https://issues.apache.org/jira/browse/FLINK-30603 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0, table-store-0.3.1 > > > https://github.com/apache/flink-table-store/actions/runs/3927960511/jobs/6715071149 > Error: Failures: > Error:CompactActionITCase.testStreamingCompact:193 expected:<[+I > 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221208]> > [INFO] > Error: Tests run: 221, Failures: 1, Errors: 0, Skipped: 4
[jira] [Reopened] (FLINK-30603) CompactActionITCase in table store is unstable
[ https://issues.apache.org/jira/browse/FLINK-30603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shammon reopened FLINK-30603: - This test case is still unstable > CompactActionITCase in table store is unstable > -- > > Key: FLINK-30603 > URL: https://issues.apache.org/jira/browse/FLINK-30603 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0, table-store-0.3.1 > > > https://github.com/apache/flink-table-store/actions/runs/3871130631/jobs/6598625030 > [INFO] Results: > [INFO] > Error: Failures: > Error:CompactActionITCase.testStreamingCompact:187 expected:<[+I > 1|100|15|20221208, +I 1|100|15|20221209]> but was:<[+I 1|100|15|20221209]>
[GitHub] [flink] zhuzhurk commented on a diff in pull request #21621: [FLINK-30601][runtime] Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance
zhuzhurk commented on code in PR #21621: URL: https://github.com/apache/flink/pull/21621#discussion_r107091 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ## @@ -86,6 +118,71 @@ public static ThrowingConsumer, Exception> getRecordProcesso } } +private static boolean canOmitSetKeyContext( +AbstractStreamOperator streamOperator, int input) { +// Since AbstractStreamOperator is @PublicEvolving, we need to check whether the +// "SetKeyContextElement" is overridden by the (user-implemented) subclass. If it is +// overridden, we cannot omit it due to the subclass may maintain different key selectors on +// its own. +return !hasKeyContext(streamOperator, input) +&& !methodSetKeyContextIsOverride(streamOperator, input); +} + +private static boolean hasKeyContext(AbstractStreamOperator operator, int input) { +if (input == 0) { +return operator.hasKeyContext1(); +} else { +return operator.hasKeyContext2(); +} +} + +private static boolean methodSetKeyContextIsOverride( +AbstractStreamOperator operator, int input) { +if (input == 0) { +if (operator instanceof OneInputStreamOperator) { +return methodIsOverride( +operator, +OneInputStreamOperator.class, +METHOD_SET_KEY_CONTEXT_ELEMENT, +StreamRecord.class) +|| methodIsOverride( +operator, +AbstractStreamOperator.class, +METHOD_SET_KEY_CONTEXT_ELEMENT1, +StreamRecord.class); +} else { +return methodIsOverride( +operator, +AbstractStreamOperator.class, +METHOD_SET_KEY_CONTEXT_ELEMENT1, +StreamRecord.class); +} +} else { +return methodIsOverride( +operator, +AbstractStreamOperator.class, +METHOD_SET_KEY_CONTEXT_ELEMENT2, +StreamRecord.class); +} +} + +private static boolean methodIsOverride( Review Comment: methodIsOverride -> methodIsOverridden ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ## @@ -86,6 +118,71 @@ public static ThrowingConsumer, Exception> getRecordProcesso } } +private static boolean 
canOmitSetKeyContext( +AbstractStreamOperator streamOperator, int input) { +// Since AbstractStreamOperator is @PublicEvolving, we need to check whether the +// "SetKeyContextElement" is overridden by the (user-implemented) subclass. If it is +// overridden, we cannot omit it due to the subclass may maintain different key selectors on +// its own. +return !hasKeyContext(streamOperator, input) +&& !methodSetKeyContextIsOverride(streamOperator, input); +} + +private static boolean hasKeyContext(AbstractStreamOperator operator, int input) { +if (input == 0) { +return operator.hasKeyContext1(); +} else { +return operator.hasKeyContext2(); +} +} + +private static boolean methodSetKeyContextIsOverride( Review Comment: methodSetKeyContextIsOverride -> methodSetKeyContextElementIsOverridden ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java: ## @@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception { assertThat(operator3.processElement2Called).isTrue(); } +@Test +void testOverrideSetKeyContextElementForOneInputStreamOperator() throws Exception { +// test no override +NoOverrideOneInputStreamOperator noOverride = new NoOverrideOneInputStreamOperator(); +RecordProcessorUtils.getRecordProcessor(noOverride).accept(new StreamRecord<>("test")); +assertThat(noOverride.setCurrentKeyCalled).isFalse(); + +// test override "SetKeyContextElement" +OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext = +new OverrideSetKeyContextOneInputStreamOperator(); +RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext) +.accept(new StreamRecord<>("test")); +assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue(); + +// test override "SetKeyContextElement1" +OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 = +new OverrideSetKeyContext1OneInputStreamOperator(); +
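The helper being renamed here (`methodIsOverride` -> `methodIsOverridden`) decides by reflection whether a subclass replaced a key-context hook, but its body is not part of the quoted hunk. The following is a minimal sketch of one common way to write such a check, assuming `Class#getMethod` plus a comparison of the declaring class. `BaseOperator`, `PlainOperator`, `CustomKeyOperator`, and `OverrideCheckSketch` are hypothetical stand-ins, not Flink classes, and this is not necessarily how the PR implements it.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for an operator base class such as AbstractStreamOperator. */
class BaseOperator {
    public void setKeyContextElement1(Object record) {
        // default behaviour: derive the key from the record
    }
}

/** Hypothetical subclass that inherits the default hook unchanged. */
class PlainOperator extends BaseOperator {}

/** Hypothetical subclass that overrides the hook, e.g. to keep its own key selectors. */
class CustomKeyOperator extends BaseOperator {
    @Override
    public void setKeyContextElement1(Object record) {
        // custom keying logic would go here
    }
}

final class OverrideCheckSketch {
    private OverrideCheckSketch() {}

    /**
     * Returns true if the runtime class of {@code instance} supplies its own implementation of
     * the public method {@code name(paramTypes)} instead of inheriting it from {@code baseClass}.
     */
    static boolean methodIsOverridden(
            Object instance, Class<?> baseClass, String name, Class<?>... paramTypes) {
        try {
            // getMethod resolves the most specific public implementation visible on the class.
            Method method = instance.getClass().getMethod(name, paramTypes);
            return method.getDeclaringClass() != baseClass;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public method " + name + " on " + baseClass, e);
        }
    }
}
```

Only the declaring class is compared, so an intermediate abstract class that overrides the hook would also count as "overridden"; that conservative answer simply keeps the `setKeyContextElement` call in place.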
[GitHub] [flink] xintongsong commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
xintongsong commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070900657 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE}. If the option is not + * configured, the threshold for Batch jobs is {@link + * InputGateSpecUitls#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {@link InputGateSpecUitls#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM}. + */ Review Comment: JavaDoc needs to be updated. 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateSpecUitls.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; + +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** Utils to manage the specs of the {@link InputGate}, for example, {@link GateBuffersSpec}. 
*/ +public class InputGateSpecUitls { + +public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH = 1000; + +public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM = Integer.MAX_VALUE; + +public static GateBuffersSpec createGateBuffersSpec( +Optional configuredMaxRequiredBuffersPerGate, +int configuredNetworkBuffersPerChannel, +int configuredFloatingNetworkBuffersPerGate, +ResultPartitionType partitionType, +int numInputChannels) { +int maxRequiredBuffersThresholdPerGate = +getEffectiveMaxRequiredBuffersPerGate( +partitionType, configuredMaxRequiredBuffersPerGate); +int targetRequiredBuffersPerGate = +getRequiredBuffersTargetPerGate( +numInputChannels, configuredNetworkBuffersPerChannel); +int targetTotalBuffersPerGate = +getTotalBuffersTargetPerGate( +numInputChannels, +configuredNetworkBuffersPerChannel, +configuredFloatingNetworkBuffersPerGate); +int requiredBuffersPerGate = +Math.min(maxRequiredBuffersThresholdPerGate, targetRequiredBuffersPerGate); + +int effectiveExclusiveBuffersPerChannel = +getExclusiveBuffersPerChannel( +configuredNetworkBuffersPerChannel, +numInputChannels, +requiredBuffersPerGate); +int effectiveExclusiveBuffersPerGate = +getEffectiveExclusiveBuffersPerGate( +numInputChannels, effectiveExclusiveBuffersPerChannel); + +int
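The JavaDoc and `createGateBuffersSpec` above describe how the per-gate cap is chosen: use the configured `NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE` value if present, otherwise the batch or streaming default, and then apply `Math.min` against the target number of required buffers. Below is a simplified sketch of just that step. It assumes a boolean `isBatch` in place of `ResultPartitionType` and takes the target as an input, because `getRequiredBuffersTargetPerGate` is not shown. `GateBuffersSketch` is a hypothetical name.

```java
import java.util.Optional;

final class GateBuffersSketch {
    // Default thresholds copied from the InputGateSpecUitls constants in the diff.
    static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH = 1000;
    static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM = Integer.MAX_VALUE;

    private GateBuffersSketch() {}

    /** Configured threshold if present, otherwise the batch or streaming default (assumed mapping). */
    static int effectiveMaxRequiredBuffersPerGate(boolean isBatch, Optional<Integer> configured) {
        return configured.orElse(
                isBatch
                        ? DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH
                        : DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM);
    }

    /** Caps the gate's target number of required buffers at the effective threshold. */
    static int requiredBuffersPerGate(
            boolean isBatch, Optional<Integer> configured, int targetRequiredBuffersPerGate) {
        return Math.min(
                effectiveMaxRequiredBuffersPerGate(isBatch, configured),
                targetRequiredBuffersPerGate);
    }
}
```

With the streaming default of `Integer.MAX_VALUE`, the `Math.min` leaves the target untouched, so only batch jobs, or an explicitly configured value, actually cap the required buffers.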
[GitHub] [flink] luoyuxia commented on a diff in pull request #21676: [FLINK-30662][table] Planner supports delete
luoyuxia commented on code in PR #21676: URL: https://github.com/apache/flink/pull/21676#discussion_r1070910506 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java: ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.planner.utils.TableConfigUtils; + +import org.apache.calcite.plan.RelOptPredicateList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.rules.ReduceExpressionsRule; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.rex.RexUtil; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; + +/** A utility class for delete push down. */ +public class DeletePushDownUtils { + +/** + * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it + * can't get the {@link DynamicTableSink}. + */ +public static Optional getDynamicTableSink( Review Comment: Yes, most of it comes from PlannerBase#getTableSink. ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java: ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import
[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
lsyldliu commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070910191 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testAvgAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, avg(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out")); +} + +@Test +public void testAvgAggFunction() throws Exception { +tableEnv.executeSql( +"create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)"); +tableEnv.executeSql( +"insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), " ++ "(1, NULL, 'b', 2, NULL, 3.33, 3.4), " ++ "(2, NULL, '4', 1, 2, 4.55, 4.5), " ++ "(2, NULL, NULL, 4, 3, 5.66, 5.2)") +.await(); + +// test avg all element is null +List result = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(x) from test_avg").collect()); +assertThat(result.toString()).isEqualTo("[+I[null]]"); + +// test avg that some string elements can't convert to double +List result2 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(y) from test_avg").collect()); +assertThat(result2.toString()).isEqualTo("[+I[3.0]]"); + +// test avg bigint with null element +List result3 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(f) from test_avg").collect()); +assertThat(result3.toString()).isEqualTo("[+I[2.3335]]"); + +// test avg decimal +List result4 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(d) from test_avg").collect()); +assertThat(result4.toString()).isEqualTo("[+I[3.94000]]"); Review Comment: The decimal parameter has precision 20 and scale 5. According to Hive's decimal result type inference rule, the result type is decimal(19, 9), i.e. the scale plus 4, so the expected test result has been corrected. Moreover, I've run this test using Hive's `GenericUDAFAverage`.
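The point of this comment is that the result scale changes the expected string while the numeric value stays the same. The four `d` values 2.22, 3.33, 4.55, and 5.66 sum to 15.76, and dividing by 4 gives 3.94. The small `java.math.BigDecimal` sketch below shows how that value renders at scale 5 and at scale 9. It assumes the result scale is the input scale 5 plus 4, as the comment states, and it does not model precision. `DecimalAvgScaleSketch` is hypothetical and is neither Hive's nor Flink's code.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class DecimalAvgScaleSketch {
    private DecimalAvgScaleSketch() {}

    /** Averages the values and rounds the quotient to the requested result scale. */
    static BigDecimal average(BigDecimal[] values, int resultScale) {
        BigDecimal sum = BigDecimal.ZERO;
        for (BigDecimal v : values) {
            sum = sum.add(v);
        }
        return sum.divide(BigDecimal.valueOf(values.length), resultScale, RoundingMode.HALF_UP);
    }
}
```

Calling `average(...)` on the four values with scale 5 yields `3.94000`, and with scale 9 yields `3.940000000`. The value is identical in both cases, but the expected string in the test differs.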
[GitHub] [flink] luoyuxia commented on pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on PR #21257: URL: https://github.com/apache/flink/pull/21257#issuecomment-1383600766 @lsyldliu Thanks for your contribution. I have addressed your comments. For the parallelism of `CompactOperator` in AQE, I have created FLINK-29635 to track it.
[jira] [Commented] (FLINK-30508) CliClientITCase.testSqlStatements failed with output not matched with expected
[ https://issues.apache.org/jira/browse/FLINK-30508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677199#comment-17677199 ] Matthias Pohl commented on FLINK-30508: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44824=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=15722 > CliClientITCase.testSqlStatements failed with output not matched with expected > -- > > Key: FLINK-30508 > URL: https://issues.apache.org/jira/browse/FLINK-30508 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0, 1.17.0 >Reporter: Qingsheng Ren >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44246=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=14992
[jira] [Closed] (FLINK-30640) Unstable test in CliClientITCase
[ https://issues.apache.org/jira/browse/FLINK-30640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl closed FLINK-30640. - Resolution: Duplicate I'm closing this issue in favor of FLINK-30508. > Unstable test in CliClientITCase > > > Key: FLINK-30640 > URL: https://issues.apache.org/jira/browse/FLINK-30640 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Client >Affects Versions: 1.17.0 >Reporter: yuzelin >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44743=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4] > > The failing test passes in my local environment.
[jira] [Created] (FLINK-30695) Support to set parallelism for compact operator according to the number of files in AQE.
luoyuxia created FLINK-30695: Summary: Support to set parallelism for compact operator according to the number of files in AQE. Key: FLINK-30695 URL: https://issues.apache.org/jira/browse/FLINK-30695 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: luoyuxia
[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
lsyldliu commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070904577 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testAvgAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, avg(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out")); +} + +@Test +public void testAvgAggFunction() throws Exception { +tableEnv.executeSql( +"create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)"); +tableEnv.executeSql( +"insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), " ++ "(1, NULL, 'b', 2, NULL, 3.33, 3.4), " ++ "(2, NULL, '4', 1, 2, 4.55, 4.5), " ++ "(2, NULL, NULL, 4, 3, 5.66, 5.2)") +.await(); + +// test avg all element is null +List result = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(x) from test_avg").collect()); +assertThat(result.toString()).isEqualTo("[+I[null]]"); + +// test avg that some string elements can't convert to double +List result2 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(y) from test_avg").collect()); +assertThat(result2.toString()).isEqualTo("[+I[3.0]]"); + +// test avg bigint with null element +List result3 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(f) from test_avg").collect()); +assertThat(result3.toString()).isEqualTo("[+I[2.3335]]"); + +// test avg decimal +List result4 = Review Comment: Regarding the case "some string elements can't convert to decimal": strings are converted to the double type, so there is no case here where a string is converted to decimal.
[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
lsyldliu commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070898438 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,63 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testAvgAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, avg(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testAvgAggFunctionPlan.out")); +} + +@Test +public void testAvgAggFunction() throws Exception { +tableEnv.executeSql( +"create table test_avg(a int, x string, y string, z int, f bigint, d decimal(20, 5), e double)"); +tableEnv.executeSql( +"insert into test_avg values (1, NULL, '2', 1, 2, 2.22, 2.3), " ++ "(1, NULL, 'b', 2, NULL, 3.33, 3.4), " ++ "(2, NULL, '4', 1, 2, 4.55, 4.5), " ++ "(2, NULL, NULL, 4, 3, 5.66, 5.2)") +.await(); + +// test avg all element is null +List result = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(x) from test_avg").collect()); +assertThat(result.toString()).isEqualTo("[+I[null]]"); + +// test avg that some string elements can't convert to double +List result2 = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select avg(y) from test_avg").collect()); +assertThat(result2.toString()).isEqualTo("[+I[3.0]]"); Review Comment: As the following comment explains, it should be `[+I[3.0]]`: the null element is not counted.
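In the test data, column `y` holds `'2'`, `'b'`, `'4'`, and `NULL`. Under the skip-on-null semantics described in this comment, only 2 and 4 are summed and counted, which gives 3.0. The plain-Java sketch below reproduces that arithmetic. It assumes that an unparseable string behaves like a failed try-cast and becomes null. `StringAvgSketch` is hypothetical and is neither Flink's nor Hive's implementation.

```java
import java.util.List;

final class StringAvgSketch {
    private StringAvgSketch() {}

    /** Like a try-cast: returns null for a null or non-numeric string instead of throwing. */
    static Double tryCastToDouble(String s) {
        if (s == null) {
            return null;
        }
        try {
            return Double.valueOf(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Averages only values that survive the cast; returns null if nothing was counted. */
    static Double avg(List<String> values) {
        double sum = 0.0;
        long count = 0;
        for (String s : values) {
            Double d = tryCastToDouble(s);
            if (d == null) {
                continue; // null or unparseable: neither sum nor count changes
            }
            sum += d;
            count++;
        }
        return count == 0 ? null : sum / count;
    }
}
```

The same rule explains `avg(x)` returning null. Every `x` value is NULL, so the count stays 0 and there is nothing to divide.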
[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
lsyldliu commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070897568 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveAverageAggFunction.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import java.math.BigDecimal; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive average aggregate function. 
*/ +public class HiveAverageAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private final UnresolvedReferenceExpression count = unresolvedRef("count"); +private DataType resultType; +private DataType sumResultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum, count}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getSumResultType(), DataTypes.BIGINT()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ sumInitialValue(), /* count = */ literal(0L)}; +} + +@Override +public Expression[] accumulateExpressions() { +// cast the operand to sum needed type +Expression tryCastOperand = tryCast(operand(0), typeLiteral(getSumResultType())); +return new Expression[] { +/* sum = */ ifThenElse(isNull(tryCastOperand), sum, adjustedPlus(sum, tryCastOperand)), +/* count = */ ifThenElse(isNull(tryCastOperand), count, plus(count, literal(1L))), Review Comment: After a deep dive into the [Hive](https://github.com/apache/hive/blob/7b3ecf617a6d46f48a3b6f77e0339fd4ad95a420/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java#L536) implementation: if a parameter is null, neither sum nor count is updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries
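The null-handling rule from the review comment (a null input leaves both the running sum and the count untouched) can be illustrated with a small imperative sketch. This is a hypothetical class, not Flink's `HiveAverageAggFunction`, which expresses the same rule declaratively via `ifThenElse(isNull(...), ...)`; the sketch also does not model the `tryCast` of string operands shown in the quoted diff.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Flink's HiveAverageAggFunction): Hive-style AVG
 * accumulation where a null input updates neither the sum nor the count.
 */
final class HiveStyleAverage {
    private double sum = 0.0;
    private long count = 0L;

    /** Adds one input value; a null value is skipped entirely. */
    void accumulate(Double value) {
        if (value == null) {
            return; // neither sum nor count changes
        }
        sum += value;
        count++;
    }

    /** Returns null if no non-null input was seen, otherwise sum / count. */
    Double getValue() {
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        HiveStyleAverage avg = new HiveStyleAverage();
        for (Double v : Arrays.asList(2.0, null, 4.0, null)) {
            avg.accumulate(v);
        }
        System.out.println(avg.getValue()); // prints 3.0

        HiveStyleAverage allNull = new HiveStyleAverage();
        allNull.accumulate(null);
        System.out.println(allNull.getValue()); // prints null
    }
}
```

Because nulls are excluded from the count as well as the sum, the average is taken only over non-null inputs, and an all-null group yields null rather than 0.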
[GitHub] [flink] JunRuiLee commented on pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler
JunRuiLee commented on PR #21672: URL: https://github.com/apache/flink/pull/21672#issuecomment-1383573928 @flinkbot run azure
[GitHub] [flink] TanYuxin-tyx commented on pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on PR #21620: URL: https://github.com/apache/flink/pull/21620#issuecomment-1383572580 Thanks for helping review the change. I have resolved the conflicts and updated the PR. Could you please help review it again? @xintongsong
[jira] [Updated] (FLINK-30516) Support files table in table store
[ https://issues.apache.org/jira/browse/FLINK-30516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30516: --- Labels: pull-request-available (was: ) > Support files table in table store > -- > > Key: FLINK-30516 > URL: https://issues.apache.org/jira/browse/FLINK-30516 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.3.0, table-store-0.4.0 >Reporter: Shammon >Priority: Major > Labels: pull-request-available > > Add a files table to Table Store so that users can query row counts from > `mytable$files`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1070892251 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.table.batch.compact; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.FileSystemCommitter; +import org.apache.flink.connector.file.table.FileSystemFactory; +import org.apache.flink.connector.file.table.PartitionCommitPolicy; +import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory; +import org.apache.flink.connector.file.table.TableMetaStoreFactory; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Committer operator for partition in batch mode. This is the single (non-parallel) task. It + * collects all the partition information including partition and written files send from upstream. + */ +public class BatchPartitionCommitterSink extends RichSinkFunction { Review Comment: No, Flink expects a `sink` to be returned, so we make it extend RichSinkFunction.
[GitHub] [flink-table-store] zjureel opened a new pull request, #484: [FLINK-30516] Introduce files table in table store
zjureel opened a new pull request, #484: URL: https://github.com/apache/flink-table-store/pull/484 Introduce files table in Table Store
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070891601 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}. 
+ */ +public class GateBuffersSpec { + +private final int effectiveExclusiveBuffersPerChannel; + +private final int minOptionalBuffers; + +private final int maxOptionalBuffers; + +private final int maxEffectiveTotalBuffersPerGate; + +private GateBuffersSpec( +int effectiveExclusiveBuffersPerChannel, +int minOptionalBuffers, +int maxOptionalBuffers, +int maxEffectiveTotalBuffersPerGate) { +this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel; +this.minOptionalBuffers = minOptionalBuffers; +this.maxOptionalBuffers = maxOptionalBuffers; +this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate; +} + +int minOptionalBuffers() { +return minOptionalBuffers; +} + +int maxOptionalBuffers() { +return maxOptionalBuffers; +} + +int getEffectiveExclusiveBuffersPerChannel() { +return effectiveExclusiveBuffersPerChannel; +} + +public int maxEffectiveTotalGateBuffers() { +return maxEffectiveTotalBuffersPerGate; +} + +public static GateBuffersSpec from( +Optional configuredMaxRequiredBuffersPerGate, +int configuredNetworkBuffersPerChannel, +int configuredFloatingNetworkBuffersPerGate, +ResultPartitionType partitionType, +int numInputChannels) { +int maxRequiredBuffersThresholdPerGate = +getEffectiveMaxRequiredBuffersPerGate( +partitionType, configuredMaxRequiredBuffersPerGate); +int minBuffersTargetPerGate = +getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel); +int maxBuffersTargetPerGate = +getMaxBuffersTargetPerGate( +numInputChannels, +configuredNetworkBuffersPerChannel, +configuredFloatingNetworkBuffersPerGate); +int minRequiredBuffersPerGate = +Math.min(maxRequiredBuffersThresholdPerGate, minBuffersTargetPerGate); + +int effectiveExclusiveBuffersPerChannel = +adjustExclusiveBuffersPerChannel( +configuredNetworkBuffersPerChannel, +numInputChannels, +minRequiredBuffersPerGate); +int effectiveExclusiveBuffersPerGate = +
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070891141 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}. 
+ */ +public class GateBuffersSpec { + +private final int effectiveExclusiveBuffersPerChannel; + +private final int minOptionalBuffers; + +private final int maxOptionalBuffers; + +private final int maxEffectiveTotalBuffersPerGate; + +private GateBuffersSpec( +int effectiveExclusiveBuffersPerChannel, +int minOptionalBuffers, +int maxOptionalBuffers, +int maxEffectiveTotalBuffersPerGate) { +this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel; +this.minOptionalBuffers = minOptionalBuffers; +this.maxOptionalBuffers = maxOptionalBuffers; +this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate; +} + +int minOptionalBuffers() { +return minOptionalBuffers; +} + +int maxOptionalBuffers() { +return maxOptionalBuffers; +} + +int getEffectiveExclusiveBuffersPerChannel() { +return effectiveExclusiveBuffersPerChannel; +} + +public int maxEffectiveTotalGateBuffers() { +return maxEffectiveTotalBuffersPerGate; +} + +public static GateBuffersSpec from( +Optional configuredMaxRequiredBuffersPerGate, +int configuredNetworkBuffersPerChannel, +int configuredFloatingNetworkBuffersPerGate, +ResultPartitionType partitionType, +int numInputChannels) { +int maxRequiredBuffersThresholdPerGate = +getEffectiveMaxRequiredBuffersPerGate( +partitionType, configuredMaxRequiredBuffersPerGate); +int minBuffersTargetPerGate = +getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel); +int maxBuffersTargetPerGate = +getMaxBuffersTargetPerGate( +numInputChannels, +configuredNetworkBuffersPerChannel, +configuredFloatingNetworkBuffersPerGate); +int minRequiredBuffersPerGate = +Math.min(maxRequiredBuffersThresholdPerGate, minBuffersTargetPerGate); + +int effectiveExclusiveBuffersPerChannel = +adjustExclusiveBuffersPerChannel( +configuredNetworkBuffersPerChannel, +numInputChannels, +minRequiredBuffersPerGate); +int effectiveExclusiveBuffersPerGate = +
[GitHub] [flink] X-czh commented on pull request #21673: [FLINK-30513] Cleanup HA storage path on cluster termination
X-czh commented on PR #21673: URL: https://github.com/apache/flink/pull/21673#issuecomment-1383570619 @flinkbot run azure
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070890795 ## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java: ## @@ -119,6 +139,82 @@ public static int computeNetworkBuffersForAnnouncing( return requirementForInputs + requirementForOutputs; } +public static int getEffectiveMaxRequiredBuffersPerGate( +ResultPartitionType partitionType, +Optional configuredMaxRequiredBuffersPerGate) { +return configuredMaxRequiredBuffersPerGate.orElseGet( +() -> +isPipelineResultPartition(partitionType) +? DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM +: DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH); +} + +private static int getNumBuffersToAnnounceForInputGate( +ResultPartitionType type, +int configuredNetworkBuffersPerChannel, +int floatingNetworkBuffersPerGate, +Optional maxRequiredBuffersPerGate, +int numInputChannels) { +GateBuffersSpec gateBuffersSpec = +GateBuffersSpec.from( +maxRequiredBuffersPerGate, +configuredNetworkBuffersPerChannel, +floatingNetworkBuffersPerGate, +type, +numInputChannels); +return gateBuffersSpec.maxEffectiveTotalGateBuffers(); +} + +public static boolean isPipelineResultPartition(ResultPartitionType partitionType) { +return partitionType.isPipelinedOrPipelinedBoundedResultPartition() +|| partitionType == ResultPartitionType.PIPELINED_APPROXIMATE; +} + +/** + * Since at least one floating buffer is required, the number of min required buffers is reduced + * by 1, and then the average number of buffers per channel is calculated. Take the minimum + * value to ensure that the number of required buffers per gate is not more than the given + * minRequiredBuffersPerGate.}. 
+ */ +public static int adjustExclusiveBuffersPerChannel( +int configuredNetworkBuffersPerChannel, +int numInputChannels, +int minRequiredBuffersPerGate) { +checkArgument(numInputChannels > 0, "Must be positive."); +return Math.min( +configuredNetworkBuffersPerChannel, +(minRequiredBuffersPerGate - 1) / numInputChannels); Review Comment: Added a check about it. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link
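The quoted `getEffectiveMaxRequiredBuffersPerGate`, `adjustExclusiveBuffersPerChannel`, and the first steps of `GateBuffersSpec.from()` reduce to a few lines of arithmetic. The sketch below restates them as a standalone class for illustration. Assumptions: the default constants are placeholders (the real `DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM`/`_FOR_BATCH` values are not in the quoted diff), and `minBuffersTargetPerGate` is passed in because the body of `getMinBuffersTargetPerGate` is not shown either.

```java
import java.util.Optional;

/** Hypothetical restatement of the buffer arithmetic quoted in the review above. */
final class GateBufferMath {
    // Placeholder defaults for illustration only; the real constants live in NettyShuffleUtils.
    static final int PLACEHOLDER_DEFAULT_FOR_STREAM = 500;
    static final int PLACEHOLDER_DEFAULT_FOR_BATCH = 1000;

    /** A configured threshold wins; otherwise fall back to a default chosen by partition type. */
    static int effectiveMaxRequiredBuffersPerGate(boolean pipelined, Optional<Integer> configured) {
        return configured.orElseGet(
                () -> pipelined ? PLACEHOLDER_DEFAULT_FOR_STREAM : PLACEHOLDER_DEFAULT_FOR_BATCH);
    }

    /** Keep one buffer for floating use, then spread the remainder evenly across channels. */
    static int adjustExclusiveBuffersPerChannel(
            int configuredPerChannel, int numInputChannels, int minRequiredPerGate) {
        if (numInputChannels <= 0) {
            throw new IllegalArgumentException("Must be positive.");
        }
        return Math.min(configuredPerChannel, (minRequiredPerGate - 1) / numInputChannels);
    }

    /** First steps of GateBuffersSpec.from(); the min target is passed in rather than computed. */
    static int exclusiveBuffersPerChannel(
            boolean pipelined,
            Optional<Integer> configuredThreshold,
            int configuredPerChannel,
            int numInputChannels,
            int minBuffersTargetPerGate) {
        int threshold = effectiveMaxRequiredBuffersPerGate(pipelined, configuredThreshold);
        int minRequiredPerGate = Math.min(threshold, minBuffersTargetPerGate);
        return adjustExclusiveBuffersPerChannel(
                configuredPerChannel, numInputChannels, minRequiredPerGate);
    }

    public static void main(String[] args) {
        // 10 channels: min(1000, 21) = 21 and (21 - 1) / 10 = 2, so the configured 2 buffers fit.
        System.out.println(exclusiveBuffersPerChannel(false, Optional.empty(), 2, 10, 21)); // 2
        // 1000 channels: min(1000, 2001) = 1000 and 999 / 1000 = 0 exclusive buffers per channel.
        System.out.println(exclusiveBuffersPerChannel(false, Optional.of(1000), 2, 1000, 2001)); // 0
    }
}
```

The second case shows the intent of the change: once a gate has many channels, the per-gate threshold caps the required buffers, and exclusive buffers per channel can drop to zero so the gate runs on floating buffers instead.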
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070890360 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}. 
+ */ +public class GateBuffersSpec { + +private final int effectiveExclusiveBuffersPerChannel; + +private final int minOptionalBuffers; + +private final int maxOptionalBuffers; + +private final int maxEffectiveTotalBuffersPerGate; + +private GateBuffersSpec( +int effectiveExclusiveBuffersPerChannel, +int minOptionalBuffers, +int maxOptionalBuffers, +int maxEffectiveTotalBuffersPerGate) { +this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel; +this.minOptionalBuffers = minOptionalBuffers; +this.maxOptionalBuffers = maxOptionalBuffers; +this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate; +} + +int minOptionalBuffers() { +return minOptionalBuffers; +} + +int maxOptionalBuffers() { +return maxOptionalBuffers; +} + +int getEffectiveExclusiveBuffersPerChannel() { +return effectiveExclusiveBuffersPerChannel; +} + +public int maxEffectiveTotalGateBuffers() { +return maxEffectiveTotalBuffersPerGate; +} + +public static GateBuffersSpec from( +Optional configuredMaxRequiredBuffersPerGate, +int configuredNetworkBuffersPerChannel, +int configuredFloatingNetworkBuffersPerGate, +ResultPartitionType partitionType, +int numInputChannels) { +int maxRequiredBuffersThresholdPerGate = +getEffectiveMaxRequiredBuffersPerGate( +partitionType, configuredMaxRequiredBuffersPerGate); +int minBuffersTargetPerGate = +getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel); +int maxBuffersTargetPerGate = Review Comment: Renamed it. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070890224 ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -225,6 +225,35 @@ public class NettyShuffleEnvironmentOptions { + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster."); +/** + * Maximum number of network buffers to use for each outgoing/incoming gate (result + * partition/input gate), which contains all exclusive network buffers for all subpartitions and + * all floating buffers for the gate. The exclusive network buffers for one channel is + * configured by {@link #NETWORK_BUFFERS_PER_CHANNEL} and the floating buffers for one gate is + * configured by {@link #NETWORK_EXTRA_BUFFERS_PER_GATE}. + */ +@Experimental +@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) +public static final ConfigOption NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX = Review Comment: Fixed.
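The quoted javadoc describes a gate's buffer budget as all exclusive buffers for its channels (set per channel by `NETWORK_BUFFERS_PER_CHANNEL`) plus the gate's floating buffers (`NETWORK_EXTRA_BUFFERS_PER_GATE`). A quick worked calculation shows why a per-gate limit matters at high parallelism. The method name is hypothetical, the inputs 2 and 8 are example values, and how the new option bounds this total is not shown in the quoted diff.

```java
/** Hypothetical arithmetic for the per-gate buffer total described in the javadoc. */
final class GateBufferBudget {
    /** Exclusive buffers for every channel plus the gate's floating buffers, before any limit. */
    static int buffersPerGate(int numChannels, int exclusivePerChannel, int floatingPerGate) {
        return numChannels * exclusivePerChannel + floatingPerGate;
    }

    public static void main(String[] args) {
        System.out.println(buffersPerGate(10, 2, 8));   // 28
        System.out.println(buffersPerGate(1000, 2, 8)); // 2008
    }
}
```

The total grows linearly with the number of channels, so a gate reading from 1000 upstream subpartitions would ask for roughly 2000 buffers without a cap.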
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
TanYuxin-tyx commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070890650 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}. 
+ */ +public class GateBuffersSpec { + +private final int effectiveExclusiveBuffersPerChannel; + +private final int minOptionalBuffers; + +private final int maxOptionalBuffers; + +private final int maxEffectiveTotalBuffersPerGate; + +private GateBuffersSpec( +int effectiveExclusiveBuffersPerChannel, +int minOptionalBuffers, +int maxOptionalBuffers, +int maxEffectiveTotalBuffersPerGate) { +this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel; +this.minOptionalBuffers = minOptionalBuffers; +this.maxOptionalBuffers = maxOptionalBuffers; +this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate; +} + +int minOptionalBuffers() { +return minOptionalBuffers; +} + +int maxOptionalBuffers() { +return maxOptionalBuffers; +} + +int getEffectiveExclusiveBuffersPerChannel() { +return effectiveExclusiveBuffersPerChannel; +} + +public int maxEffectiveTotalGateBuffers() { +return maxEffectiveTotalBuffersPerGate; +} + +public static GateBuffersSpec from( +Optional configuredMaxRequiredBuffersPerGate, +int configuredNetworkBuffersPerChannel, +int configuredFloatingNetworkBuffersPerGate, +ResultPartitionType partitionType, +int numInputChannels) { +int maxRequiredBuffersThresholdPerGate = +getEffectiveMaxRequiredBuffersPerGate( +partitionType, configuredMaxRequiredBuffersPerGate); +int minBuffersTargetPerGate = +getMinBuffersTargetPerGate(numInputChannels, configuredNetworkBuffersPerChannel); +int maxBuffersTargetPerGate = +getMaxBuffersTargetPerGate( +numInputChannels, +configuredNetworkBuffersPerChannel, +configuredFloatingNetworkBuffersPerGate); +int minRequiredBuffersPerGate = +Math.min(maxRequiredBuffersThresholdPerGate, minBuffersTargetPerGate); + +int effectiveExclusiveBuffersPerChannel = +adjustExclusiveBuffersPerChannel( Review Comment: Renamed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe,
[GitHub] [flink] JunRuiLee commented on pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler
JunRuiLee commented on PR #21672: URL: https://github.com/apache/flink/pull/21672#issuecomment-1383568933 @flinkbot run azure
[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1070885603 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java: ## @@ -76,7 +76,8 @@ public PartitionLoader( } /** Load a single partition. */ -public void loadPartition(LinkedHashMap partSpec, List srcDirs) +public void loadPartition( Review Comment: Yes, the paths must either all be directories or all not be. There shouldn't be any case where some paths are directories and others are files; if there is, they weren't written by Flink.
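The invariant stated in the reply above (source paths are all directories or all files, never a mix) could be checked with a small helper like the one below. `SourcePathCheck` is hypothetical and is not part of `PartitionLoader`; the directory test is passed in as a predicate so the sketch does not depend on Flink's `FileSystem` API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: source paths must be all directories or all files, never mixed. */
final class SourcePathCheck {

    /**
     * Returns true if every path is a directory and false if none is. Throws if the list mixes
     * directories and files, which per the discussion means the data was not written by Flink.
     */
    static <P> boolean allDirectories(List<P> paths, Predicate<P> isDirectory) {
        long dirs = paths.stream().filter(isDirectory).count();
        if (dirs != 0 && dirs != paths.size()) {
            throw new IllegalStateException("Mixed directories and files: " + paths);
        }
        return !paths.isEmpty() && dirs == paths.size();
    }

    public static void main(String[] args) {
        // Stand-in predicate for illustration: names ending in "/" are treated as directories.
        Predicate<String> isDir = p -> p.endsWith("/");
        System.out.println(allDirectories(Arrays.asList("a/", "b/"), isDir)); // true
        System.out.println(allDirectories(Arrays.asList("a.orc", "b.orc"), isDir)); // false
    }
}
```

With `java.nio.file.Path` values, `Files::isDirectory` can serve as the predicate.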
[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1070884810 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java: ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.table.batch.compact; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.FileSystemFactory; +import org.apache.flink.connector.file.table.OutputFormatFactory; +import org.apache.flink.connector.file.table.PartitionComputer; +import org.apache.flink.connector.file.table.PartitionTempFileManager; +import org.apache.flink.connector.file.table.PartitionWriter; +import org.apache.flink.connector.file.table.PartitionWriterFactory; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput; +import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.TableException; + +import java.util.LinkedHashMap; + +/** + * An operator for writing files in batch mode. Once creating a new file to write, the writing + * operator will emit the written file to downstream. + */ +public class BatchFileWriter extends AbstractStreamOperator Review Comment: Yes, it aligns the function with `FileSystemOutputFormat`.
[GitHub] [flink] thexiay commented on a diff in pull request #21676: [FLINK-30662][table] Planner supports delete
thexiay commented on code in PR #21676: URL: https://github.com/apache/flink/pull/21676#discussion_r1070884554 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java: ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.planner.utils.TableConfigUtils; + +import org.apache.calcite.plan.RelOptPredicateList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.rules.ReduceExpressionsRule; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.rex.RexUtil; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; + +/** A utility class for delete push down. */ +public class DeletePushDownUtils { + +/** + * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it + * can't get the {@link DynamicTableSink}. + */ +public static Optional getDynamicTableSink( Review Comment: Is it copied from PlannerBase#getTableSink?
[GitHub] [flink] luoyuxia commented on a diff in pull request #21257: [FLINK-29879][filesystem] introduce operators for merging files in batch mode.
luoyuxia commented on code in PR #21257: URL: https://github.com/apache/flink/pull/21257#discussion_r1070883134 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java: ## @@ -62,4 +62,24 @@ OutputFormat createNewOutputFormat(Path path) throws IOException { return format; } } + +/** Listener for partition writer. */ +interface PartitionWriterListener { +/** + * Notifies a new file has been created. + * + * @param partition The partition for the newly created file. + * @param file The newly created file. + */ +void onFileOpen(String partition, Path file); Review Comment: Actually, it should be `onFileOpen`, but I changed the doc for the method: ```java Note that this does not mean that the file has been created in the file system. It is only created logically and the actual file will be generated after it is committed. ```
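The listener contract discussed above (a file is "opened" logically and only materializes on commit) can be illustrated with a listener that collects opened files per partition, as a writer might before emitting them downstream. `FileOpenListener` and `CollectingFileOpenListener` are hypothetical stand-ins shaped like the quoted `PartitionWriterListener`, using `String` instead of Flink's `Path`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical listener shaped like the quoted PartitionWriterListener, with String paths. */
interface FileOpenListener {
    void onFileOpen(String partition, String file);
}

/** Collects files per partition as they are opened, e.g. to forward them downstream later. */
final class CollectingFileOpenListener implements FileOpenListener {
    private final Map<String, List<String>> filesByPartition = new LinkedHashMap<>();

    @Override
    public void onFileOpen(String partition, String file) {
        // "Opened" is logical: the file may not exist on the file system until it is committed.
        filesByPartition.computeIfAbsent(partition, k -> new ArrayList<>()).add(file);
    }

    Map<String, List<String>> filesByPartition() {
        return filesByPartition;
    }

    public static void main(String[] args) {
        CollectingFileOpenListener listener = new CollectingFileOpenListener();
        listener.onFileOpen("dt=2023-01-15", "part-0-0");
        listener.onFileOpen("dt=2023-01-15", "part-0-1");
        listener.onFileOpen("dt=2023-01-16", "part-0-2");
        // Prints {dt=2023-01-15=[part-0-0, part-0-1], dt=2023-01-16=[part-0-2]}
        System.out.println(listener.filesByPartition());
    }
}
```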
[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
[ https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677179#comment-17677179 ] Liu commented on FLINK-30629: - [~xtsong] Have a look at the code? Thanks. > ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable > - > > Key: FLINK-30629 > URL: https://issues.apache.org/jira/browse/FLINK-30629 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.0 >Reporter: Xintong Song >Assignee: Liu >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0 > > Attachments: ClientHeartbeatTestLog.txt > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819 > {code:java} > Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 21.02 s <<< FAILURE! - in > org.apache.flink.client.ClientHeartbeatTest > Jan 11 04:32:39 [ERROR] > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat > Time elapsed: 9.157 s <<< ERROR! > Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet > running or has already been shut down. 
> Jan 11 04:32:39 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) > Jan 11 04:32:39 at > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lsyldliu commented on a diff in pull request #21601: [FLINK-27920][hive] Supports native avg function for hive dialect
lsyldliu commented on code in PR #21601: URL: https://github.com/apache/flink/pull/21601#discussion_r1070878161 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveAverageAggFunction.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; + +import java.math.BigDecimal; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.hiveAggDecimalPlus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral; +import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** built-in hive average aggregate function. 
*/ +public class HiveAverageAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression sum = unresolvedRef("sum"); +private final UnresolvedReferenceExpression count = unresolvedRef("count"); +private DataType resultType; +private DataType sumResultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {sum, count}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getSumResultType(), DataTypes.BIGINT()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] {/* sum = */ sumInitialValue(), /* count = */ literal(0L)}; +} + +@Override +public Expression[] accumulateExpressions() { +// cast the operand to sum needed type +Expression tryCastOperand = tryCast(operand(0), typeLiteral(getSumResultType())); Review Comment: They are not equivalent. `PrimitiveObjectInspectorUtils.getDouble` throws `NumberFormatException` when parsing fails, whereas `tryCast` is equivalent to ``` try { PrimitiveObjectInspectorUtils.getDouble } catch(NumberFormatException e) { } ``` The Hive avg UDAF catches the exception. See the following code in Hive's `GenericUDAFAverage` for more detail: ``` try { // Skip the same value if avgDistinct is true if (isWindowingDistinct()) { ObjectInspectorObject obj = new ObjectInspectorObject( ObjectInspectorUtils.copyToStandardObject(parameter, inputOI, ObjectInspectorCopyOption.JAVA), copiedOI); if
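The distinction drawn in the comment above, between a parse that throws and a try-cast that yields NULL, can be illustrated with plain Java. `TryCastAverage` is a hypothetical sketch of "try-cast each value, then average the non-null results"; it is not Hive's `GenericUDAFAverage` or Flink's `HiveAverageAggFunction`, and it makes no claim about what either engine returns for a given input.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of "try-cast then average": values that fail to parse become null and are
 * skipped, so they count toward neither the sum nor the count.
 */
final class TryCastAverage {

    /** Like a try-cast to DOUBLE: returns null instead of throwing NumberFormatException. */
    static Double tryCastToDouble(String value) {
        if (value == null) {
            return null;
        }
        try {
            return Double.parseDouble(value);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns the average of the parseable values, or null if there are none. */
    static Double avg(List<String> values) {
        double sum = 0.0;
        long count = 0;
        for (String v : values) {
            Double d = tryCastToDouble(v);
            if (d != null) {
                sum += d;
                count++;
            }
        }
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        System.out.println(avg(Arrays.asList("2", "b", "4", null))); // 3.0
        System.out.println(avg(Arrays.<String>asList(null, null))); // null
    }
}
```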
[jira] [Commented] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced
[ https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677172#comment-17677172 ] luoyuxia commented on FLINK-30679: -- [~hehuiyuan] Thanks for contribution. I will have a look. > Can not load the data of hive dim table when project-push-down is introduced > > > Key: FLINK-30679 > URL: https://issues.apache.org/jira/browse/FLINK-30679 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.14.6 >Reporter: hehuiyuan >Priority: Critical > Labels: pull-request-available > > > Can not load the data of hive dim table when project-push-down is introduced. > The project push down optimize:[https://github.com/apache/flink/pull/21311] > hive-exec version: 2.3.4 > flink version: 1.14.6 > flink-hive-connector: the latest code for release-1.14 branch > > vectorize read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
{code} > > > mapreduce read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) > ~[?:1.8.0_301] > at > java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) > ~[?:1.8.0_301] > at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > ~[?:1.8.0_301] > at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) > ~[?:1.8.0_301] > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
> at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} > > > The sql : > > {code:java} > CREATE TABLE kafkaTableSource ( > name string, > age int, > sex string, > address string, > ptime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'hehuiyuan1', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.client.id' = 'test-consumer-group', > 'properties.group.id' = 'test-consumer-group', > 'format' = 'csv' > ); > CREATE TABLE printsink ( > name string, > age int, > sex string, >
[GitHub] [flink] luoyuxia commented on pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update
luoyuxia commented on PR #21658: URL: https://github.com/apache/flink/pull/21658#issuecomment-1383550093 @LadyForest @lincoln-lil Thanks for reviewing. I have addressed your comments.
[GitHub] [flink] luoyuxia commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update
luoyuxia commented on code in PR #21658: URL: https://github.com/apache/flink/pull/21658#discussion_r1070871519 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Interface for {@link DynamicTableSink}s that support delete existing data according to row-level + * changes. The table sink is responsible for telling planner how to produce the row changes, and + * consuming them to achieve rows delete purpose. 
+ * + * The planner will call the method {@link #applyRowLevelDelete(RowLevelModificationScanContext)} + * to get the {@link RowLevelDeleteInfo} returned by sink, and rewrite the delete statement based on + * the gotten {@link RowLevelDeleteInfo} to produce rows to {@link DynamicTableSink}. + * + * Note: If the sink also implements {@link SupportsDeletePushDown}, the planner will always + * prefer {@link SupportsDeletePushDown}, and only the filters aren't available or {@link + * SupportsDeletePushDown#applyDeleteFilters(List)} returns false, this interface will be + * considered. Review Comment: Thanks. That's more concise and clear.
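The preference order in the quoted Javadoc (try delete push-down first; fall back to row-level delete only if the filters are unavailable or `applyDeleteFilters` returns false) can be sketched as a small decision function. `DeleteStrategyChooser` is hypothetical and is not the Flink planner's code; filters are modeled as strings instead of Flink's `ResolvedExpression`, and a `null` predicate stands for a sink that does not implement `SupportsDeletePushDown`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch: delete push-down first, row-level delete only as a fallback. */
final class DeleteStrategyChooser {

    enum Strategy { DELETE_PUSH_DOWN, ROW_LEVEL_DELETE, UNSUPPORTED }

    /**
     * @param applyDeleteFilters stands in for SupportsDeletePushDown#applyDeleteFilters; null if
     *     the sink does not implement that interface
     * @param filters the DELETE statement's filters, empty if they are not available
     * @param supportsRowLevelDelete whether the sink implements SupportsRowLevelDelete
     */
    static Strategy choose(
            Predicate<List<String>> applyDeleteFilters,
            Optional<List<String>> filters,
            boolean supportsRowLevelDelete) {
        if (applyDeleteFilters != null
                && filters.isPresent()
                && applyDeleteFilters.test(filters.get())) {
            return Strategy.DELETE_PUSH_DOWN;
        }
        return supportsRowLevelDelete ? Strategy.ROW_LEVEL_DELETE : Strategy.UNSUPPORTED;
    }

    public static void main(String[] args) {
        Predicate<List<String>> acceptsAll = f -> true;
        Predicate<List<String>> rejectsAll = f -> false;
        List<String> where = Arrays.asList("id = 1");
        System.out.println(choose(acceptsAll, Optional.of(where), true)); // DELETE_PUSH_DOWN
        System.out.println(choose(rejectsAll, Optional.of(where), true)); // ROW_LEVEL_DELETE
        System.out.println(choose(acceptsAll, Optional.empty(), false)); // UNSUPPORTED
    }
}
```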
[jira] [Commented] (FLINK-30257) SqlClientITCase#testMatchRecognize failed
[ https://issues.apache.org/jira/browse/FLINK-30257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677166#comment-17677166 ]

Lijie Wang commented on FLINK-30257:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44871=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a

> SqlClientITCase#testMatchRecognize failed
> -
>
> Key: FLINK-30257
> URL: https://issues.apache.org/jira/browse/FLINK-30257
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.17.0
> Reporter: Martijn Visser
> Priority: Major
> Labels: test-stability
> Attachments: image-2022-12-29-21-47-31-606.png
>
>
> {code:java}
> Nov 30 21:54:41 [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 224.683 s <<< FAILURE! - in SqlClientITCase
> Nov 30 21:54:41 [ERROR] SqlClientITCase.testMatchRecognize  Time elapsed: 50.164 s  <<< FAILURE!
> Nov 30 21:54:41 org.opentest4j.AssertionFailedError:
> Nov 30 21:54:41
> Nov 30 21:54:41 expected: 1
> Nov 30 21:54:41  but was: 0
> Nov 30 21:54:41     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Nov 30 21:54:41     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Nov 30 21:54:41     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Nov 30 21:54:41     at SqlClientITCase.verifyNumberOfResultRecords(SqlClientITCase.java:297)
> Nov 30 21:54:41     at SqlClientITCase.testMatchRecognize(SqlClientITCase.java:255)
> Nov 30 21:54:41     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Nov 30 21:54:41     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 30 21:54:41     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 30 21:54:41     at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 30 21:54:41     at org.junit.platform.commons.util.ReflectionUtils.invokeMetho
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43635=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=14817

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [flink] wanglijie95 commented on pull request #21646: [FLINK-30631][runtime] Limit the max number of subpartitons consumed by each downstream task
wanglijie95 commented on PR #21646: URL: https://github.com/apache/flink/pull/21646#issuecomment-1383531832 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-28526) Fail to lateral join with UDTF from Table with timstamp column
[ https://issues.apache.org/jira/browse/FLINK-28526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu reassigned FLINK-28526:
---
Assignee: Dian Fu (was: Xingbo Huang)

> Fail to lateral join with UDTF from Table with timstamp column
> --
>
> Key: FLINK-28526
> URL: https://issues.apache.org/jira/browse/FLINK-28526
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.15.1
> Reporter: Xuannan Su
> Assignee: Dian Fu
> Priority: Major
>
> The bug can be reproduced with the following test:
> {code:python}
> def test_flink(self):
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env)
>     table = t_env.from_descriptor(
>         TableDescriptor.for_connector("filesystem")
>         .schema(
>             Schema.new_builder()
>             .column("name", DataTypes.STRING())
>             .column("cost", DataTypes.INT())
>             .column("distance", DataTypes.INT())
>             .column("time", DataTypes.TIMESTAMP(3))
>             .watermark("time", "`time` - INTERVAL '60' SECOND")
>             .build()
>         )
>         .format("csv")
>         .option("path", "./input.csv")
>         .build()
>     )
>
>     @udtf(result_types=DataTypes.INT())
>     def table_func(row: Row):
>         return row.cost + row.distance
>
>     table = table.join_lateral(table_func.alias("cost_times_distance"))
>     table.execute().print()
> {code}
> It causes the following exception:
> {code:none}
> E       pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
> E           at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
> E           at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
> E           at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
> E           at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
> E           at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
> E           at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
> E           at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E           at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
> E           at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
> E           at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E           at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
> E           at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> E           at scala.collection.Iterator.foreach(Iterator.scala:937)
> E           at scala.collection.Iterator.foreach$(Iterator.scala:937)
> E           at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> E           at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> E           at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> E           at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> E           at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> E           at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> E           at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> E           at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
> E           at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
> E           at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
> E           at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> E           at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> E           at
[jira] [Updated] (FLINK-30676) Introduce Data Structures for table store
[ https://issues.apache.org/jira/browse/FLINK-30676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30676: --- Labels: pull-request-available (was: ) > Introduce Data Structures for table store > - > > Key: FLINK-30676 > URL: https://issues.apache.org/jira/browse/FLINK-30676 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Copy data structures to table store from Flink.
[GitHub] [flink-table-store] JingsongLi opened a new pull request, #483: [FLINK-30676] Introduce Data Structures for table store
JingsongLi opened a new pull request, #483: URL: https://github.com/apache/flink-table-store/pull/483 - Introduce new Data Structures - Use new Data Types and Data Structures
[jira] [Updated] (FLINK-30651) Move utility methods to CatalogTest and remove CatalogTestUtils class
[ https://issues.apache.org/jira/browse/FLINK-30651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-30651: --- Summary: Move utility methods to CatalogTest and remove CatalogTestUtils class (was: Move util methods to CatalogTest and remove CatalogTestUtils class ) > Move utility methods to CatalogTest and remove CatalogTestUtils class > -- > > Key: FLINK-30651 > URL: https://issues.apache.org/jira/browse/FLINK-30651 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / API, Tests >Reporter: Samrat Deb >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0 > > > [CatalogTestUtils|https://github.com/apache/flink/blame/master/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java#L43] > class contains static utility functions. These functions/methods can be > moved to the CatalogTest class to make the code flow easier to understand.
[GitHub] [flink] flinkbot commented on pull request #21684: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf
flinkbot commented on PR #21684: URL: https://github.com/apache/flink/pull/21684#issuecomment-1383494024 ## CI report: * 366aa425f0b85e78796941e5d7c3c3aa28ff7002 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Comment Edited] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677137#comment-17677137 ] chenhaiyang edited comment on FLINK-30694 at 1/16/23 4:58 AM: -- Hi [~jark], the PR has been submitted, please take a look here: [https://github.com/apache/flink/pull/21684|https://github.com/apache/flink/pull/21684] was (Author: JIRAUSER298680): Hi [~jark], the PR has been submitted, please take a look here: [https://github.com/apache/flink/pull/21682|https://github.com/apache/flink/pull/21682] > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Affects Versions: 1.16.0 >Reporter: chenhaiyang >Assignee: chenhaiyang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.1 > > > The page URL is [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
[GitHub] [flink] NightRunner opened a new pull request, #21684: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf
NightRunner opened a new pull request, #21684:
URL: https://github.com/apache/flink/pull/21684

Translates the page "Application Development"/"Table API"/"SQL"/"Queries"/"Windowing TVF" into Chinese.

## What is the purpose of the change

Translate /dev/table/sql/queries/window-tvf.md

## Brief change log

Translate /dev/table/sql/queries/window-tvf.md

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
[GitHub] [flink] NightRunner closed pull request #21682: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf.md
NightRunner closed pull request #21682: [FLINK-30694][docs]Translate /dev/table/sql/queries/window-tvf.md URL: https://github.com/apache/flink/pull/21682
[GitHub] [flink] lsyldliu commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect
lsyldliu commented on code in PR #21608: URL: https://github.com/apache/flink/pull/21608#discussion_r1070834501 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testMinAggFunctionPlan() { Review Comment: The current Hive ITCase covers all the plan-related tests; we can improve this in a separate PR.
[GitHub] [flink] godfreyhe commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect
godfreyhe commented on code in PR #21608: URL: https://github.com/apache/flink/pull/21608#discussion_r1070833459 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testMinAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, min(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out")); +} + +@Test +public void testMinAggFunction() throws Exception { Review Comment: All types should be covered, including unsupported types
[GitHub] [flink] godfreyhe commented on a diff in pull request #21608: [FLINK-29721][hive] Supports native hive min function for hive dialect
godfreyhe commented on code in PR #21608: URL: https://github.com/apache/flink/pull/21608#discussion_r1070833142 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testMinAggFunctionPlan() { Review Comment: It would be better not to put the plan test in an ITCase. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.functions.hive; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; + +/** built-in hive min aggregate function. */ +public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction { + +private final UnresolvedReferenceExpression min = unresolvedRef("min"); +private DataType resultType; + +@Override +public int operandCount() { +return 1; +} + +@Override +public UnresolvedReferenceExpression[] aggBufferAttributes() { +return new UnresolvedReferenceExpression[] {min}; +} + +@Override +public DataType[] getAggBufferTypes() { +return new DataType[] {getResultType()}; +} + +@Override +public DataType getResultType() { +return resultType; +} + +@Override +public Expression[] initialValuesExpressions() { +return new Expression[] { +/* min */ +nullOf(getResultType()) +}; +} + +@Override +public Expression[] accumulateExpressions() { +return new Expression[] { +/* min = */ ifThenElse( +isNull(operand(0)), +min, +ifThenElse( +isNull(min), +operand(0), +ifThenElse(lessThan(operand(0), min), operand(0), min))) +}; +} + +@Override +public Expression[] retractExpressions() { +throw new TableException("Min aggregate function does not support retraction."); +} + 
+@Override +public Expression[] mergeExpressions() { +return new Expression[] { +/* max = */ ifThenElse( Review Comment: min ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java: ## @@ -1062,6 +1062,87 @@ public void testSumAggWithGroupKey() throws Exception { tableEnv.executeSql("drop table test_sum_group"); } +@Test +public void testMinAggFunctionPlan() { +// test explain +String actualPlan = explainSql("select x, min(y) from foo group by x"); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out")); +} + +@Test +public void testMinAggFunction() throws Exception { Review Comment: All types should be covered, including unsupported types.
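The accumulate expression quoted in the HiveMinAggFunction diff above skips a null input, adopts the input when the buffer is still null, and otherwise keeps the smaller of the two values. The following is a rough plain-Java illustration of those rules only, not the expression-based Flink implementation; the class `NullAwareMin` and its methods are hypothetical. Merging two partial buffers should apply the same comparison, which is presumably what the one-word "min" review note on the `/* max = */` label is about.

```java
import java.util.List;

/**
 * Hypothetical plain-Java model of the null handling in the quoted
 * HiveMinAggFunction expressions. This is not Flink code.
 */
final class NullAwareMin {

    private NullAwareMin() {}

    /** One accumulate step: a null input leaves the buffer unchanged. */
    static <T extends Comparable<T>> T accumulate(T min, T input) {
        if (input == null) {
            return min; // isNull(operand(0)) -> keep min
        }
        if (min == null) {
            return input; // isNull(min) -> take operand(0)
        }
        // lessThan(operand(0), min) -> keep the smaller value
        return input.compareTo(min) < 0 ? input : min;
    }

    /** Merging two partial buffers uses the same rule, so the result is still a MIN. */
    static <T extends Comparable<T>> T merge(T left, T right) {
        return accumulate(left, right);
    }

    /** Folds a column; the buffer starts as null, like nullOf(getResultType()). */
    static <T extends Comparable<T>> T minOf(List<T> values) {
        T min = null;
        for (T value : values) {
            min = accumulate(min, value);
        }
        return min;
    }
}
```

For example, `NullAwareMin.minOf(Arrays.asList(3, null, 1, 2))` yields `1`, and a column containing only nulls yields `null`, consistent with the null initial value in `initialValuesExpressions()`.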
[jira] [Commented] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677137#comment-17677137 ] chenhaiyang commented on FLINK-30694: - Hi [~jark], the PR has been submitted, please take a look here: [https://github.com/apache/flink/pull/21682|https://github.com/apache/flink/pull/21682] > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Affects Versions: 1.16.0 >Reporter: chenhaiyang >Assignee: chenhaiyang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.1 > > > The page URL is [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
[GitHub] [flink] flinkbot commented on pull request #21683: [FLINK-30672][table] Support 'EXPLAIN PLAN_ADVICE' statement
flinkbot commented on PR #21683: URL: https://github.com/apache/flink/pull/21683#issuecomment-1383463668 ## CI report: * e0f770c958d667db2d38d0528b9fc63d75aced96 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-30672) Support 'EXPLAIN PLAN_ADVICE' statement
[ https://issues.apache.org/jira/browse/FLINK-30672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30672: --- Labels: pull-request-available (was: ) > Support 'EXPLAIN PLAN_ADVICE' statement > --- > > Key: FLINK-30672 > URL: https://issues.apache.org/jira/browse/FLINK-30672 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Jane Chan >Priority: Major > Labels: pull-request-available >
[GitHub] [flink] LadyForest opened a new pull request, #21683: [FLINK-30672][sql-parser] Support 'EXPLAIN PLAN_ADVICE' statement
LadyForest opened a new pull request, #21683:
URL: https://github.com/apache/flink/pull/21683

## What is the purpose of the change

This pull request adds support for the `EXPLAIN PLAN_ADVICE` statement.

## Brief change log

- e0f770c9 adds support for the SQL syntax
- TODO

## Verifying this change

This change added tests and can be verified as follows:

- `FlinkSqlParserImplTest` verifies the syntax change.
- TODO

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? FLINK-30673
[GitHub] [flink] lincoln-lil commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update
lincoln-lil commented on code in PR #21658: URL: https://github.com/apache/flink/pull/21658#discussion_r1070792730 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.abilities; + +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; +import org.apache.flink.table.connector.source.ScanTableSource; + +import javax.annotation.Nullable; + +/** + * Interface for {@link ScanTableSource}s that support the row-level modification. The table source + * is responsible to return the information described by {@link RowLevelModificationScanContext}. 
Review Comment: 'responsible for returning' ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.abilities; + +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; +import org.apache.flink.table.connector.source.ScanTableSource; + +import javax.annotation.Nullable; + +/** + * Interface for {@link ScanTableSource}s that support the row-level modification. The table source + * is responsible to return the information described by {@link RowLevelModificationScanContext}. + * The context will be propagated to the sink which implements {@link SupportsRowLevelUpdate} or + * {@link SupportsRowLevelDelete}. + * + * Note: This interface is optional for table sources to implement. 
For cases where the table + * source neither need to know the type of row-level modification nor propagate information to sink, + * the table source don't need to implement this interface. See more details at {@link Review Comment: 'For cases where the table source neither needs to know the type of row-level modification nor propagate information to the sink, the table source does not need to implement this interface' ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the
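The javadoc under review describes a handoff: a source that implements the ability returns a context, and that context is propagated to a sink implementing `SupportsRowLevelUpdate` or `SupportsRowLevelDelete`. A source that does not implement it has nothing to pass on. The following plain-Java sketch models only that flow. Every type and method in it is hypothetical and deliberately does not mirror the exact signatures of Flink's interfaces. Carrying a snapshot id in the context is also an assumed example of the kind of information a source might want to propagate.

```java
/**
 * Hypothetical, self-contained model of the source-to-sink context handoff.
 * None of these types are Flink classes.
 */
final class RowLevelHandoffSketch {

    private RowLevelHandoffSketch() {}

    /** Hypothetical kinds of row-level modification. */
    enum ModificationType { DELETE, UPDATE }

    /** Hypothetical context: what the source wants the sink to know (here, a snapshot id). */
    static final class ScanContext {
        final ModificationType type;
        final long snapshotId;

        ScanContext(ModificationType type, long snapshotId) {
            this.type = type;
            this.snapshotId = snapshotId;
        }
    }

    /** Hypothetical source that opts in: it records which snapshot it scans. */
    static final class Source {
        private final long currentSnapshotId;

        Source(long currentSnapshotId) {
            this.currentSnapshotId = currentSnapshotId;
        }

        ScanContext applyScan(ModificationType type) {
            return new ScanContext(type, currentSnapshotId);
        }
    }

    /** Hypothetical sink that accepts a possibly-null context. */
    static final class Sink {
        String commit(ScanContext context) {
            if (context == null) {
                // The source did not opt in, so there is nothing to propagate.
                return "committed without scan context";
            }
            return context.type + " committed against snapshot " + context.snapshotId;
        }
    }

    /** Stand-in for the framework: forwards the source's context (or null) to the sink unchanged. */
    static String run(Source sourceOrNull, Sink sink, ModificationType type) {
        ScanContext context = sourceOrNull == null ? null : sourceOrNull.applyScan(type);
        return sink.commit(context);
    }
}
```

The null branch in the sink reflects the javadoc's note that implementing the source-side interface is optional. A sink in this model therefore has to work whether or not a context arrives.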
[GitHub] [flink] flinkbot commented on pull request #21682: Translate /dev/table/sql/queries/window-tvf.md
flinkbot commented on PR #21682: URL: https://github.com/apache/flink/pull/21682#issuecomment-1383452082 ## CI report: * 1b19856e2279fc2119314ffe23cc8f2dae3432df UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenhaiyang updated FLINK-30694: Fix Version/s: 1.16.1 Affects Version/s: 1.16.0 > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Affects Versions: 1.16.0 >Reporter: chenhaiyang >Assignee: chenhaiyang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.1 > > > The page URL is [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
[GitHub] [flink] NightRunner opened a new pull request, #21682: Translate /dev/table/sql/queries/window-tvf.md
NightRunner opened a new pull request, #21682:
URL: https://github.com/apache/flink/pull/21682

Translates the page "Application Development"/"Table API"/"SQL"/"Queries"/"Joins" into Chinese.

## What is the purpose of the change

Translate /dev/table/sql/queries/window-tvf.md

## Brief change log

Translate /dev/table/sql/queries/window-tvf.md

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #196: [FLINK-30541] Add Transformer and Estimator for OnlineStandardScaler
lindong28 commented on code in PR #196: URL: https://github.com/apache/flink-ml/pull/196#discussion_r1070769249 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/metrics/MLMetrics.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A collection class for handling metrics in Flink ML. + * + * All metrics of Flink ML are registered under group "ml", which is a child group of {@link + * org.apache.flink.metrics.groups.OperatorMetricGroup}. Metrics related to model data will be + * registered in the group "ml.model". + * + * For example, the timestamp of the current model data will be reported in metric: + * "{some_parent_groups}.operator.ml.model.timestamp". And the version of the current model data + * will be reported in metric: "{some_parent_groups}.operator.ml.model.version". + */ +@PublicEvolving Review Comment: Since we have not opened a FLIP for these metrics, should we mark it `@Experimental` for now?
## flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasModelVersionCol.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** Interface for the shared model version column param. */ +public interface HasModelVersionCol extends WithParams { +Param MODEL_VERSION_COL = +new StringParam( +"modelVersionCol", +"The version of the model data that the input data is predicted with.", Review Comment: Would it be useful to also specify the value type (e.g. long) of this column? ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasMaxAllowedModelDelayMs.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.param.LongParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WithParams; + +/** Interface for the shared max allowed model delay in milliseconds param. */ +public interface HasMaxAllowedModelDelayMs extends WithParams { +Param MAX_ALLOWED_MODEL_DELAY_MS = +new LongParam( +"maxAllowedModelDelayMs", +"The maximum difference between the timestamps of the input record and model data when " Review Comment: It seems
[GitHub] [flink] flinkbot commented on pull request #21681: [FLINK-18397][docs-zh] Translate "Table & SQL Connectors Overview" pa…
flinkbot commented on PR #21681: URL: https://github.com/apache/flink/pull/21681#issuecomment-1383439136 ## CI report: * 2c8f06d74c3c48c3ba567e8dc21ce0f1ba3b2e5b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] Jrebel-i opened a new pull request, #21681: [FLINK-18397][docs-zh] Translate "Table & SQL Connectors Overview" pa…
Jrebel-i opened a new pull request, #21681: URL: https://github.com/apache/flink/pull/21681 ## What is the purpose of the change [FLINK-18397](https://issues.apache.org/jira/browse/FLINK-18397) Translate "Table & SQL Connectors Overview" page into Chinese. The page url is https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connectors/ ## Brief change log The markdown file is located in *docs/content.zh/docs/connectors/table/overview.md* ## Verifying this change This change is a trivial rework/code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not documented**
[GitHub] [flink] yunfengzhou-hub commented on pull request #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7
yunfengzhou-hub commented on PR #21680: URL: https://github.com/apache/flink/pull/21680#issuecomment-1383431886 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7
flinkbot commented on PR #21680: URL: https://github.com/apache/flink/pull/21680#issuecomment-1383431236 ## CI report: * a78b7a8e95f7a0ad8148c14dd94dd495b8229f0a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-30130) Table.select lose watermark
[ https://issues.apache.org/jira/browse/FLINK-30130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677130#comment-17677130 ] Jark Wu commented on FLINK-30130: - Sorry [~yunfengzhou] [~kamalesh0420], I don't think this is a problem or a bug. It is by design that the watermark **definition** can't be propagated, because the referenced columns might not be selected in the current view/table. However, the printed schema still shows the "ROWTIME" attribute, which users can use for windowing. You can also use "TUMBLE_ROWTIME" to generate a new rowtime attribute (and, implicitly, a watermark) for subsequent rowtime-based operations. Therefore, from the users' perspective, the printed schema contains enough information, and this doesn't prevent users from using rowtime-based operations. Please reopen the issue if you still have questions. > Table.select lose watermark > --- > > Key: FLINK-30130 > URL: https://issues.apache.org/jira/browse/FLINK-30130 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Yunfeng Zhou >Priority: Major > > Trying to execute the following program > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > DataStream<Long> stream = env.fromSequence(0, 1000); > Schema schema = Schema.newBuilder() > .column("f0", DataTypes.BIGINT()) > .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(f0 * 1000, 3)") > .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND") > .build(); > Table table = tEnv.fromDataStream(stream, schema); > table.printSchema(); > table = table.select($("*")); > table.printSchema();{code} > Would get the following result > {code:java} > ( > `f0` BIGINT, > `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* AS TO_TIMESTAMP_LTZ(f0 * 1000, 3), > WATERMARK FOR `time_ltz`: TIMESTAMP_LTZ(3) AS time_ltz - INTERVAL '5' SECOND > ) > ( > `f0` BIGINT, > `time_ltz` TIMESTAMP_LTZ(3) *ROWTIME* > ) > {code} 
> This result shows that the watermark property of a Table is lost during > select operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
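For reference, the watermark declared in the reproduction above can be modelled outside Flink. The sketch below is a simplified, hypothetical model, not Flink's generated watermark code: it assumes the watermark is the largest value of `time_ltz - INTERVAL '5' SECOND` seen so far, ignores periodic emission and parallelism, and treats the value of `TO_TIMESTAMP_LTZ(f0 * 1000, 3)` as epoch milliseconds. As Jark Wu explains, it is this watermark definition that no longer appears in the printed schema after `select`, while the `ROWTIME` attribute itself is kept.

```java
import java.util.stream.LongStream;

/**
 * Simplified, hypothetical model (not Flink code) of the watermark declared in the FLINK-30130
 * reproduction: WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND.
 */
final class WatermarkSketch {
    /** INTERVAL '5' SECOND, in milliseconds. */
    static final long DELAY_MS = 5_000L;

    private WatermarkSketch() {}

    /** TO_TIMESTAMP_LTZ(f0 * 1000, 3) reads f0 * 1000 as epoch milliseconds. */
    static long rowtimeMillis(long f0) {
        return f0 * 1000L;
    }

    /** Watermark after consuming the given f0 values: the largest (rowtime - delay) seen so far. */
    static long watermarkAfter(long... f0Values) {
        long watermark = Long.MIN_VALUE; // no watermark yet
        for (long f0 : f0Values) {
            // Watermarks never move backwards, so keep the maximum.
            watermark = Math.max(watermark, rowtimeMillis(f0) - DELAY_MS);
        }
        return watermark;
    }

    /** Models env.fromSequence(from, to), which emits from..to inclusive. */
    static long watermarkForSequence(long from, long to) {
        return watermarkAfter(LongStream.rangeClosed(from, to).toArray());
    }
}
```

With `fromSequence(0, 1000)` the model ends at a watermark of 995,000 ms, i.e. 5 seconds behind the last rowtime of 1,000,000 ms; out-of-order input such as `10, 3` leaves the watermark at 5,000 ms instead of moving it back.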
[jira] [Closed] (FLINK-30130) Table.select lose watermark
[ https://issues.apache.org/jira/browse/FLINK-30130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-30130. --- Resolution: Won't Fix
[jira] [Updated] (FLINK-30191) Update py4j from 0.10.9.3 to 0.10.9.7
[ https://issues.apache.org/jira/browse/FLINK-30191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30191: --- Labels: pull-request-available (was: ) > Update py4j from 0.10.9.3 to 0.10.9.7 > - > > Key: FLINK-30191 > URL: https://issues.apache.org/jira/browse/FLINK-30191 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available
[GitHub] [flink] flinkbot commented on pull request #21679: [FLINK-30561][state/changelog] fix changelog local cache file not found
flinkbot commented on PR #21679: URL: https://github.com/apache/flink/pull/21679#issuecomment-1383427385 ## CI report: * 7cd37c90646b3713f4e734741b9074223c762932 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] yunfengzhou-hub opened a new pull request, #21680: [FLINK-30191][python] Update net.sf.py4j:py4j dependency to 0.10.9.7
yunfengzhou-hub opened a new pull request, #21680: URL: https://github.com/apache/flink/pull/21680 ## What is the purpose of the change - Updating net.sf.py4j:py4j from 0.10.9.3 to 0.10.9.7 ## Brief change log - Updated POM, Python dev requirements & setup, NOTICE file ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] [flink] xintongsong commented on a diff in pull request #21620: [FLINK-30473][network] Optimize the InputGate network memory management for TaskManager
xintongsong commented on code in PR #21620: URL: https://github.com/apache/flink/pull/21620#discussion_r1070766952 ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -225,6 +225,35 @@ public class NettyShuffleEnvironmentOptions { + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster."); +/** + * Maximum number of network buffers to use for each outgoing/incoming gate (result + * partition/input gate), which contains all exclusive network buffers for all subpartitions and + * all floating buffers for the gate. The exclusive network buffers for one channel is + * configured by {@link #NETWORK_BUFFERS_PER_CHANNEL} and the floating buffers for one gate is + * configured by {@link #NETWORK_EXTRA_BUFFERS_PER_GATE}. + */ +@Experimental +@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) +public static final ConfigOption NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX = Review Comment: ```suggestion public static final ConfigOption NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX = ``` ```suggestion public static final ConfigOption NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE = ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/GateBuffersSpec.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.shuffle.NettyShuffleUtils; + +import java.util.Optional; + +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.adjustExclusiveBuffersPerChannel; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveExclusiveBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getEffectiveMaxRequiredBuffersPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMaxBuffersTargetPerGate; +import static org.apache.flink.runtime.shuffle.NettyShuffleUtils.getMinBuffersTargetPerGate; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Calculate the buffers specs(the exclusive buffers per channel, min/max optional buffers per gate) + * based on the configurations, result partition type, and the number of channels. + * + * The threshold is configured by {@link + * NettyShuffleEnvironmentOptions#NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX}. If the option is + * not configured, the threshold for Batch jobs is {@link + * NettyShuffleUtils#DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH} and the threshold for + * Streaming jobs is {#link NettyShuffleUtils#DEFAULT_MAX_BUFFERS_PER_GATE_FOR_STREAMING}. 
+ */ +public class GateBuffersSpec { + +private final int effectiveExclusiveBuffersPerChannel; + +private final int minOptionalBuffers; + +private final int maxOptionalBuffers; + +private final int maxEffectiveTotalBuffersPerGate; + +private GateBuffersSpec( +int effectiveExclusiveBuffersPerChannel, +int minOptionalBuffers, +int maxOptionalBuffers, +int maxEffectiveTotalBuffersPerGate) { +this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel; +this.minOptionalBuffers = minOptionalBuffers; +this.maxOptionalBuffers = maxOptionalBuffers; +this.maxEffectiveTotalBuffersPerGate = maxEffectiveTotalBuffersPerGate; +} + +int minOptionalBuffers() { +return minOptionalBuffers; +} + +int maxOptionalBuffers() { +return maxOptionalBuffers; +} + +int getEffectiveExclusiveBuffersPerChannel() { +return effectiveExclusiveBuffersPerChannel; +} + +public int
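The budget that the new `NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE_MAX` option bounds (per its Javadoc: all exclusive buffers of all channels plus the floating buffers of the gate) can be illustrated as follows. This is a simplified, hypothetical sketch: the rule of lowering the exclusive buffers per channel, down to zero, until the gate fits under the threshold is an assumption made for illustration and is not the logic of `NettyShuffleUtils` or `GateBuffersSpec`; `GateBufferBudgetSketch` and its methods are invented names.

```java
/** Hypothetical sketch of a per-gate buffer budget; not Flink's NettyShuffleUtils logic. */
final class GateBufferBudgetSketch {
    private GateBufferBudgetSketch() {}

    /** Exclusive buffers of all channels plus the gate's floating buffers. */
    static int totalBuffersPerGate(int numChannels, int exclusivePerChannel, int floatingPerGate) {
        return Math.addExact(Math.multiplyExact(numChannels, exclusivePerChannel), floatingPerGate);
    }

    /**
     * Assumed reduction rule: lower the exclusive buffers per channel one at a time until the
     * gate's total fits under maxPerGate, never going below zero.
     */
    static int effectiveExclusivePerChannel(
            int numChannels, int configuredExclusive, int floatingPerGate, int maxPerGate) {
        int exclusive = configuredExclusive;
        while (exclusive > 0
                && totalBuffersPerGate(numChannels, exclusive, floatingPerGate) > maxPerGate) {
            exclusive--;
        }
        return exclusive;
    }
}
```

With example values of 2 exclusive buffers per channel and 8 floating buffers, a gate with 500 channels would need 1008 buffers; under a threshold of 1000 the sketch lowers the exclusive count to 1 (508 buffers), and with 1000 channels it falls back to 0 exclusive buffers, leaving only floating buffers.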
[jira] [Updated] (FLINK-30561) ChangelogStreamHandleReaderWithCache cause FileNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-30561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30561: --- Labels: pull-request-available (was: ) > ChangelogStreamHandleReaderWithCache cause FileNotFoundException > > > Key: FLINK-30561 > URL: https://issues.apache.org/jira/browse/FLINK-30561 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Feifan Wang >Priority: Major > Labels: pull-request-available > > When a job with state changelog enabled keeps restarting, the following > exceptions may occur: > {code:java} > java.lang.RuntimeException: java.io.FileNotFoundException: > /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_192/dstl-cache-file/dstl6215344559415829831.tmp > (No such file or directory) > at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94) > at > org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) > at > 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.FileNotFoundException: > /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_192/dstl-cache-file/dstl6215344559415829831.tmp > (No such file or directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.init(FileInputStream.java:138) > at > 
org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158) > at > org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95) > at > org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85) > ... 21 more {code} > *Cause of the problem:* > # *_ChangelogStreamHandleReaderWithCache_* uses RefCountedFile to manage the local > cache file. The reference count is incremented when an input stream is > opened from the cache file, and decremented by one when the input stream is > closed. So the input stream must
[GitHub] [flink] zoltar9264 opened a new pull request, #21679: [FLINK-30561][state/changelog] fix changelog local cache file not found
zoltar9264 opened a new pull request, #21679: URL: https://github.com/apache/flink/pull/21679 ## What is the purpose of the change Fix the changelog local cache file not being found during restarts, as described in [FLINK-30561](https://issues.apache.org/jira/browse/FLINK-30561). ## Brief change log - Prevent _StateChangelogHandleStreamHandleReader_ from closing the current iterator twice. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
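The failure mode described in FLINK-30561 (the reference count is decremented once per `close()`, so closing the same stream twice decrements it an extra time and lets the cache file be deleted while another reader still needs it) can be illustrated with a small stand-alone sketch. `RefCountedCacheFile` and its `Handle` are hypothetical classes, not Flink's `RefCountedFile` or `ChangelogStreamHandleReaderWithCache`; the sketch only shows the general idea behind guarding against a double close, namely making `close()` idempotent.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical ref-counted cache file; not Flink's RefCountedFile. */
final class RefCountedCacheFile {
    // Starts at 1: the reference held by the cache itself until it evicts the file.
    private final AtomicInteger refs = new AtomicInteger(1);
    private volatile boolean deleted = false;

    /** Opening a stream on the cached file takes one reference. */
    Handle open() {
        refs.incrementAndGet();
        return new Handle(this);
    }

    /** Drops one reference; the last release "deletes" the file. */
    void release() {
        if (refs.decrementAndGet() == 0) {
            deleted = true; // a real implementation would delete the local file here
        }
    }

    int refCount() {
        return refs.get();
    }

    boolean isDeleted() {
        return deleted;
    }

    /** Stream-like handle whose close() releases its reference at most once. */
    static final class Handle implements AutoCloseable {
        private final RefCountedCacheFile file;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        Handle(RefCountedCacheFile file) {
            this.file = file;
        }

        @Override
        public void close() {
            // A second close() is a no-op, so it cannot drop a reference owned by another reader.
            if (closed.compareAndSet(false, true)) {
                file.release();
            }
        }
    }
}
```

Without the `closed` guard, closing `h1` twice in the scenario below would bring the count to 1, the cache's own release would then delete the file, and the still-open `h2` would hit the equivalent of the reported `FileNotFoundException`.

```java
RefCountedCacheFile file = new RefCountedCacheFile();
RefCountedCacheFile.Handle h1 = file.open();
RefCountedCacheFile.Handle h2 = file.open();
h1.close();
h1.close();     // ignored thanks to the guard; refCount() stays at 2
file.release(); // cache evicts its entry; h2 still keeps the file alive
h2.close();     // last reference gone; file is now deleted
```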
[jira] [Assigned] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-30694: --- Assignee: chenhaiyang (was: zhule) > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: chenhaiyang >Assignee: chenhaiyang >Priority: Major > Labels: pull-request-available > > The page URL is > [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
[GitHub] [flink] LadyForest commented on a diff in pull request #21658: [FLINK-30661][table]Introduce interfaces for Delete/Update
LadyForest commented on code in PR #21658: URL: https://github.com/apache/flink/pull/21658#discussion_r1070788300 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Interface for {@link DynamicTableSink}s that support delete existing data according to row-level + * changes. The table sink is responsible for telling planner how to produce the row changes, and + * consuming them to achieve rows delete purpose. 
+ * + * The planner will call the method {@link #applyRowLevelDelete(RowLevelModificationScanContext)} + * to get the {@link RowLevelDeleteInfo} returned by sink, and rewrite the delete statement based on + * the gotten {@link RowLevelDeleteInfo} to produce rows to {@link DynamicTableSink}. + * + * Note: If the sink also implements {@link SupportsDeletePushDown}, the planner will always + * prefer {@link SupportsDeletePushDown}, and only the filters aren't available or {@link + * SupportsDeletePushDown#applyDeleteFilters(List)} returns false, this interface will be + * considered. Review Comment: This statement is slightly confusing because the sentence's subject is the **planner**. How could a planner consider an interface? So what about ```java The planner always prefers {@link SupportsDeletePushDown} over {@link SupportsRowLevelDelete} on condition that {@link SupportsDeletePushDown#applyDeleteFilters(List)} returns true. ``` From my perspective, this is relatively concise and clear. If you worry it is not detailed enough, we can add the following description at the end. ```java The sink needs to implement this interface iff {@link SupportsDeletePushDown#applyDeleteFilters(List)} returns false. ``` ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Interface for {@link DynamicTableSink}s that support delete existing data according to row-level + * changes. The table sink is responsible for telling planner how to produce the row changes, and + * consuming them to achieve rows delete purpose. + * + * The planner will call the method {@link #applyRowLevelDelete(RowLevelModificationScanContext)} + * to get the {@link RowLevelDeleteInfo}
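The precedence described in the Javadoc under review, and in the proposed rewording, can be summarized as a small decision function. This is a hypothetical sketch: `DeletePushDownAbility`, `RowLevelDeleteAbility`, `ExampleSink`, and `DeleteStrategyChooser` are simplified stand-ins invented for illustration, not the real `SupportsDeletePushDown`/`SupportsRowLevelDelete` interfaces or the planner code, and filters are modelled as plain strings.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for SupportsDeletePushDown: true if the sink deletes matching rows itself. */
interface DeletePushDownAbility {
    boolean applyDeleteFilters(List<String> filters);
}

/** Hypothetical marker stand-in for SupportsRowLevelDelete. */
interface RowLevelDeleteAbility {}

enum DeleteStrategy { PUSH_DOWN, ROW_LEVEL, UNSUPPORTED }

/** Hypothetical sink implementing both abilities; acceptFilters decides applyDeleteFilters. */
final class ExampleSink implements DeletePushDownAbility, RowLevelDeleteAbility {
    private final boolean acceptFilters;

    ExampleSink(boolean acceptFilters) {
        this.acceptFilters = acceptFilters;
    }

    @Override
    public boolean applyDeleteFilters(List<String> filters) {
        return acceptFilters;
    }
}

final class DeleteStrategyChooser {
    private DeleteStrategyChooser() {}

    /**
     * @param filters the DELETE's WHERE clause as filters, or empty if it cannot be expressed as filters
     */
    static DeleteStrategy choose(Object sink, Optional<List<String>> filters) {
        // 1. Push-down is preferred whenever filters exist and the sink accepts them.
        if (sink instanceof DeletePushDownAbility
                && filters.isPresent()
                && ((DeletePushDownAbility) sink).applyDeleteFilters(filters.get())) {
            return DeleteStrategy.PUSH_DOWN;
        }
        // 2. Otherwise fall back to row-level delete, if the sink supports it.
        if (sink instanceof RowLevelDeleteAbility) {
            return DeleteStrategy.ROW_LEVEL;
        }
        return DeleteStrategy.UNSUPPORTED;
    }
}
```

A sink that implements both abilities therefore only reaches the row-level path when the filters are unavailable or are rejected, which is the condition the proposed Javadoc wording tries to state.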
[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing
vacaly commented on code in PR #192: URL: https://github.com/apache/flink-ml/pull/192#discussion_r1070782561 ## flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.recommendation; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.recommendation.swing.Swing; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** Tests {@link Swing}. */ +public class SwingTest { +@Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); +private StreamExecutionEnvironment env; +private StreamTableEnvironment tEnv; Review Comment: `env, tenv and trainData` are global variables and are accessed by several test functions.
[jira] [Assigned] (FLINK-30689) Use auto produce bytes schema in Pulsar sink
[ https://issues.apache.org/jira/browse/FLINK-30689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-30689: -- Assignee: Yufan Sheng > Use auto produce bytes schema in Pulsar sink > > > Key: FLINK-30689 > URL: https://issues.apache.org/jira/browse/FLINK-30689 > Project: Flink > Issue Type: Improvement > Components: Connectors / Pulsar >Affects Versions: pulsar-4.0.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Major > Fix For: pulsar-4.0.0 > > > Pulsar provides {{Schema.AUTO_PRODUCE_BYTES()}} for sending byte-array > messages to the broker with extra schema validation. This is preferable to > using {{Schema.BYTES}} directly, which bypasses the validation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122 ] tanjialiang edited comment on FLINK-30687 at 1/16/23 2:58 AM: -- I try to found out how to fix this, i found in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be >0, but ‘SELECT COUNT( * ) FILTER (WHERE cluster_id = 1) FROM ${table}’ 's filterArg is 0. I fixed the condition in 'filterArg >= 0', and it work. !image-2023-01-16-10-54-04-673.png! So i want to ask why we can not filterArg=0? Did it some scenes we can not use? Because 'inputFieldTypes' is a 'scala.collection.Seq' which index is start from 0. was (Author: JIRAUSER279823): I try to found out how to fix this, i found in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be >0, but ‘SELECT COUNT(*) FILTER (WHERE cluster_id = 1) FROM ${table}’ 's filterArg is 0. I fixed the condition in 'filterArg >= 0', and it work. !image-2023-01-16-10-54-04-673.png! So i want to ask why we can not filterArg=0? Did it some scenes we can not use? Because 'inputFieldTypes' is a 'scala.collection.Seq' which index is start from 0. > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. 
> {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
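To see why a `filterArg > 0` guard can silently drop the filter, here is a toy Java model. The classes `ToyAggCall` and `FilterArgDemo` are hypothetical and are not Flink's `AggsHandlerCodeGenerator`; the sketch assumes, following the convention of Calcite's `AggregateCall`, that `filterArg` is -1 when there is no FILTER clause, so index 0 is a legitimate filter column, as the reporter observed for `COUNT(*) FILTER (...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an aggregate call (hypothetical, not Flink's planner classes).
// filterArg is the index of the boolean input field that gates each row,
// or -1 when the call has no FILTER clause (assumed Calcite convention).
final class ToyAggCall {
    final int filterArg;

    ToyAggCall(int filterArg) {
        this.filterArg = filterArg;
    }
}

final class FilterArgDemo {
    // Guard as described in the report: index 0 is mistaken for "no filter".
    static boolean hasFilterStrict(ToyAggCall call) {
        return call.filterArg > 0;
    }

    // Proposed guard: only -1 means "no filter"; 0 is a valid field index.
    static boolean hasFilterInclusive(ToyAggCall call) {
        return call.filterArg >= 0;
    }

    // COUNT(*) over the rows, applying the filter field only when the chosen guard detects one.
    static long countStar(List<boolean[]> rows, ToyAggCall call, boolean inclusiveGuard) {
        boolean applyFilter = inclusiveGuard ? hasFilterInclusive(call) : hasFilterStrict(call);
        long count = 0;
        for (boolean[] row : rows) {
            if (!applyFilter || row[call.filterArg]) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Each row has one field: the value of (class_id = 1) for a student.
        List<boolean[]> rows = new ArrayList<>();
        rows.add(new boolean[] {true});
        rows.add(new boolean[] {false});
        rows.add(new boolean[] {true});
        ToyAggCall call = new ToyAggCall(0); // the filter field sits at index 0

        System.out.println("filterArg > 0:  " + countStar(rows, call, false));
        System.out.println("filterArg >= 0: " + countStar(rows, call, true));
    }
}
```

Running `main` prints 3 for the strict guard (the FILTER clause is ignored, as in the report) and 2 for the inclusive guard. With `filterArg = -1`, both guards agree that no filter applies.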
[jira] [Commented] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122 ] tanjialiang commented on FLINK-30687: - I try to found out how to fix this, i found in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be >0, but ‘SELECT COUNT(*) FILTER (WHERE cluster_id = 1) FROM ${table}’ 's filterArg is 0. I fixed the condition in 'filterArg >= 0', and it work. !image-2023-01-16-10-54-04-673.png! So i want to ask why we can not filterArg=0? Did it some scenes we can not use? Because 'inputFieldTypes' is a 'scala.collection.Seq' which index is start from 0. > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lsyldliu commented on pull request #21605: [FLINK-29722][hive] Supports native hive max function for hive dialect
lsyldliu commented on PR #21605: URL: https://github.com/apache/flink/pull/21605#issuecomment-1383400146 @flinkbot run azure
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Attachment: image-2023-01-16-10-54-04-673.png > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lsyldliu commented on a diff in pull request #21629: [FLINK-30579][hive] Introducing cofigurable option to enable hive native function
lsyldliu commented on code in PR #21629: URL: https://github.com/apache/flink/pull/21629#discussion_r1070778492 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java: ## @@ -310,7 +311,11 @@ public TOpenSessionResp OpenSession(TOpenSessionReq tOpenSessionReq) throws TExc // all the alive PersistenceManager in the ObjectStore, which may get error like // "Persistence Manager has been closed" in the later connection. hiveCatalog.open(); -Module hiveModule = new HiveModule(); +// create hive module lazily +FunctionCreator hiveModuleCreator = +(options, readableConfig, classLoader) -> +FactoryUtil.createModule( Review Comment: It uses the class loader from the Context, which is the user class loader.
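The diff replaces eager `new HiveModule()` with a creator lambda that receives a `classLoader` argument, so the module is built only when needed and with whatever class loader is supplied at that point. A minimal sketch of that lazy, memoized creation pattern, assuming the creator is invoked with the class loader at first use; `ModuleCreator`, `LazyModule` and `LazyModuleDemo` are hypothetical names, not Flink's `FunctionCreator` or `FactoryUtil` API:

```java
// Hypothetical stand-in for the creator lambda in the diff: it receives the
// class loader only when the module is actually needed.
interface ModuleCreator<M> {
    M create(ClassLoader classLoader);
}

// Defers module creation until first use and caches the result.
final class LazyModule<M> {
    private final ModuleCreator<M> creator;
    private M instance;

    LazyModule(ModuleCreator<M> creator) {
        this.creator = creator;
    }

    // Creates the module on the first call, using the class loader passed in
    // (for example, a user class loader taken from a session context).
    synchronized M get(ClassLoader classLoader) {
        if (instance == null) {
            instance = creator.create(classLoader);
        }
        return instance;
    }

    synchronized boolean isCreated() {
        return instance != null;
    }
}

final class LazyModuleDemo {
    public static void main(String[] args) {
        int[] creations = {0};
        LazyModule<String> hiveModule =
                new LazyModule<>(
                        cl -> {
                            creations[0]++;
                            return "module loaded by " + cl;
                        });
        System.out.println("created before first use? " + hiveModule.isCreated());
        ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        hiveModule.get(userClassLoader);
        hiveModule.get(userClassLoader);
        System.out.println("creations: " + creations[0]);
    }
}
```

The demo prints `created before first use? false` followed by `creations: 1`: nothing is built until the first `get`, and later calls reuse the cached instance.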
[jira] [Commented] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677119#comment-17677119 ] chenhaiyang commented on FLINK-30694: - Hi [~jark], can you assign this issue to me? I'd like to work on this ticket. Thank you very much! > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: chenhaiyang >Assignee: zhule >Priority: Major > Labels: pull-request-available > > The page url is[ > [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenhaiyang updated FLINK-30694: Description: The page url is[ [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] The markdown file is located in docs/content.zh/docs/dev/table/sql/queries/window-tvf.md was: The page url is[ https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] The markdown file is located in docs/content.zh/docs/dev/table/sql/queries/window-tvf.md > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: chenhaiyang >Assignee: zhule >Priority: Major > Labels: pull-request-available > > The page url is[ > [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenhaiyang updated FLINK-30694: Fix Version/s: (was: 1.11.0) Description: The page url is[ https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] The markdown file is located in docs/content.zh/docs/dev/table/sql/queries/window-tvf.md was: The page url is https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/functions/ The markdown file is located in {{flink/docs/dev/table/functions/index.zh.md}} Summary: Translate "Windowing TVF" page of "Querys" into Chinese (was: Translate "Joins" page of "Querys" into Chinese ) > Translate "Windowing TVF" page of "Querys" into Chinese > > > Key: FLINK-30694 > URL: https://issues.apache.org/jira/browse/FLINK-30694 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: chenhaiyang >Assignee: zhule >Priority: Major > Labels: pull-request-available > > The page url is[ > https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/] > > The markdown file is located in > docs/content.zh/docs/dev/table/sql/queries/window-tvf.md -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30694) Translate "Joins" page of "Querys" into Chinese
chenhaiyang created FLINK-30694: --- Summary: Translate "Joins" page of "Querys" into Chinese Key: FLINK-30694 URL: https://issues.apache.org/jira/browse/FLINK-30694 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: chenhaiyang Assignee: zhule Fix For: 1.11.0 The page url is https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/functions/ The markdown file is located in {{flink/docs/dev/table/functions/index.zh.md}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-30332) HsFileDataIndex should supports spilling to file
[ https://issues.apache.org/jira/browse/FLINK-30332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-30332. Fix Version/s: 1.17.0 Resolution: Done master (1.17): 8e50e24797fbb154cdf7c484775f0fafa94cf34c > HsFileDataIndex should supports spilling to file > > > Key: FLINK-30332 > URL: https://issues.apache.org/jira/browse/FLINK-30332 > Project: Flink > Issue Type: Sub-task >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > `HsFileDataIndex` is maintained in a list of `TreeMap`s (i.e. in memory). This caused > no problem in our 10 TB TPC-DS test. However, as a job's data volume grows, the > index keeps growing and risks an OOM. We should support spilling index data to > disk and loading it back to avoid this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
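The issue states only the goal (spill index data and load it back instead of keeping every entry in memory); the PR title mentions caching and spilling index entries. The toy sketch below illustrates the general idea under simplifying assumptions: a single index, a fixed in-memory entry budget, and a linear scan of the spill file on lookup. `SpillableIndex` and all of its methods are hypothetical; this is not the design of the real `HsFileDataIndex`.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy index (hypothetical): maps a buffer index to its offset in a data file,
// keeps at most maxInMemoryEntries in a TreeMap, and spills the rest to a
// temporary file as (int bufferIndex, long offset) records.
final class SpillableIndex implements AutoCloseable {
    private final int maxInMemoryEntries;
    private final Path spillFile = createSpillFile();
    private final TreeMap<Integer, Long> inMemory = new TreeMap<>();
    private int spilledRecords = 0;

    SpillableIndex(int maxInMemoryEntries) {
        this.maxInMemoryEntries = maxInMemoryEntries;
    }

    private static Path createSpillFile() {
        try {
            return Files.createTempFile("index-", ".spill");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void put(int bufferIndex, long fileOffset) {
        inMemory.put(bufferIndex, fileOffset);
        if (inMemory.size() > maxInMemoryEntries) {
            spillAll();
        }
    }

    // Appends every in-memory entry to the spill file, then frees the memory.
    private void spillAll() {
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
                Files.newOutputStream(spillFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)))) {
            for (Map.Entry<Integer, Long> entry : inMemory.entrySet()) {
                out.writeInt(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        spilledRecords += inMemory.size();
        inMemory.clear();
    }

    // Looks in memory first, then scans the spilled records (later records win).
    Optional<Long> get(int bufferIndex) {
        Long cached = inMemory.get(bufferIndex);
        if (cached != null) {
            return Optional.of(cached);
        }
        Long found = null;
        try (DataInputStream in = new DataInputStream(new BufferedInputStream(
                Files.newInputStream(spillFile)))) {
            for (int i = 0; i < spilledRecords; i++) {
                int key = in.readInt();
                long offset = in.readLong();
                if (key == bufferIndex) {
                    found = offset;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.ofNullable(found);
    }

    int inMemorySize() {
        return inMemory.size();
    }

    @Override
    public void close() {
        try {
            Files.deleteIfExists(spillFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Memory stays bounded by the entry budget at the cost of disk reads on lookup. A production index would more likely spill in sorted regions and read back only the relevant region rather than scanning the whole file.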
[GitHub] [flink] xintongsong closed pull request #21603: [FLINK-30332] HsFileDataIndex supports caching and spilling index entry
xintongsong closed pull request #21603: [FLINK-30332] HsFileDataIndex supports caching and spilling index entry URL: https://github.com/apache/flink/pull/21603
[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1383361255 @gaoyunhaii @dawidwys Could you help review this? I'd like to make it available in 1.17. Thanks a lot!
[jira] [Updated] (FLINK-30613) Improve resolving schema compatibility -- Milestone one
[ https://issues.apache.org/jira/browse/FLINK-30613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30613: --- Labels: pull-request-available (was: ) > Improve resolving schema compatibility -- Milestone one > --- > > Key: FLINK-30613 > URL: https://issues.apache.org/jira/browse/FLINK-30613 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > In milestone one, we should: > # Add an extra method > (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot > oldSerializerSnapshot)) in TypeSerializerSnapshot.java as above, returning > INCOMPATIBLE by default. > # Mark the original method as deprecated; by default it will delegate to the new > method. > # Implement the new method for all built-in TypeSerializerSnapshots. > See FLIP-263 for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
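The three milestone steps follow a common default-method migration pattern: add the new method with a conservative default, make the deprecated method delegate to it, then override the new method in built-in implementations. A minimal sketch under heavy simplification; `ToyCompatibility`, `ToySerializer`, `ToySnapshot` and the `Int*`/`Legacy*` classes are hypothetical and only mirror the direction of the calls described above, not Flink's real `TypeSerializerSnapshot` or `TypeSerializerSchemaCompatibility` API (which has more outcomes than shown here).

```java
// Hypothetical, simplified stand-ins for Flink's type-serialization classes.
enum ToyCompatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface ToySerializer<T> {
    ToySnapshot<T> snapshotConfiguration();
}

interface ToySnapshot<T> {
    // New method: called on the NEW serializer's snapshot with the OLD snapshot.
    // Snapshots that do not override it are treated as incompatible.
    default ToyCompatibility resolveSchemaCompatibility(ToySnapshot<T> oldSerializerSnapshot) {
        return ToyCompatibility.INCOMPATIBLE;
    }

    // Original method: called on the OLD snapshot with the NEW serializer.
    // Deprecated; by default it delegates to the new method.
    @Deprecated
    default ToyCompatibility resolveSchemaCompatibility(ToySerializer<T> newSerializer) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}

// A "built-in" snapshot that implements the new method.
final class IntSnapshot implements ToySnapshot<Integer> {
    @Override
    public ToyCompatibility resolveSchemaCompatibility(ToySnapshot<Integer> oldSerializerSnapshot) {
        return oldSerializerSnapshot instanceof IntSnapshot
                ? ToyCompatibility.COMPATIBLE_AS_IS
                : ToyCompatibility.INCOMPATIBLE;
    }
}

final class IntSerializer implements ToySerializer<Integer> {
    @Override
    public ToySnapshot<Integer> snapshotConfiguration() {
        return new IntSnapshot();
    }
}

// A snapshot that has not implemented the new method yet.
final class LegacySnapshot implements ToySnapshot<Integer> {}

final class LegacySerializer implements ToySerializer<Integer> {
    @Override
    public ToySnapshot<Integer> snapshotConfiguration() {
        return new LegacySnapshot();
    }
}
```

A caller still using the deprecated direction, `new IntSnapshot().resolveSchemaCompatibility(new IntSerializer())`, is routed to the new method and gets `COMPATIBLE_AS_IS`, while a new serializer whose snapshot has not implemented the new method falls back to `INCOMPATIBLE`.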
[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1383357691 @flinkbot run azure
[GitHub] [flink] Myracle commented on pull request #21661: [FLINK-30629][Client/Job Submission] Fix the unstable test ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
Myracle commented on PR #21661: URL: https://github.com/apache/flink/pull/21661#issuecomment-1383357574 @flinkbot run azure
[GitHub] [flink-ml] lindong28 merged pull request #200: [FLINK-30578][build] Publish SBOM artifacts
lindong28 merged PR #200: URL: https://github.com/apache/flink-ml/pull/200
[jira] [Closed] (FLINK-30168) PyFlink Deserialization Error with Object Array
[ https://issues.apache.org/jira/browse/FLINK-30168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-30168. --- Fix Version/s: 1.17.0 1.16.1 1.15.4 Assignee: Dian Fu (was: Xingbo Huang) Resolution: Fixed Fixed in: - master via 46757739cf50c1e7b7305a4bc9cf779bb1945a1f - release-1.16 via 46c91ed4bc22e2de3a662d52b11ade8ed64dba0b - release-1.15 via cdecc21cad9f78b1555a0e2f5d7f1398949e7193 > PyFlink Deserialization Error with Object Array > --- > > Key: FLINK-30168 > URL: https://issues.apache.org/jira/browse/FLINK-30168 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0, 1.15.2 >Reporter: Yunfeng Zhou >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.16.1, 1.15.4 > > > When it is attempted to collect object array records from a DataStream in > PyFlink, an exception like follows would be thrown > {code:java} > data = 0, field_type = DenseVectorTypeInfo > def pickled_bytes_to_python_converter(data, field_type):if > isinstance(field_type, RowTypeInfo): > row_kind = RowKind(int.from_bytes(data[0], 'little')) > data = zip(list(data[1:]), field_type.get_field_types()) > fields = []for d, d_type in data: > fields.append(pickled_bytes_to_python_converter(d, d_type)) > row = Row.of_kind(row_kind, *fields)return rowelse: > > data = pickle.loads(data) > E TypeError: a bytes-like object is required, not 'int'{code} > I found that this error is invoked because PyFlink deals with object arrays > differently on Java side and Python side. > > On Java side > (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject) > {code:java} > ... > else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof > PrimitiveArrayTypeInfo) { > # recursively deal with array elements > } ... 
> else { > # ObjectArrayTypeInfo is here > TypeSerializer serializer = dataType.createSerializer(null); > ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos(); > DataOutputViewStreamWrapper baosWrapper = new > DataOutputViewStreamWrapper(baos); serializer.serialize(obj, baosWrapper); > return pickler.dumps(baos.toByteArray()); > } > {code} > > On python side(pyflink.datastream.utils.pickled_bytes_to_python_converter) > {code:java} > ... > elif isinstance(field_type, > (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)): > element_type = field_type._element_type > elements = [] > for element_bytes in data: > elements.append(pickled_bytes_to_python_converter(element_bytes, > element_type)) > return elements{code} > > Thus a possible fix for this bug is to align PyFlink's behavior on Java side > and Python side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
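In the report, the Java side serializes a whole object array as a single payload, while the Python converter iterates over `data` expecting one payload per element. Iterating over a Python `bytes` object yields `int`s, which is consistent with the `TypeError: a bytes-like object is required, not 'int'` shown above. The toy Java model below contrasts the two shapes. `ArrayHandOff` and its methods are hypothetical, UTF-8 strings stand in for real serialization and pickling, and this is not PyFlink's actual fix.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of handing an object array from Java to Python (hypothetical names;
// UTF-8 strings stand in for real serialization/pickling).
final class ArrayHandOff {
    // Stand-in for serializing a single element.
    static byte[] serializeElement(Object element) {
        return String.valueOf(element).getBytes(StandardCharsets.UTF_8);
    }

    // Mismatched shape: the whole array becomes ONE payload, so a consumer that
    // iterates over it sees individual bytes rather than elements.
    static byte[] wholeArrayPayload(Object[] array) {
        List<String> parts = new ArrayList<>();
        for (Object element : array) {
            parts.add(String.valueOf(element));
        }
        return serializeElement(String.join(",", parts));
    }

    // Aligned shape: one payload per element, as for basic and primitive arrays,
    // which matches a consumer that iterates over elements.
    static List<byte[]> perElementPayloads(Object[] array) {
        List<byte[]> payloads = new ArrayList<>();
        for (Object element : array) {
            payloads.add(serializeElement(element));
        }
        return payloads;
    }
}
```

For `{"a", "bc"}`, `perElementPayloads` returns two payloads (`"a"` and `"bc"`), while `wholeArrayPayload` returns the four bytes of `"a,bc"`. A consumer that iterates over the latter would receive four single bytes instead of two elements.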
[GitHub] [flink-ml] lindong28 commented on pull request #200: [FLINK-30578][build] Publish SBOM artifacts
lindong28 commented on PR #200: URL: https://github.com/apache/flink-ml/pull/200#issuecomment-1383342110 Thanks for the PR. LGTM.
[GitHub] [flink] dianfu closed pull request #21664: [FLINK-30168][python] Fix DataStream.execute_and_collect to support None data and ObjectArray
dianfu closed pull request #21664: [FLINK-30168][python] Fix DataStream.execute_and_collect to support None data and ObjectArray URL: https://github.com/apache/flink/pull/21664
[jira] [Closed] (FLINK-29231) PyFlink UDAF produces different results in the same sliding window
[ https://issues.apache.org/jira/browse/FLINK-29231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-29231. --- Fix Version/s: 1.17.0 1.16.1 1.15.4 Resolution: Fixed Fixed in: - master via addca4e18aa1806984070b8c450d3d0df5e473d3 - release-1.16 via 6d3d66ca71a87c50f9d160b5c05aa6cbe49ae1b0 - release-1.15 via d9115d8c2aae75c0b17471c1ab8a0769e606def1 > PyFlink UDAF produces different results in the same sliding window > -- > > Key: FLINK-29231 > URL: https://issues.apache.org/jira/browse/FLINK-29231 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.3 >Reporter: Xuannan Su >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.16.1, 1.15.4 > > Attachments: image-2022-09-08-17-20-06-296.png, input, test_agg.py > > > It seems that PyFlink udtaf produces different results in the same sliding > window. It can be reproduced with the given code and input. It is not always > happening but the possibility is relatively high. > The incorrect output is the following: > !image-2022-09-08-17-20-06-296.png! > > We can see that the output contains different `val_sum` at `window_time` > 2022-01-01 00:01:59.999. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dianfu closed pull request #21655: [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
dianfu closed pull request #21655: [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window URL: https://github.com/apache/flink/pull/21655
[jira] [Commented] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced
[ https://issues.apache.org/jira/browse/FLINK-30679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677107#comment-17677107 ] hehuiyuan commented on FLINK-30679: --- [~luoyuxia] ,hi Take a look when you have time > Can not load the data of hive dim table when project-push-down is introduced > > > Key: FLINK-30679 > URL: https://issues.apache.org/jira/browse/FLINK-30679 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.14.6 >Reporter: hehuiyuan >Priority: Critical > Labels: pull-request-available > > > Can not load the data of hive dim table when project-push-down is introduced. > The project push down optimize:[https://github.com/apache/flink/pull/21311] > hive-exec version: 2.3.4 > flink version: 1.14.6 > flink-hive-connector: the latest code for release-1.14 branch > > vectorize read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
{code} > > > mapreduce read: > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: 3 > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) > ~[?:1.8.0_301] > at > java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) > ~[?:1.8.0_301] > at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > ~[?:1.8.0_301] > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) > ~[?:1.8.0_301] > at > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > ~[?:1.8.0_301] > at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) > ~[?:1.8.0_301] > at > org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) > ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
> at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] > at > org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) > ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} > > > The sql : > > {code:java} > CREATE TABLE kafkaTableSource ( > name string, > age int, > sex string, > address string, > ptime AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'hehuiyuan1', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.client.id' = 'test-consumer-group', > 'properties.group.id' = 'test-consumer-group', > 'format' = 'csv' > ); > CREATE TABLE printsink ( > name string, > age int, > sex string, > address