[GitHub] [flink] huwh commented on pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times
huwh commented on PR #22861: URL: https://github.com/apache/flink/pull/22861#issuecomment-1631882530 @wanglijie95, Thanks for the review. I made the changes as you suggested. PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] masteryhx commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint
masteryhx commented on code in PR #22744: URL: https://github.com/apache/flink/pull/22744#discussion_r1260611912 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -375,6 +378,49 @@ public RunnableFuture> snapshot( @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception { + +if (checkpointOptions.getCheckpointType().isSavepoint()) { +SnapshotType.SharingFilesStrategy sharingFilesStrategy = + checkpointOptions.getCheckpointType().getSharingFilesStrategy(); +if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) { +// For NO_SHARING native savepoint, trigger delegated one +RunnableFuture> delegatedSnapshotResult = +keyedStateBackend.snapshot( +checkpointId, timestamp, streamFactory, checkpointOptions); Review Comment: Thanks for the reply. Maybe I missed something. Currently, `materializationId` is a dummy value when triggering a native savepoint. Do you mean that we should use the current real `materializationId`?
[GitHub] [flink] masteryhx commented on a diff in pull request #22801: [FLINK-23484][benchmark] Supports base benchmark for ChangelogStateBackend
masteryhx commented on code in PR #22801: URL: https://github.com/apache/flink/pull/22801#discussion_r1260563718 ## flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/RescalingBenchmark.java: ## @@ -16,7 +16,7 @@ * limitations under the License */ -package org.apache.flink.contrib.streaming.state.benchmark; +package org.apache.flink.state.benchmark; Review Comment: Sure. PR Link: https://github.com/apache/flink-benchmarks/pull/75
[GitHub] [flink-benchmarks] masteryhx commented on a diff in pull request #75: [FLINK-23484] Supports to test ChangelogStateBackend
masteryhx commented on code in PR #75: URL: https://github.com/apache/flink-benchmarks/pull/75#discussion_r1260562329 ## src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java: ## @@ -57,7 +56,7 @@ public class StateBenchmarkBase extends BenchmarkBase { static AtomicInteger keyIndex; final ThreadLocalRandom random = ThreadLocalRandom.current(); -@Param({"HEAP", "ROCKSDB"}) +@Param({"HEAP", "ROCKSDB", "HEAP_CHANGELOG", "ROCKSDB_CHANGELOG"}) Review Comment: Thanks for the advice. I agree that we should focus on testing the production env.
[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore
FangYongs commented on code in PR #22937: URL: https://github.com/apache/flink/pull/22937#discussion_r1260545206 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Map; + +/** + * A factory to create configured catalog store instances based on string-based properties. See also + * {@link Factory} for more information. + * + * This factory is specifically designed for the Flink SQL gateway scenario, where different + * catalog stores need to be created for different sessions. + * + * If the CatalogStore is implemented using JDBC, this factory can be used to create a JDBC + * connection pool in the open method. This connection pool can then be reused for subsequent + * catalog store creations. + * + * The following examples implementation of CatalogStoreFactory using jdbc. 
+ * + * {@code + * public class JdbcCatalogStore implements CatalogStore { + * + * private JdbcConnectionPool jdbcConnectionPool; + * public JdbcCatalogStore(JdbcConnectionPool jdbcConnectionPool) { + * this.jdbcConnectionPool = jdbcConnectionPool; + * } + * ... + * } + * + * public class JdbcCatalogStoreFactory implements CatalogStoreFactory { + * + * private JdbcConnectionPool jdbcConnectionPool; + * + * @Override + * public CatalogStore createCatalogStore(Context context) { + * return new JdbcCatalogStore(jdbcConnectionPool); + * } + * + * @Override + * public void open(Context context) throws CatalogException { + * // initialize the thread pool using options from context + * jdbcConnectionPool = initializeJdbcConnectionPool(context); + * } + * ... Review Comment: Add `close` for JdbcCatalogStoreFactory? I think `close` is especially relevant for the JDBC catalog store factory.
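A minimal sketch of what the `close` suggested in the review comment above might look like. Everything here is an assumption for illustration: `JdbcConnectionPool`, `CatalogStore` and `CatalogStoreFactory` are simplified, hypothetical stand-ins declared locally, not the Flink interfaces from the PR, and the `Context` parameter is omitted.

```java
public class JdbcCatalogStoreFactorySketch {

    /** Hypothetical stand-in for a JDBC connection pool; it only tracks whether it was closed. */
    static final class JdbcConnectionPool implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Simplified placeholder, not the Flink CatalogStore interface. */
    interface CatalogStore {}

    /** Simplified placeholder, not the Flink CatalogStoreFactory interface. */
    interface CatalogStoreFactory {
        void open();

        CatalogStore createCatalogStore();

        void close();
    }

    static final class JdbcCatalogStore implements CatalogStore {
        final JdbcConnectionPool pool;

        JdbcCatalogStore(JdbcConnectionPool pool) {
            this.pool = pool;
        }
    }

    static final class JdbcCatalogStoreFactory implements CatalogStoreFactory {
        private JdbcConnectionPool pool;

        @Override
        public void open() {
            // Create the shared pool once; every store created later reuses it.
            pool = new JdbcConnectionPool();
        }

        @Override
        public CatalogStore createCatalogStore() {
            if (pool == null) {
                throw new IllegalStateException("Factory is not open.");
            }
            return new JdbcCatalogStore(pool);
        }

        @Override
        public void close() {
            // Release the shared pool; a no-op if open() never ran or close() already ran.
            if (pool != null) {
                pool.close();
                pool = null;
            }
        }
    }

    public static void main(String[] args) {
        JdbcCatalogStoreFactory factory = new JdbcCatalogStoreFactory();
        factory.open();
        JdbcCatalogStore store = (JdbcCatalogStore) factory.createCatalogStore();
        factory.close();
        System.out.println("pool closed: " + store.pool.isClosed());
    }
}
```

Because the factory owns the pool, closing the factory is the one place where the shared resource can be released, which is presumably why `close` matters more here than for a factory that holds no state.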
[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore
FangYongs commented on code in PR #22937: URL: https://github.com/apache/flink/pull/22937#discussion_r1260543524 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/CatalogStoreFactoryTest.java: ## @@ -0,0 +1,9 @@ +package org.apache.flink.table.factories; + +import org.junit.jupiter.api.Test; + Review Comment: Add license and doc ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogStoreFactory.java: ## @@ -0,0 +1,87 @@ +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + Review Comment: Add license and doc
[jira] [Closed] (FLINK-32549) Tiered storage memory manager supports ownership transfer for buffers
[ https://issues.apache.org/jira/browse/FLINK-32549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan closed FLINK-32549. - Fix Version/s: 1.18.0 Resolution: Fixed > Tiered storage memory manager supports ownership transfer for buffers > - > > Key: FLINK-32549 > URL: https://issues.apache.org/jira/browse/FLINK-32549 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Currently, the accumulator is responsible for requesting all buffers, leading > to an inaccurate number of requested buffers for each tier. > To address this issue, buffer ownership must be transferred from the > accumulator to the tiers when writing them, which will enable the memory > manager to maintain a correct number of requested buffers for different > owners. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore
FangYongs commented on code in PR #22937: URL: https://github.com/apache/flink/pull/22937#discussion_r1260537362 ## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreTest.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link GenericInMemoryCatalogStore}. */ +public class GenericInMemoryCatalogStoreTest { + +@Test +void testStoreAndGet() { +CatalogStore catalogStore = new GenericInMemoryCatalogStore(); +catalogStore.open(); + +catalogStore.storeCatalog( +"catalog1", CatalogDescriptor.of("catalog1", new Configuration())); +assertThat(catalogStore.getCatalog("catalog1").isPresent()).isTrue(); +assertThat(catalogStore.contains("catalog1")).isTrue(); + +catalogStore.removeCatalog("catalog1", true); +assertThat(catalogStore.contains("catalog1")).isFalse(); + +catalogStore.close(); Review Comment: It's a little strange that `CatalogStore` can still be used after it is closed. Can we restrict this behavior? 
For example, `CatalogManager` should throw an exception when it accesses a closed `CatalogStore`.
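One way to restrict use after `close`, sketched under stated assumptions: `InMemoryStore` below is a hypothetical, simplified stand-in (catalog descriptors are plain strings), not `GenericInMemoryCatalogStore`. Note that the comment above proposes the check in `CatalogManager`; this sketch places the guard inside the store itself, which is a different design choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ClosableCatalogStoreSketch {

    /** Hypothetical in-memory store that rejects every access unless it is currently open. */
    static final class InMemoryStore implements AutoCloseable {
        private final Map<String, String> descriptors = new HashMap<>();
        private boolean open;

        void open() {
            open = true;
        }

        @Override
        public void close() {
            descriptors.clear();
            open = false;
        }

        void storeCatalog(String name, String descriptor) {
            checkOpen();
            descriptors.put(name, descriptor);
        }

        Optional<String> getCatalog(String name) {
            checkOpen();
            return Optional.ofNullable(descriptors.get(name));
        }

        boolean contains(String name) {
            checkOpen();
            return descriptors.containsKey(name);
        }

        // Single guard shared by all accessors, so no method can forget the check.
        private void checkOpen() {
            if (!open) {
                throw new IllegalStateException("CatalogStore is not open or has been closed.");
            }
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.open();
        store.storeCatalog("catalog1", "descriptor");
        store.close();
        try {
            store.contains("catalog1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a guard like this, a test such as `testStoreAndGet` could additionally assert that calls made after `close()` fail instead of silently succeeding.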
[GitHub] [flink] wanglijie95 commented on pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on PR #22966: URL: https://github.com/apache/flink/pull/22966#issuecomment-1631802657 Thanks for the review @lsyldliu @swuferhong. All comments have been addressed or replied to, PTAL.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260529924 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java: ## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Planner program that tries to inject runtime filter for suitable join to improve join + * performance. + * + * We build the runtime filter in a two-phase manner: First, each subtask on the build side + * builds a local filter based on its local data, and sends the built filter to a global aggregation + * node. Then the global aggregation node aggregates the received filters into a global filter, and + * sends the global filter to all probe side subtasks. Therefore, we will add {@link + * BatchPhysicalLocalRuntimeFilterBuilder}, {@link BatchPhysicalGlobalRuntimeFilterBuilder} and + * {@link BatchPhysicalRuntimeFilter} into the physical plan. + * + * For example, for the following query: + * + * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2} + * + * The original physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- TableSourceScan(table=[[fact]], fields=[a, b, c]) + *+- Exchange(distribution=[hash[x]]) + * +- Calc(select=[x, y], where=[=(z, 2)]) + * +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z]) + * } + * + * This optimized physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- RuntimeFilter(select=[a]) + *: :- Exchange(distribution=[broadcast]) + *:
[GitHub] [flink] xuzifu666 commented on a diff in pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend
xuzifu666 commented on code in PR #22980: URL: https://github.com/apache/flink/pull/22980#discussion_r1260521926 ## flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java: ## @@ -364,6 +364,7 @@ public Collection getAllHandles() throws Exception { return client.getChildren().forPath(path); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry +LOG.debug("Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})", ignored.getMessage()); Review Comment: Done, the format has been changed @rkhachatryan
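The retry-and-log pattern in the diff above can be sketched in isolation. This is an illustration under assumptions, not the `ZooKeeperStateHandleStore` code: `NoNodeException` is a local stand-in for `KeeperException.NoNodeException`, `retryOnNoNode` and its `maxAttempts` bound are hypothetical, and `java.util.logging` at `FINE` level stands in for the debug logger used in Flink.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RetryOnNoNodeSketch {

    private static final Logger LOG = Logger.getLogger(RetryOnNoNodeSketch.class.getName());

    /** Local stand-in for KeeperException.NoNodeException. */
    static final class NoNodeException extends Exception {
        NoNodeException(String message) {
            super(message);
        }
    }

    /**
     * Runs the action, retrying when a node disappears concurrently. The attempt bound is an
     * assumption of this sketch; the original code may retry differently.
     */
    static <T> T retryOnNoNode(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        NoNodeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (NoNodeException e) {
                last = e;
                // Log at debug (FINE) level so the retry is visible without alarming users.
                LOG.log(
                        Level.FINE,
                        "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {0})",
                        e.getMessage());
            }
        }
        // All attempts failed with NoNodeException; surface the last one.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result =
                retryOnNoNode(
                        () -> {
                            if (calls[0]++ < 2) {
                                throw new NoNodeException("/handles/child");
                            }
                            return "handles";
                        },
                        5);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```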
[GitHub] [flink] flinkbot commented on pull request #22982: [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too large
flinkbot commented on PR #22982: URL: https://github.com/apache/flink/pull/22982#issuecomment-1631791507 ## CI report: * 96c12bd159b3cfe3f9785557badba75928e79803 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260518408 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -165,6 +167,51 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will try to push dynamic filtering into scan table source," + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED = +key("table.optimizer.runtime-filter.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable the runtime filter. " ++ "When it is true, the optimizer will try to inject a runtime filter for eligible join."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption +TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE = +key("table.optimizer.runtime-filter.max-build-data-size") +.memoryType() +.defaultValue(MemorySize.parse("10m")) Review Comment: The best default value is still being tested. I will try to find the best default value based on the TPCDS-10T performance results.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260517558 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java: ## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Planner program that tries to inject runtime filter for suitable join to improve join + * performance. + * + * We build the runtime filter in a two-phase manner: First, each subtask on the build side + * builds a local filter based on its local data, and sends the built filter to a global aggregation + * node. Then the global aggregation node aggregates the received filters into a global filter, and + * sends the global filter to all probe side subtasks. Therefore, we will add {@link + * BatchPhysicalLocalRuntimeFilterBuilder}, {@link BatchPhysicalGlobalRuntimeFilterBuilder} and + * {@link BatchPhysicalRuntimeFilter} into the physical plan. + * + * For example, for the following query: + * + * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2} + * + * The original physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- TableSourceScan(table=[[fact]], fields=[a, b, c]) + *+- Exchange(distribution=[hash[x]]) + * +- Calc(select=[x, y], where=[=(z, 2)]) + * +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z]) + * } + * + * This optimized physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- RuntimeFilter(select=[a]) + *: :- Exchange(distribution=[broadcast]) + *:
[jira] [Updated] (FLINK-32576) ProducerMergedPartitionFileIndex supports spilling to file
[ https://issues.apache.org/jira/browse/FLINK-32576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32576: --- Labels: pull-request-available (was: ) > ProducerMergedPartitionFileIndex supports spilling to file > -- > > Key: FLINK-32576 > URL: https://issues.apache.org/jira/browse/FLINK-32576 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > > When running a very large-scale job, ProducerMergedPartitionFileIndex may > occupy too much heap memory and cause OOM. > To resolve the issue, ProducerMergedPartitionFileIndex should support > spilling to file to release the occupied memory when necessary.
[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22982: [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too lar
TanYuxin-tyx opened a new pull request, #22982: URL: https://github.com/apache/flink/pull/22982 ## What is the purpose of the change *ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too large* ## Brief change log Use the `HsFileDataIndexSpilledRegionManager` and `HsFileDataIndexCache` to cache or spill the `ProducerMergedPartitionFileIndex`. - *[**commit 1**] Introduce a new parent region class HsBaseRegion* - *[**commit 2**] Use the HsBaseRegion to replace InternalRegion in HsFileDataIndexSpilledRegionManager* - *[**commit 3**] Rename the Segment in HsFileDataIndexSpilledRegionManagerImpl to RegionGroup* - *[**commit 4**] Introduce a new API HsRegionFileOperation. And ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too large* ## Verifying this change This change will add unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260516947 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before Review Comment: We have to use junit4 here, because the base class `TableTestBase` is based on junit4. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260516252 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before +public void setup() { +catalog.open(); +util.tableEnv().registerCatalog("testCatalog", catalog); +util.tableEnv().useCatalog("testCatalog"); +TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, +MemorySize.parse("10m")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE, +MemorySize.parse("10g")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5); Review Comment: I explicitly set the runtime-filter-related config options here because the default values may change later (the best defaults are still being tested). With the values set explicitly, the current tests will not be affected even if the defaults change.
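The size comments in the test (`128L * 1024L * 48 = 6MB`, `1024L * 1024L * 1024L * 60 = 60GB`) can be checked against the thresholds pinned in `setup()`. Below is a minimal sketch of that arithmetic in plain Java. `RuntimeFilterSizingSketch` and its methods are hypothetical names, not part of Flink; it assumes `"10m"` and `"10g"` denote binary units (MiB/GiB), and the use of `<=` / `>=` is an assumption, since the actual comparison in the planner is not shown in this thread.

```java
// Hypothetical helper, not part of Flink: it only mirrors the arithmetic in the test's comments.
final class RuntimeFilterSizingSketch {
    static final long MIB = 1024L * 1024L;
    static final long GIB = 1024L * MIB;

    // Thresholds pinned in setup(): "10m" and "10g" (assumed to be binary units).
    static final long MAX_BUILD_BYTES = 10L * MIB;
    static final long MIN_PROBE_BYTES = 10L * GIB;

    // Estimated data size = row count * average row size, failing loudly on overflow.
    static long estimatedBytes(long rowCount, long avgRowSizeBytes) {
        return Math.multiplyExact(rowCount, avgRowSizeBytes);
    }

    // Assumed eligibility rule: small enough build side, large enough probe side.
    static boolean sizesAllowRuntimeFilter(long buildBytes, long probeBytes) {
        return buildBytes <= MAX_BUILD_BYTES && probeBytes >= MIN_PROBE_BYTES;
    }

    public static void main(String[] args) {
        long dimBytes = estimatedBytes(128L * 1024L, 48);            // dim: 48 bytes per row
        long factBytes = estimatedBytes(1024L * 1024L * 1024L, 60);  // fact: 60 bytes per row
        System.out.println((dimBytes / MIB) + " MiB build, " + (factBytes / GIB) + " GiB probe");
        System.out.println("eligible by size: " + sizesAllowRuntimeFilter(dimBytes, factBytes));
    }
}
```

Under these assumptions the dim table (6 MiB) stays below the 10 MiB build limit and the fact table (60 GiB) exceeds the 10 GiB probe minimum, which is why the "suitable" statistics are expected to trigger the optimization regardless of future default changes.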
[jira] [Commented] (FLINK-32549) Tiered storage memory manager supports ownership transfer for buffers
[ https://issues.apache.org/jira/browse/FLINK-32549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742244#comment-17742244 ] Weijie Guo commented on FLINK-32549: master(1.18) via 2dfff436c09821fb658bf8d289206b9ef85bb25b. > Tiered storage memory manager supports ownership transfer for buffers > - > > Key: FLINK-32549 > URL: https://issues.apache.org/jira/browse/FLINK-32549 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > > Currently, the accumulator is responsible for requesting all buffers, leading > to an inaccurate number of requested buffers for each tier. > To address this issue, buffer ownership must be transferred from the > accumulator to the tiers when writing them, which will enable the memory > manager to maintain a correct number of requested buffers for different > owners. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa merged pull request #22960: [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers
reswqa merged PR #22960: URL: https://github.com/apache/flink/pull/22960
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260510568 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before +public void setup() { +catalog.open(); +util.tableEnv().registerCatalog("testCatalog", catalog); +util.tableEnv().useCatalog("testCatalog"); +TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, +MemorySize.parse("10m")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE, +MemorySize.parse("10g")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L); Review Comment: Yes, you are right. `TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD` and `TABLE_OPTIMIZER_AGG_PHASE_STRATEGY` are used to make the build side a direct Agg or Calc (without an Exchange). I moved these two options into the tests `testBuildSideIsAggWithoutExchange` and `testBuildSideIsCalcWithoutExchange`.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260509411 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before +public void setup() { +catalog.open(); +util.tableEnv().registerCatalog("testCatalog", catalog); +util.tableEnv().useCatalog("testCatalog"); +TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, +MemorySize.parse("10m")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE, +MemorySize.parse("10g")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE"); + +// row avg size is 48 +String dimDdl = +"create table dim (\n" ++ " id BIGINT,\n" ++ " male BOOLEAN,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " dim_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; +util.tableEnv().executeSql(dimDdl); + +// row avg size is 60 +String factDdl = +"create table fact (\n" ++ " id BIGINT,\n" ++ " name STRING,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " fact_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; 
+util.tableEnv().executeSql(factDdl); +} + +@Test +public void testSimpleInnerJoin() throws Exception { +// runtime filter will succeed +setupSuitableTableStatistics(); +String query = "select *
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260508642 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before +public void setup() { +catalog.open(); +util.tableEnv().registerCatalog("testCatalog", catalog); +util.tableEnv().useCatalog("testCatalog"); +TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, +MemorySize.parse("10m")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE, +MemorySize.parse("10g")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE"); + +// row avg size is 48 +String dimDdl = +"create table dim (\n" ++ " id BIGINT,\n" ++ " male BOOLEAN,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " dim_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; +util.tableEnv().executeSql(dimDdl); + +// row avg size is 60 +String factDdl = +"create table fact (\n" ++ " id BIGINT,\n" ++ " name STRING,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " fact_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; 
+util.tableEnv().executeSql(factDdl); +} + +@Test +public void testSimpleInnerJoin() throws Exception { +// runtime filter will succeed +setupSuitableTableStatistics(); +String query = "select *
[jira] [Created] (FLINK-32581) Add document for atomic CTAS
tartarus created FLINK-32581: Summary: Add document for atomic CTAS Key: FLINK-32581 URL: https://issues.apache.org/jira/browse/FLINK-32581 Project: Flink Issue Type: Sub-task Reporter: tartarus Fix For: 1.18.0 add docs for atomic CTAS
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260504822 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java: ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +/** Test for {@link FlinkRuntimeFilterProgram}. 
*/ +public class FlinkRuntimeFilterProgramTest extends TableTestBase { +// 128L * 1024L * 48 = 6MB +private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L; +// 1024L * 1024L * 1024L * 60 = 60GB +private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L; + +private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); +private final TestValuesCatalog catalog = +new TestValuesCatalog("testCatalog", "test_database", true); + +@Before +public void setup() { +catalog.open(); +util.tableEnv().registerCatalog("testCatalog", catalog); +util.tableEnv().useCatalog("testCatalog"); +TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, +MemorySize.parse("10m")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE, +MemorySize.parse("10g")); +tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L); +util.getTableEnv() +.getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE"); + +// row avg size is 48 +String dimDdl = +"create table dim (\n" ++ " id BIGINT,\n" ++ " male BOOLEAN,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " dim_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; +util.tableEnv().executeSql(dimDdl); + +// row avg size is 60 +String factDdl = +"create table fact (\n" ++ " id BIGINT,\n" ++ " name STRING,\n" ++ " amount BIGINT,\n" ++ " price BIGINT,\n" ++ " fact_date_sk BIGINT\n" ++ ") with (\n" ++ " 'connector' = 'values',\n" ++ " 'runtime-source' = 'NewSource',\n" ++ " 'bounded' = 'true'\n" ++ ")"; 
+util.tableEnv().executeSql(factDdl); +} + +@Test +public void testSimpleInnerJoin() throws Exception { +// runtime filter will succeed +setupSuitableTableStatistics(); +String query = "select *
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260503747 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java: ## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Planner program that tries to inject runtime filter for suitable join to improve join + * performance. + * + * We build the runtime filter in a two-phase manner: First, each subtask on the build side + * builds a local filter based on its local data, and sends the built filter to a global aggregation + * node. Then the global aggregation node aggregates the received filters into a global filter, and + * sends the global filter to all probe side subtasks. Therefore, we will add {@link + * BatchPhysicalLocalRuntimeFilterBuilder}, {@link BatchPhysicalGlobalRuntimeFilterBuilder} and + * {@link BatchPhysicalRuntimeFilter} into the physical plan. + * + * For example, for the following query: + * + * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2} + * + * The original physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- TableSourceScan(table=[[fact]], fields=[a, b, c]) + *+- Exchange(distribution=[hash[x]]) + * +- Calc(select=[x, y], where=[=(z, 2)]) + * +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z]) + * } + * + * This optimized physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- RuntimeFilter(select=[a]) + *: :- Exchange(distribution=[broadcast]) + *:
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260503527 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java: ## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Planner program that tries to inject runtime filter for suitable join to improve join + * performance. + * + * We build the runtime filter in a two-phase manner: First, each subtask on the build side + * builds a local filter based on its local data, and sends the built filter to a global aggregation + * node. Then the global aggregation node aggregates the received filters into a global filter, and + * sends the global filter to all probe side subtasks. Therefore, we will add {@link + * BatchPhysicalLocalRuntimeFilterBuilder}, {@link BatchPhysicalGlobalRuntimeFilterBuilder} and + * {@link BatchPhysicalRuntimeFilter} into the physical plan. + * + * For example, for the following query: + * + * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2} + * + * The original physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- TableSourceScan(table=[[fact]], fields=[a, b, c]) + *+- Exchange(distribution=[hash[x]]) + * +- Calc(select=[x, y], where=[=(z, 2)]) + * +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z]) + * } + * + * This optimized physical plan: + * + * {@code + * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z]) + * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], build=[right]) + *:- Exchange(distribution=[hash[a]]) + *: +- RuntimeFilter(select=[a]) + *: :- Exchange(distribution=[broadcast]) + *:
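The two-phase build described in the Javadoc above (local filters per build-side subtask, merged into one global filter, then applied on the probe side) can be illustrated with a small stand-alone sketch. This is not Flink's implementation: `TwoPhaseRuntimeFilterSketch`, its methods, and the single-hash `BitSet` filter are all assumptions chosen to keep the example deterministic, and the actual filter structure used by the planner nodes is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Hypothetical illustration of the local -> global -> probe flow; not Flink code.
final class TwoPhaseRuntimeFilterSketch {
    static final int NUM_BITS = 1 << 16;

    // Toy hash: maps a join key to one bit position (an assumption for readability).
    static int slot(long key) {
        return (int) Math.floorMod(key, (long) NUM_BITS);
    }

    // Phase 1: each build-side subtask builds a filter from its local keys only.
    static BitSet buildLocalFilter(long[] buildKeys) {
        BitSet filter = new BitSet(NUM_BITS);
        for (long key : buildKeys) {
            filter.set(slot(key));
        }
        return filter;
    }

    // Phase 2: the global node ORs all local filters into one global filter.
    static BitSet mergeToGlobalFilter(List<BitSet> localFilters) {
        BitSet global = new BitSet(NUM_BITS);
        for (BitSet local : localFilters) {
            global.or(local);
        }
        return global;
    }

    // Probe side: drop rows whose key certainly has no match; false positives may pass.
    static List<Long> applyFilter(BitSet globalFilter, long[] probeKeys) {
        List<Long> kept = new ArrayList<>();
        for (long key : probeKeys) {
            if (globalFilter.get(slot(key))) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        BitSet local0 = buildLocalFilter(new long[] {1L, 2L, 3L});
        BitSet local1 = buildLocalFilter(new long[] {4L, 5L});
        BitSet global = mergeToGlobalFilter(Arrays.asList(local0, local1));
        // 100 is filtered out; 65537 collides with key 1 and passes as a false positive.
        System.out.println(applyFilter(global, new long[] {1L, 4L, 100L, 65537L})); // [1, 4, 65537]
    }
}
```

The false positive for `65537` is harmless in this scheme because the join itself still checks the real condition; the filter only needs to avoid false negatives.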
[jira] [Updated] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement
[ https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tartarus updated FLINK-32349: - Parent: FLINK-32580 Issue Type: Sub-task (was: New Feature) > Support atomic for CREATE TABLE AS SELECT(CTAS) statement > - > > Key: FLINK-32349 > URL: https://issues.apache.org/jira/browse/FLINK-32349 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: tartarus >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > For detailed information, see FLIP-305 > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260501061 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -165,6 +167,51 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will try to push dynamic filtering into scan table source," + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED = +key("table.optimizer.runtime-filter.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable the runtime filter. " ++ "When it is true, the optimizer will try to inject a runtime filter for eligible join."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption +TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE = +key("table.optimizer.runtime-filter.max-build-data-size") +.memoryType() +.defaultValue(MemorySize.parse("10m")) +.withDescription( +"Data volume threshold of the runtime filter build side. " ++ "Estimated data volume needs to be under this value to try to inject runtime filter."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption +TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE = +key("table.optimizer.runtime-filter.min-probe-data-size") +.memoryType() +.defaultValue(MemorySize.parse("10g")) +.withDescription( +Description.builder() +.text( +"Data volume threshold of the runtime filter probe side. " Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260500938 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -165,6 +167,51 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will try to push dynamic filtering into scan table source," + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED = +key("table.optimizer.runtime-filter.enabled") +.booleanType() +.defaultValue(false) Review Comment: We hope to enable this feature by default in future releases. But in 1.18, it will be disabled by default. ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -165,6 +167,51 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will try to push dynamic filtering into scan table source," + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED = +key("table.optimizer.runtime-filter.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable the runtime filter. " ++ "When it is true, the optimizer will try to inject a runtime filter for eligible join."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption +TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE = +key("table.optimizer.runtime-filter.max-build-data-size") +.memoryType() +.defaultValue(MemorySize.parse("10m")) +.withDescription( +"Data volume threshold of the runtime filter build side. 
" Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32580) FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
tartarus created FLINK-32580: Summary: FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) Key: FLINK-32580 URL: https://issues.apache.org/jira/browse/FLINK-32580 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: tartarus Fix For: 1.18.0 FLIP-305 Support atomic for CREATE TABLE AS SELECT(CTAS) statement -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260499124 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java: ## @@ -165,6 +167,51 @@ public class OptimizerConfigOptions { "When it is true, the optimizer will try to push dynamic filtering into scan table source," + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime."); +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption<Boolean> TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED = +key("table.optimizer.runtime-filter.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable the runtime filter. " ++ "When it is true, the optimizer will try to inject a runtime filter for eligible join."); + +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) +public static final ConfigOption<MemorySize> +TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE = +key("table.optimizer.runtime-filter.max-build-data-size") +.memoryType() +.defaultValue(MemorySize.parse("10m")) Review Comment: I've added comments for the 3 related config options, PTAL.
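To make the interplay of the three options quoted in this thread concrete, the following sketch shows one way the thresholds could gate the optimization. It is only an assumption-laden illustration: the class and method names are hypothetical, the defaults (10m, 10g) are read as binary units, and the probe-side comparison is a guess, because the probe-side description is truncated in the quoted diff.

```java
/** Hypothetical threshold check; not the logic of FlinkRuntimeFilterProgram. */
public class RuntimeFilterThresholdSketch {
    // Defaults quoted in the diff, assuming "10m" and "10g" mean MiB and GiB.
    static final long MAX_BUILD_BYTES = 10L * 1024 * 1024;
    static final long MIN_PROBE_BYTES = 10L * 1024 * 1024 * 1024;

    /**
     * Returns true when a runtime filter would be attempted under these assumptions:
     * the feature is enabled, the estimated build side is under the build threshold,
     * and (assumed) the estimated probe side is at least the probe threshold.
     */
    public static boolean shouldTryInject(
            boolean enabled, long estimatedBuildBytes, long estimatedProbeBytes) {
        return enabled
                && estimatedBuildBytes < MAX_BUILD_BYTES
                && estimatedProbeBytes >= MIN_PROBE_BYTES;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long gib = 1024L * mib;
        System.out.println(shouldTryInject(true, 1 * mib, 20 * gib));
        System.out.println(shouldTryInject(true, 20 * mib, 20 * gib));
    }
}
```

The intuition behind such a gate is that a small build side keeps the filter cheap to build and ship, while a large probe side is where filtering rows early pays off.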
[jira] [Commented] (FLINK-32157) Replaces LeaderConnectionInfo with LeaderInformation
[ https://issues.apache.org/jira/browse/FLINK-32157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742235#comment-17742235 ] Jiadong Lu commented on FLINK-32157: Hi [~mapohl], I would like to address this issue. Would you mind assigning it to me? > Replaces LeaderConnectionInfo with LeaderInformation > > > Key: FLINK-32157 > URL: https://issues.apache.org/jira/browse/FLINK-32157 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination > Reporter: Matthias Pohl > Priority: Major > Labels: starter > > {{LeaderConnectionInfo}} and {{LeaderInformation}} have the same purpose. > {{LeaderInformation}} could substitute any occurrences of > {{LeaderConnectionInfo}}
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260498717 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalGlobalRuntimeFilterBuilder.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter.BatchExecGlobalRuntimeFilterBuilder; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel; +import org.apache.flink.table.planner.plan.optimize.program.FlinkRuntimeFilterProgram; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.type.RelDataType; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; + +/** + * Batch physical RelNode responsible for aggregating all received filters into a global filter. See + * {@link FlinkRuntimeFilterProgram} for more info. + */ +public class BatchPhysicalGlobalRuntimeFilterBuilder extends SingleRel implements BatchPhysicalRel { Review Comment: For simplicity, I prefer to do it in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Tartarus0zm commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement
Tartarus0zm commented on PR #22839: URL: https://github.com/apache/flink/pull/22839#issuecomment-1631766550 > Please don't forget to create a Jira to add a doc for the atomic CTAS statement and put that Jira and [FLINK-32349](https://issues.apache.org/jira/browse/FLINK-32349) under an umbrella Jira. @luoyuxia thanks for the reminder.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.
wanglijie95 commented on code in PR #22966: URL: https://github.com/apache/flink/pull/22966#discussion_r1260497685 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.List; + +/** Batch {@link ExecNode} for local runtime filter builder. 
*/ +public class BatchExecLocalRuntimeFilterBuilder extends ExecNodeBase +implements BatchExecNode { +private final int[] buildIndices; +private final int estimatedRowCount; +private final int maxRowCount; + +public BatchExecLocalRuntimeFilterBuilder( +ReadableConfig tableConfig, +List inputProperties, +LogicalType outputType, +String description, +int[] buildIndices, +int estimatedRowCount, +int maxRowCount) { +super( +ExecNodeContext.newNodeId(), + ExecNodeContext.newContext(BatchExecLocalRuntimeFilterBuilder.class), +ExecNodeContext.newPersistedConfig( +BatchExecLocalRuntimeFilterBuilder.class, tableConfig), +inputProperties, +outputType, +description); +this.buildIndices = buildIndices; +this.estimatedRowCount = estimatedRowCount; +this.maxRowCount = maxRowCount; +} + +@Override +@SuppressWarnings("unchecked") +protected Transformation translateToPlanInternal( +PlannerBase planner, ExecNodeConfig config) { +ExecEdge inputEdge = getInputEdges().get(0); Review Comment: Fixed ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
[jira] [Created] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect
jasonliangyc created FLINK-32579: Summary: The filter criteria on the lookup table of Lookup join has no effect Key: FLINK-32579 URL: https://issues.apache.org/jira/browse/FLINK-32579 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.17.1, 1.17.0 Reporter: jasonliangyc Attachments: image-2023-07-12-09-31-18-261.png, image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png *1.* I joined two tables with a lookup join in sql-client using the query below. The filter criterion (p.name = '??') didn't show up in the execution detail, and the rows were returned based on only one condition (cdc.product_id = p.id). {code:java} select cdc.order_id, cdc.order_date, cdc.customer_name, cdc.price, p.name FROM orders AS cdc left JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and cdc.product_id = p.id ; {code} !image-2023-07-12-09-31-18-261.png|width=657,height=132! *2.* When I changed the query as below, it showed weird results, because the table (products) contains no rows whose 'name' column is '??', and the execution detail didn't show the WHERE criterion. {code:java} select cdc.order_id, cdc.order_date, cdc.customer_name, cdc.price, p.name FROM orders AS cdc left JOIN products FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id where p.name = '??' ; {code} !image-2023-07-12-09-42-59-231.png|width=684,height=102! !image-2023-07-12-09-47-31-397.png|width=685,height=120!
[GitHub] [flink] xuzifu666 commented on pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend
xuzifu666 commented on PR #22980: URL: https://github.com/apache/flink/pull/22980#issuecomment-1631732406 @rkhachatryan I have changed the format style, thanks.
[GitHub] [flink] luoyuxia commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement
luoyuxia commented on PR #22839: URL: https://github.com/apache/flink/pull/22839#issuecomment-1631729536 Please don't forget to create a Jira to add a doc for the atomic CTAS statement and put that Jira and FLINK-32349 under an umbrella Jira.
[jira] [Closed] (FLINK-28460) Flink SQL supports atomic CREATE TABLE AS SELECT(CTAS)
[ https://issues.apache.org/jira/browse/FLINK-28460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia closed FLINK-28460. Resolution: Fixed > Flink SQL supports atomic CREATE TABLE AS SELECT(CTAS) > -- > > Key: FLINK-28460 > URL: https://issues.apache.org/jira/browse/FLINK-28460 > Project: Flink > Issue Type: Sub-task > Reporter: tartarus > Assignee: tartarus > Priority: Major > > Enable support for atomic CTAS in stream and batch mode via an option. > The created target table is actively deleted when the job fails or is cancelled, but this requires that the Catalog can be serialized directly via Java serialization.
[jira] [Resolved] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement
[ https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-32349. -- Resolution: Fixed master: e6f77bea70682d1f2d708abee75a0dc33de16ee7 > Support atomic for CREATE TABLE AS SELECT(CTAS) statement > - > > Key: FLINK-32349 > URL: https://issues.apache.org/jira/browse/FLINK-32349 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: tartarus >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > For detailed information, see FLIP-305 > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] luoyuxia merged pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement
luoyuxia merged PR #22839: URL: https://github.com/apache/flink/pull/22839
[jira] [Updated] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever
[ https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-32578: Component/s: Table SQL / Runtime (was: Table SQL / Planner) > Cascaded group by window time columns on a proctime window aggregate may > result hang for ever > - > > Key: FLINK-32578 > URL: https://issues.apache.org/jira/browse/FLINK-32578 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.1 >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Fix For: 1.18.0, 1.17.2 > > > Currently when group by window time columns on a proctime window aggregate > result will get a wrong plan which may result hang for ever in runtime. > For such a query: > {code} > insert into s1 > SELECT > window_start, > window_end, > sum(cnt), > count(*) > FROM ( > SELECT > a, > b, > window_start, > window_end, > count(*) as cnt, > sum(d) as sum_d, > max(d) as max_d > FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) > GROUP BY a, window_start, window_end, b > ) > GROUP BY a, window_start, window_end > {code} > the inner proctime window works fine, but the outer one doesn't work due to a > wrong plan which will translate to a unexpected event mode window operator: > {code} > Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c]) > +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS > TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS > c]) >+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], > win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) > AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) > +- Exchange(distribution=[hash[a]]) > +- Calc(select=[a, window_start, window_end, cnt]) > +- WindowAggregate(groupBy=[a, b], > window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS > cnt, start('w$) AS window_start, 
end('w$) AS window_end]) >+- Exchange(distribution=[hash[a, b]]) > +- Calc(select=[a, b, d, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, > default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d]) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098 ] Martijn Visser edited comment on FLINK-30998 at 7/11/23 11:11 PM: -- Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 apache/flink-connector-opensearch:v1.0 a7f0ade240dbba04a41ae793684ede4285ca959b was (Author: martijnvisser): Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 apache/flink-connector-opensearch:v1.0 TODO > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch > Affects Versions: 1.16.1 > Reporter: Leonid Ilyevsky > Assignee: Leonid Ilyevsky > Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.0.2 > > > Currently, when there is a failure coming from Opensearch, the FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). This makes the Flink pipeline fail. There is no way to handle the exception in the client code. > I suggest adding an option to set a failure handler, similar to the way it is done in the Elasticsearch connector. This way the client code has a chance to examine the failure and handle it. > Here is an example use case where this would be very useful. We are using streams on the Opensearch side, and we are setting our own document IDs. Sometimes these IDs are duplicated; we need to ignore this situation and continue (this is how it works for us with Elasticsearch). > However, with the Opensearch connector, the error comes back, saying that the batch failed (even though most of the documents were indexed, only the ones with duplicated IDs were rejected), and the whole Flink job fails.
[jira] [Closed] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30998. -- Resolution: Fixed > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch > Affects Versions: 1.16.1 > Reporter: Leonid Ilyevsky > Assignee: Leonid Ilyevsky > Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.0.2 > > > Currently, when there is a failure coming from Opensearch, the FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). This makes the Flink pipeline fail. There is no way to handle the exception in the client code. > I suggest adding an option to set a failure handler, similar to the way it is done in the Elasticsearch connector. This way the client code has a chance to examine the failure and handle it. > Here is an example use case where this would be very useful. We are using streams on the Opensearch side, and we are setting our own document IDs. Sometimes these IDs are duplicated; we need to ignore this situation and continue (this is how it works for us with Elasticsearch). > However, with the Opensearch connector, the error comes back, saying that the batch failed (even though most of the documents were indexed, only the ones with duplicated IDs were rejected), and the whole Flink job fails.
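The use case in the issue description (tolerate duplicate-ID rejections, fail on everything else) can be sketched as a small pluggable callback. This is a hypothetical shape for illustration only: the interface, the class `FailureHandlerSketch`, and the `"version_conflict"` marker string are assumptions and may not match what the connector actually ships.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical failure-handler sketch; the connector's real interface may differ. */
public class FailureHandlerSketch {

    /** Assumed callback shape: invoked once for each failure reported by the sink. */
    @FunctionalInterface
    public interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Behaviour described in the issue before the change: any failure fails the job. */
    public static final FailureHandler FAIL_FAST =
            failure -> {
                throw new RuntimeException(failure);
            };

    /**
     * Records and ignores failures whose message contains the assumed duplicate-ID marker,
     * and rethrows everything else so that real errors still fail the job.
     */
    public static FailureHandler ignoringDuplicates(List<String> ignored) {
        return failure -> {
            String message = String.valueOf(failure.getMessage());
            if (message.contains("version_conflict")) {
                ignored.add(message);
            } else {
                throw new RuntimeException(failure);
            }
        };
    }

    public static void main(String[] args) {
        List<String> ignored = new ArrayList<>();
        FailureHandler handler = ignoringDuplicates(ignored);
        handler.onFailure(new IllegalStateException("version_conflict on doc 7"));
        System.out.println(ignored.size());
    }
}
```

Keeping the default as fail-fast preserves the existing behaviour for users who do not opt in, which is presumably why the issue asks for an optional handler.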
[GitHub] [flink-connector-opensearch] MartijnVisser merged pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch
MartijnVisser merged PR #30: URL: https://github.com/apache/flink-connector-opensearch/pull/30
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric
rkhachatryan commented on code in PR #22772: URL: https://github.com/apache/flink/pull/22772#discussion_r1260137154 ## flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java: ## @@ -67,10 +70,12 @@ public class TaskIOMetricGroup extends ProxyMetricGroup { private final Meter mailboxThroughput; private final Histogram mailboxLatency; private final SizeGauge mailboxSize; +private final Gauge initializingTime; Review Comment: Thanks! I see in the latest update that the type of the field is still gauge and not counter. Is it intentional? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint
rkhachatryan commented on code in PR #22744: URL: https://github.com/apache/flink/pull/22744#discussion_r1260099896 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -375,6 +378,49 @@ public RunnableFuture> snapshot( @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception { + +if (checkpointOptions.getCheckpointType().isSavepoint()) { +SnapshotType.SharingFilesStrategy sharingFilesStrategy = + checkpointOptions.getCheckpointType().getSharingFilesStrategy(); +if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) { +// For NO_SHARING native savepoint, trigger delegated one +RunnableFuture> delegatedSnapshotResult = +keyedStateBackend.snapshot( +checkpointId, timestamp, streamFactory, checkpointOptions); Review Comment: I think using `streamFactory` passed to this method is correct because savepoint should be stored in a separate path. I have another question: can't `checkpointId` interfere with `materializationId` (used in `initMaterialization`)? For example, in local recovery paths. 
## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -375,6 +378,49 @@ public RunnableFuture> snapshot( @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception { + +if (checkpointOptions.getCheckpointType().isSavepoint()) { +SnapshotType.SharingFilesStrategy sharingFilesStrategy = + checkpointOptions.getCheckpointType().getSharingFilesStrategy(); +if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) { +// For NO_SHARING native savepoint, trigger delegated one +RunnableFuture> delegatedSnapshotResult = +keyedStateBackend.snapshot( +checkpointId, timestamp, streamFactory, checkpointOptions); +return new FutureTask>( +() -> { +SnapshotResult result = + FutureUtils.runIfNotDoneAndGet(delegatedSnapshotResult); +return castSnapshotResult( +buildSnapshotResult( +checkpointId, +SnapshotResult.empty(), +new ChangelogSnapshotState( + getMaterializedResult(result; +}) { +@Override +public boolean cancel(boolean mayInterruptIfRunning) { +return delegatedSnapshotResult.cancel(mayInterruptIfRunning) +&& super.cancel(mayInterruptIfRunning); +} + +@Override +public boolean isCancelled() { +return delegatedSnapshotResult.isCancelled() && super.isCancelled(); +} + +@Override +public boolean isDone() { +return delegatedSnapshotResult.isDone() && super.isDone(); +} +}; +} else { +throw new UnsupportedOperationException( Review Comment: :+1: I think you're correct @masteryhx , these are orthogonal, and for savepoints we always use `NO_SHARING`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] reta commented on pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch
reta commented on PR #30: URL: https://github.com/apache/flink-connector-opensearch/pull/30#issuecomment-1631239695 > > @MartijnVisser I think we could backport to 1.0, low risk change, thank you > > Fine with me, will you fix the CI? Yes! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] afedulov opened a new pull request, #629: [hotfix][docs] Add a 'Build from Source' page
afedulov opened a new pull request, #629:
URL: https://github.com/apache/flink-kubernetes-operator/pull/629

## What is the purpose of the change

Adds a page with instructions and requirements for building from source, similar to Flink's https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / **no**)
- Core observer or reconciler logic that is regularly executed: (yes / **no**)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32348) MongoDB tests are flaky and time out
[ https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742108#comment-17742108 ]

Jiabao Sun commented on FLINK-32348:

[~martijnvisser] Sorry for the late reply.

The root cause of this error is that an idle reader is not removed from readersAwaitingSplit when it is closed.
This resulted in splits being incorrectly assigned to readers that did not complete when resuming tasks from checkpoints.

1. readersAwaitingSplit: [0]
2. signalNoMoreSplits is sent, but 0 is not removed from readersAwaitingSplit
3. TaskManager failover
4. split request from reader 1 -> readersAwaitingSplit: [0, 1]
5. the split is actually assigned to reader 0

The PR is ready, could you help review it?

> MongoDB tests are flaky and time out
>
> Key: FLINK-32348
> URL: https://issues.apache.org/jira/browse/FLINK-32348
> Project: Flink
> Issue Type: Bug
> Components: Connectors / MongoDB
> Reporter: Martijn Visser
> Priority: Critical
> Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307

--
This message was sent by Atlassian Jira (v8.20.10#820010)
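For illustration, the five steps in the comment above can be replayed with a minimal sketch. The class and method names below are hypothetical stand-ins and not the MongoDB connector's actual enumerator code; the sketch only assumes that the set of waiting readers survives the failover and that the next split goes to the first waiting reader.

```java
import java.util.TreeSet;

/** Hypothetical, simplified split enumerator; not the connector's real class. */
class AwaitingReadersSketch {
    private final TreeSet<Integer> readersAwaitingSplit = new TreeSet<>();
    private final boolean removeOnNoMoreSplits;

    AwaitingReadersSketch(boolean removeOnNoMoreSplits) {
        this.removeOnNoMoreSplits = removeOnNoMoreSplits;
    }

    /** A reader asks for work and is remembered until a split is available. */
    void handleSplitRequest(int readerId) {
        readersAwaitingSplit.add(readerId);
    }

    /** The reader is told to finish; the fix is to also forget it here. */
    void signalNoMoreSplits(int readerId) {
        if (removeOnNoMoreSplits) {
            readersAwaitingSplit.remove(readerId);
        }
    }

    /** Hands the next split to the first waiting reader, or returns -1 if none waits. */
    int assignNextSplit() {
        Integer reader = readersAwaitingSplit.pollFirst();
        return reader == null ? -1 : reader;
    }

    /** Replays steps 1 to 5 and returns the reader that receives the split. */
    static int replay(boolean removeOnNoMoreSplits) {
        AwaitingReadersSketch enumerator = new AwaitingReadersSketch(removeOnNoMoreSplits);
        enumerator.handleSplitRequest(0);  // 1. readersAwaitingSplit: [0]
        enumerator.signalNoMoreSplits(0);  // 2. reader 0 is told to finish
        // 3. TaskManager failover: the enumerator state is assumed to be restored as-is
        enumerator.handleSplitRequest(1);  // 4. split request from reader 1
        return enumerator.assignNextSplit(); // 5. who gets the split?
    }

    public static void main(String[] args) {
        System.out.println("without removal: reader " + replay(false));
        System.out.println("with removal:    reader " + replay(true));
    }
}
```

Without the removal in step 2 the stale entry for reader 0 wins; with it, the split goes to reader 1, the only reader that actually asked for work.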
[jira] [Updated] (FLINK-32348) MongoDB tests are flaky and time out
[ https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32348: --- Labels: pull-request-available test-stability (was: test-stability) > MongoDB tests are flaky and time out > > > Key: FLINK-32348 > URL: https://issues.apache.org/jira/browse/FLINK-32348 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Martijn Visser >Priority: Critical > Labels: pull-request-available, test-stability > > https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend
rkhachatryan commented on code in PR #22980:
URL: https://github.com/apache/flink/pull/22980#discussion_r1260009895

## flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java: ##

@@ -364,6 +364,7 @@ public Collection getAllHandles() throws Exception {
                 return client.getChildren().forPath(path);
             } catch (KeeperException.NoNodeException ignored) {
                 // Concurrent deletion, retry
+                LOG.debug("Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})", ignored.getMessage());

Review Comment:
I think formatting is broken:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51183=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=2892

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32563) Allow connectors CI to specify the main supported Flink version
[ https://issues.apache.org/jira/browse/FLINK-32563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742101#comment-17742101 ]

Etienne Chauchot commented on FLINK-32563:
--

[~martijnvisser] I had in mind to be slightly more coercive with connector authors so that they run CI tests on the last 2 versions and specify which of the two is the main supported one (for running archunit but also other things). I was thinking of something like this (in _testing.yml style):

{code:java}
jobs:
  compile_and_test:
    strategy:
      matrix:
        include:
          - flink: 1.16.2
            main_version: false
          - flink: 1.17.1
            main_version: true
    uses: ./.github/workflows/ci.yml
    with:
      connector_branch: ci_utils
      flink_version: ${{ matrix.flink }}
      main_flink_version: ${{ matrix.main_version }}
{code}

{code:java}
inputs:
  main_flink_version:
    description: "Is the input Flink version, the main version that the connector supports."
    required: false  # to avoid breaking the existing connectors
    type: boolean
    default: false
{code}

Do you prefer something like this?

{code:java}
jobs:
  enable-archunit-tests:
    uses: ./.github/workflows/ci.yml
    with:
      flink_version: 1.17.1
      connector_branch: ci_utils
      run_archunit_tests: true
{code}

{code:java}
inputs:
  run_archunit_tests:
    description: "Whether to run the archunit tests"
    required: false  # to avoid breaking the existing connectors
    type: boolean
    default: false
{code}

> Allow connectors CI to specify the main supported Flink version
> ---
>
> Key: FLINK-32563
> URL: https://issues.apache.org/jira/browse/FLINK-32563
> Project: Flink
> Issue Type: Technical Debt
> Components: Build System / CI
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Major
>
> As part of [this discussion|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3], the need for connectors to specify the main Flink version that a connector supports has arisen.
> This CI variable will allow configuring the build and tests differently depending on this version. This parameter would be optional.
> The first use case is to run archunit tests only on the main supported version, as discussed in the above thread.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098 ] Martijn Visser edited comment on FLINK-30998 at 7/11/23 4:48 PM: - Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 apache/flink-connector-opensearch:v1.0 TODO was (Author: martijnvisser): Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 apache/flink-connector-opensearch:v3.0 TODO > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Assignee: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.0.2 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
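The request in the issue description above, a pluggable handler that lets client code examine a failure and decide whether to continue, might look roughly like the following. This is a sketch only: `FailureHandler`, `ignoringDuplicates`, `writeBatch`, and the "duplicate id" message marker are hypothetical names chosen for illustration and are not the connector's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FailureHandlerSketch {

    /** Hypothetical callback; the real connector interface may look different. */
    @FunctionalInterface
    interface FailureHandler {
        void onFailure(Throwable failure);
    }

    /** Placeholder marker; a real handler would inspect the actual error type. */
    static final String DUPLICATE_MARKER = "duplicate id";

    /** Records duplicate-ID failures and rethrows everything else. */
    static FailureHandler ignoringDuplicates(List<String> skipped) {
        return failure -> {
            String message = String.valueOf(failure.getMessage());
            if (message.contains(DUPLICATE_MARKER)) {
                skipped.add(message); // tolerate the rejected document and continue
            } else {
                throw new RuntimeException(failure); // fail the job, as without a handler
            }
        };
    }

    /** Stand-in for a writer that passes each per-document error to the handler. */
    static int writeBatch(List<String> itemErrors, FailureHandler handler) {
        int handled = 0;
        for (String error : itemErrors) {
            handler.onFailure(new IllegalStateException(error));
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> skipped = new ArrayList<>();
        writeBatch(
                Arrays.asList("duplicate id: doc-1", "duplicate id: doc-7"),
                ignoringDuplicates(skipped));
        System.out.println("skipped " + skipped.size() + " duplicate(s)");
    }
}
```

The design point is that the default behavior (throw and fail the pipeline) stays available, while a job that sets its own document IDs can opt into skipping only the failures it considers harmless.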
[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098 ] Martijn Visser edited comment on FLINK-30998 at 7/11/23 4:48 PM: - Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 apache/flink-connector-opensearch:v3.0 TODO was (Author: martijnvisser): Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Assignee: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.0.2 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30998: --- Fix Version/s: opensearch-1.0.2 (was: opensearch-1.1.0) > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Assignee: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.0.2 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reopened FLINK-30998: > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Assignee: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.1.0 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch
MartijnVisser commented on PR #30: URL: https://github.com/apache/flink-connector-opensearch/pull/30#issuecomment-1631158945 > @MartijnVisser I think we could backport to 1.0, low risk change, thank you Fine with me, will you fix the CI? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-opensearch] boring-cyborg[bot] commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch
boring-cyborg[bot] commented on PR #11: URL: https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1631144197 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30998. -- Fix Version/s: opensearch-1.1.0 Resolution: Fixed Fixed in: apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43 > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.1.0 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
[ https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-30998: -- Assignee: Leonid Ilyevsky > Add optional exception handler to flink-connector-opensearch > > > Key: FLINK-30998 > URL: https://issues.apache.org/jira/browse/FLINK-30998 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: 1.16.1 >Reporter: Leonid Ilyevsky >Assignee: Leonid Ilyevsky >Priority: Major > Labels: pull-request-available > Fix For: opensearch-1.1.0 > > > Currently, when there is a failure coming from Opensearch, the > FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). > This makes the Flink pipeline fail. There is no way to handle the exception > in the client code. > I suggest to add an option to set a failure handler, similar to the way it is > done in elasticsearch connector. This way the client code has a chance to > examine the failure and handle it. > Here is the use case example when it will be very useful. We are using > streams on Opensearch side, and we are setting our own document IDs. > Sometimes these IDs are duplicated; we need to ignore this situation and > continue (this way it works for us with Elastisearch). > However, with opensearch connector, the error comes back, saying that the > batch failed (even though most of the documents were indexed, only the ones > with duplicated IDs were rejected), and the whole flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32564) Support cast from BYTES to BIGINT
[ https://issues.apache.org/jira/browse/FLINK-32564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-32564:
--

Assignee: Hanyu Zheng

> Support cast from BYTES to BIGINT
> -
>
> Key: FLINK-32564
> URL: https://issues.apache.org/jira/browse/FLINK-32564
> Project: Flink
> Issue Type: Sub-task
> Reporter: Hanyu Zheng
> Assignee: Hanyu Zheng
> Priority: Major
>
> We are dealing with a task that requires casting from the BYTES type to BIGINT. Specifically, we have a string '00T1p'. Our approach is to convert this string to BYTES and then cast the result to BIGINT with the following SQL query:
> {code:java}
> SELECT CAST((CAST('00T1p' as BYTES)) as BIGINT);{code}
> However, an issue arises when executing this query, likely due to an error in the conversion between BYTES and BIGINT. We aim to identify and rectify this issue so our query can run correctly. The tasks involved are:
> # Investigate and identify the specific reason for the failure of conversion from BYTES to BIGINT.
> # Design and implement a solution to ensure our query can function correctly.
> # Test this solution across all required scenarios to guarantee its functionality.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
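The ticket above does not state which value the cast should produce. One possible reading, assumed here purely for illustration, is to interpret the UTF-8 bytes of the string as an unsigned big-endian integer. The sketch uses only JDK classes and says nothing about how Flink's cast rules behave or will behave; `BytesToBigintSketch` and `bytesToLong` are hypothetical names.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

class BytesToBigintSketch {

    /**
     * Assumed semantics: the bytes form an unsigned big-endian integer.
     * Throws ArithmeticException if the value does not fit into a long.
     */
    static long bytesToLong(byte[] bytes) {
        return new BigInteger(1, bytes).longValueExact();
    }

    public static void main(String[] args) {
        // '00T1p' encodes to the five bytes 0x30 0x30 0x54 0x31 0x70
        byte[] bytes = "00T1p".getBytes(StandardCharsets.UTF_8);
        System.out.println(bytesToLong(bytes)); // 0x3030543170 = 206969254256
    }
}
```

Other readings (little-endian, signed, or parsing the text as a decimal number) would give different results or fail for this input, which is one reason the expected semantics need to be pinned down before implementing the cast.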
[jira] [Assigned] (FLINK-32565) Support cast from BYTES to DOUBLE
[ https://issues.apache.org/jira/browse/FLINK-32565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-32565:
--

Assignee: Hanyu Zheng

> Support cast from BYTES to DOUBLE
> -
>
> Key: FLINK-32565
> URL: https://issues.apache.org/jira/browse/FLINK-32565
> Project: Flink
> Issue Type: Sub-task
> Reporter: Hanyu Zheng
> Assignee: Hanyu Zheng
> Priority: Major
>
> We are undertaking a task that requires casting from the BYTES type to DOUBLE. In particular, we have a string '00T1p'. Our current approach is to convert this string to BYTES and then cast the result to DOUBLE using the following SQL query:
> {code:java}
> SELECT CAST((CAST('00T1p' as BYTES)) as DOUBLE);{code}
> However, we encounter an issue when executing this query, potentially due to an error in the conversion between BYTES and DOUBLE. Our goal is to identify and correct this issue so that our query can execute successfully. The tasks involved are:
> # Investigate and pinpoint the specific reason for the conversion failure from BYTES to DOUBLE.
> # Design and implement a solution that enables our query to function correctly.
> # Test this solution across all required scenarios to ensure its robustness.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint
[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742095#comment-17742095 ]

Jing Ge commented on FLINK-30238:
-

[~ConradJam] would you like to describe the issue you encountered? We'd like to have the most up-to-date status of the issue. Thanks!

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Fabian Paul
> Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
> During stop-with-savepoint the committer only commits the pending committables on notifyCheckpointComplete.
> This has several downsides.
> * Last committableSummary has checkpoint id LONG.MAX and is never cleared from the state leading to that stop-with-savepoint does not work when the pipeline recovers from a savepoint
> * While the committables are committed during stop-with-savepoint they are not forwarded to post-commit topology, potentially losing data and preventing to close open transactions.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32458) support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with other aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-32458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-32458: --- Assignee: Yunhong Zheng > support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with other aggregate > functions > -- > > Key: FLINK-32458 > URL: https://issues.apache.org/jira/browse/FLINK-32458 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Assignee: Yunhong Zheng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever
lincoln lee created FLINK-32578:
---

Summary: Cascaded group by window time columns on a proctime window aggregate may result hang for ever
Key: FLINK-32578
URL: https://issues.apache.org/jira/browse/FLINK-32578
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.18.0, 1.17.2

Currently, grouping by the window time columns of a proctime window aggregate result produces a wrong plan, which may hang forever at runtime.

For such a query:
{code}
insert into s1
SELECT
  window_start,
  window_end,
  sum(cnt),
  count(*)
FROM (
  SELECT
    a,
    b,
    window_start,
    window_end,
    count(*) as cnt,
    sum(d) as sum_d,
    max(d) as max_d
  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
  GROUP BY a, window_start, window_end, b
)
GROUP BY a, window_start, window_end
{code}

the inner proctime window works fine, but the outer one doesn't work because the wrong plan translates to an unexpected event-time window operator:
{code}
Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[a]])
         +- Calc(select=[a, window_start, window_end, cnt])
            +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS cnt, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[a, b]])
                  +- Calc(select=[a, b, d, PROCTIME() AS proctime])
                     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
{code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32501) Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
[ https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln lee updated FLINK-32501:

Fix Version/s: (was: 1.17.2)

> Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
>
> Key: FLINK-32501
> URL: https://issues.apache.org/jira/browse/FLINK-32501
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.2, 1.17.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Currently, a window aggregation that refers to a windowing TVF with a filter condition may get a wrong plan, which may hang forever at runtime (the window aggregate operator never emits output).
> For such a case:
> {code}
> insert into sink
> select
>   window_start,
>   window_end,
>   b,
>   COALESCE(sum(case
>     when a = 11
>     then 1
>   end), 0) c
> from
>   TABLE(
>     TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
>   )
> where
>   a in (1, 5, 7, 9, 11)
> GROUP BY
>   window_start, window_end, b
> {code}
> the planner generates a wrong plan that does not combine the proctime WindowTableFunction into the WindowAggregate (so when translating to the execution plan, the WindowAggregate wrongly recognizes the window as an event-time window, and the WindowAggregateOperator then neither receives watermarks nor sets up timers to fire any windows at runtime):
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
>                +- Calc(select=[a, b, PROCTIME() AS proctime])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>    +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
>       +- Exchange(distribution=[hash[b]])
>          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
>             +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
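The hang described in the issue above comes down to which clock is allowed to fire a window. The following is a deliberately simplified model, not Flink's operator code: it assumes that an event-time window fires once the watermark reaches the window's last timestamp (end minus one millisecond), that a processing-time window fires on the wall clock, and that an operator which never receives a watermark stays at `Long.MIN_VALUE`. All names are hypothetical.

```java
class WindowFiringSketch {

    /** Assumed state of an operator that has never received a watermark. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Event-time rule: the watermark must reach the window's last timestamp. */
    static boolean eventTimeWindowFires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    /** Processing-time rule: the wall clock must reach the window's last timestamp. */
    static boolean procTimeWindowFires(long windowEndMs, long wallClockMs) {
        return wallClockMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // a 10 s tumbling window starting at 0

        // Proctime input carries no watermarks, so an event-time window never fires.
        System.out.println(eventTimeWindowFires(windowEnd, NO_WATERMARK));

        // The same window treated as processing time fires once the clock passes it.
        System.out.println(procTimeWindowFires(windowEnd, 10_000L));
    }
}
```

Under these assumptions, a proctime window that is mistakenly planned as an event-time window waits for a watermark that never arrives, which matches the "never outputs" symptom in the report.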
[GitHub] [flink] flinkbot commented on pull request #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize
flinkbot commented on PR #22981: URL: https://github.com/apache/flink/pull/22981#issuecomment-1630953554 ## CI report: * 4a09b912e7aaa9acdc874b6232551c9b63fa188f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28743: --- Labels: pull-request-available (was: ) > Support validating the determinism for StreamPhysicalMatchRecognize > --- > > Key: FLINK-28743 > URL: https://issues.apache.org/jira/browse/FLINK-28743 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > MatchRecognize has complex expressions and is not commonly used in > traditional SQLs, so mark this as a minor issue (for 1.16) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lincoln-lil opened a new pull request, #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize
lincoln-lil opened a new pull request, #22981:
URL: https://github.com/apache/flink/pull/22981

## What is the purpose of the change

This is a minor addition to the FLINK-27849 implementation to support the analysis of the `MatchRecognize` node. Since `MatchRecognize` will not support updating inputs in the foreseeable future, we keep the analysis logic relatively simple here.

## Brief change log

Add the analysis logic for the `MatchRecognize` node in `StreamNonDeterministicUpdatePlanVisitor`.

## Verifying this change

Added test cases to `NonDeterministicDagTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-28743: --- Assignee: lincoln lee > Support validating the determinism for StreamPhysicalMatchRecognize > --- > > Key: FLINK-28743 > URL: https://issues.apache.org/jira/browse/FLINK-28743 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Minor > Fix For: 1.18.0 > > > MatchRecognize has complex expressions and is not commonly used in > traditional SQLs, so mark this as a minor issue (for 1.16) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp commented on a diff in pull request #22932: [FLINK-26549][table][tests] Add tests for INSERT INTO with VALUES leads to wrong type inference with nested types
XComp commented on code in PR #22932: URL: https://github.com/apache/flink/pull/22932#discussion_r1259561126

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.java:
## @@ -166,6 +172,48 @@ void testCanonizeType() {
                 .isNotEqualTo(typeFactory.builder().add("f0", genericRelType3).build());
     }

+    static Stream testLeastRestrictive() {
+        return Stream.of(

Review Comment: Could you add a comment to the code on why these different cases were selected for the test?

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.java:
## @@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/** Plan test for INSERT INTO. */
+public class InsertIntoValuesTest extends TableTestBase {
+    private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+    @Test
+    public void testTypeInferenceWithNestedTypes() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t1 ("
+                                + " `mapWithStrings` MAP,"
+                                + " `mapWithBytes` MAP"
+                                + ") WITH ("
+                                + " 'connector' = 'values'"
+                                + ")");
+
+        util.verifyExecPlanInsert(
+                "INSERT INTO t1 VALUES "
+                        + "(MAP['a', '123', 'b', '123456'], MAP['k1', X'C0FFEE', 'k2', X'BABE']), "
+                        + "(CAST(NULL AS MAP), CAST(NULL AS MAP)), "
+                        + "(MAP['a', '1', 'b', '1'], MAP['k1', X'10', 'k2', X'20'])");

Review Comment: Here it might be useful to add a comment that Calcite 1.26 relied on the last value to derive the type. That way, readers of the code would understand why you decided on this specific test case.
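To make the review remark about the choice of test rows concrete, here is a minimal sketch of the two derivation strategies. It models only string lengths, and it assumes that the problem with deriving the type from the last row is that the resulting type can be too narrow for earlier rows. The class and method names are hypothetical and are not part of Flink or Calcite.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: deriving a column's string length from VALUES rows. */
final class ValuesTypeDerivationSketch {

    /** Strategy attributed to Calcite 1.26 in the review: only the last row decides. */
    static int lengthFromLastRow(List<String> columnValues) {
        String last = columnValues.get(columnValues.size() - 1);
        return last == null ? 0 : last.length();
    }

    /** Least-restrictive strategy: the widest non-null value across all rows decides. */
    static int leastRestrictiveLength(List<String> columnValues) {
        int max = 0;
        for (String value : columnValues) {
            if (value != null) {
                max = Math.max(max, value.length());
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the test rows: a wide value, a NULL row, a narrow last row.
        List<String> column = Arrays.asList("123456", null, "1");
        System.out.println("last row decides: " + lengthFromLastRow(column));
        System.out.println("least restrictive: " + leastRestrictiveLength(column));
    }
}
```

With a narrow value in the last row, the first strategy yields a length that cannot hold the earlier `'123456'`, which is presumably why the test places the shortest literals last.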
[GitHub] [flink-connector-kafka] tzulitai commented on pull request #39: [FLINK-32453] Make main and v3.0 branch build against Flink 1.18-SNAPSHOT
tzulitai commented on PR #39: URL: https://github.com/apache/flink-connector-kafka/pull/39#issuecomment-1630892082 @XComp there isn't any Kafka-connector-specific release documentation, as far as I'm aware. So far, the generic doc was good enough, at least for the first `v3.0.0` release of the connector. Adding steps for generating the snapshot test files to the README sounds good for now.
[jira] [Commented] (FLINK-32560) Properly deprecate all Scala APIs
[ https://issues.apache.org/jira/browse/FLINK-32560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742015#comment-17742015 ] Ryan Skraba commented on FLINK-32560: - OK -- taking a quick look at the current work, I'll take a look at all modules outside of table-planner and add the following annotation and comment *every single time*:

{code}
/**
 * @deprecated
 *   All Flink Scala APIs are deprecated and will be removed in a future Flink version. You
 *   can still build your application in Scala, but you should move to the Java version of either
 *   the DataStream and/or Table API.
 * @see
 *   <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">
 *   FLIP-265 Deprecate and remove Scala API support</a>
 */
@Deprecated
{code}

There's also some {{@Public}} Scala code in flink-hadoop-compatibility, and even if {{table-planner}} isn't in scope yet, the {{table-table-api-scala}} and {{table-table-api-scala-bridge}} are likely also candidates for clean-up. > Properly deprecate all Scala APIs > - > > Key: FLINK-32560 > URL: https://issues.apache.org/jira/browse/FLINK-32560 > Project: Flink > Issue Type: Sub-task > Components: API / Scala >Reporter: Xintong Song >Assignee: Ryan Skraba >Priority: Blocker > Fix For: 1.18.0 > > > We agreed to drop Scala API support in FLIP-265 [1], and have tried to > deprecate them in FLINK-29740. Also, both user documentation and roadmap[2] > shows that scala API supports are deprecated. However, none of the APIs in > `flink-streaming-scala` are annotated with `@Deprecated` atm, and only > `ExecutionEnvironment` and `package` are marked `@Deprecated` in > `flink-scala`. > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support > [2] https://flink.apache.org/roadmap/
[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741999#comment-17741999 ] Tzu-Li (Gordon) Tai commented on FLINK-32455: - [~renqs] I'm merging my hotfix PR now, which should unblock things for the time being. The PR has already been reviewed and approved by [~gaoyunhaii]. > Breaking change in TypeSerializerUpgradeTestBase prevents > flink-connector-kafka from building against 1.18-SNAPSHOT > --- > > Key: FLINK-32455 > URL: https://issues.apache.org/jira/browse/FLINK-32455 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Test Infrastructure >Affects Versions: 1.18.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.18.0 > > > FLINK-27518 introduced a breaking signature change to the abstract class > {{TypeSerializerUpgradeTestBase}}, specifically the abstract > {{createTestSpecifications}} method signature was changed. This breaks > downstream test code in externalized connector repos, e.g. > flink-connector-kafka's {{KafkaSerializerUpgradeTest}} > Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by > downstream test code that depends on flink-core test-jar.
[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741997#comment-17741997 ] Tzu-Li (Gordon) Tai commented on FLINK-32455: - [~gaoyunhaii] that plan sounds good to me. Are you planning to do steps 1 and 2 already for 1.18? If yes, I can probably revert the "quick" fix PR I did in the Flink Kafka connector repo. > Breaking change in TypeSerializerUpgradeTestBase prevents > flink-connector-kafka from building against 1.18-SNAPSHOT > --- > > Key: FLINK-32455 > URL: https://issues.apache.org/jira/browse/FLINK-32455 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Test Infrastructure >Affects Versions: 1.18.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.18.0 > > > FLINK-27518 introduced a breaking signature change to the abstract class > {{TypeSerializerUpgradeTestBase}}, specifically the abstract > {{createTestSpecifications}} method signature was changed. This breaks > downstream test code in externalized connector repos, e.g. > flink-connector-kafka's {{KafkaSerializerUpgradeTest}} > Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by > downstream test code that depends on flink-core test-jar.
[GitHub] [flink-connector-kafka] tzulitai commented on pull request #39: [FLINK-32453] Make main and v3.0 branch build against Flink 1.18-SNAPSHOT
tzulitai commented on PR #39: URL: https://github.com/apache/flink-connector-kafka/pull/39#issuecomment-1630788865

re: @XComp

> Would it make sense to create a new testbase class for connectors in flink-connector-common that implements the test data generation and mark it as @PublicEvolving? To make it more obvious that it's used in the external repositories?

I was thinking along the same lines, although it should just be a public-facing test utility that Flink distributes. I can imagine any user implementing their own `TypeSerializer` wanting to write migration tests for it using the base class.

> Additionally, where is the release documentation kept for the Kafka connector.

There's this: https://cwiki.apache.org/confluence/display/FLINK/Creating+a+flink-connector+release
[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741989#comment-17741989 ] Ahmed Hamdy commented on FLINK-27756: - [~Sergey Nuyanzin] thanks for reopening; I will try to take a look at it this week. > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.15.0, 1.17.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]
[GitHub] [flink-connector-kafka] tzulitai commented on pull request #7: [FLINK-31408] Add support for EOS delivery-guarantee in upsert-kafka
tzulitai commented on PR #7: URL: https://github.com/apache/flink-connector-kafka/pull/7#issuecomment-1630767966 @Ge as discussed offline, I think there really isn't a better way around the current PR approach. I'll address the comments on the doc and proceed to merge this. Thank you for your contribution!
[jira] [Commented] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize
[ https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741983#comment-17741983 ] lincoln lee commented on FLINK-28743: - [~twalthr] Sorry for the late reply. Yes, I also confirmed with [~dianfu] offline that the MatchRecognize won't support updating inputs in the foreseeable future either, so the analytic support for this operator can be relatively simple, and I'm working on a pr that should be ready in time for the 1.18 release. > Support validating the determinism for StreamPhysicalMatchRecognize > --- > > Key: FLINK-28743 > URL: https://issues.apache.org/jira/browse/FLINK-28743 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Priority: Minor > Fix For: 1.18.0 > > > MatchRecognize has complex expressions and is not commonly used in > traditional SQLs, so mark this as a minor issue (for 1.16) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32528) The RexCall a = a,if a's datatype is nullable, and when a is null, a = a is null, it isn't true in BinaryComparisonExprReducer
[ https://issues.apache.org/jira/browse/FLINK-32528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741974#comment-17741974 ] Yunhong Zheng commented on FLINK-32528: --- Hi, [~shenlang] , can you provide a specific sql query as example? I don't quite understand this 'if a's datatype is nullable, and when a is null, a = a is null, a <> a is null'. Thank you! > The RexCall a = a,if a's datatype is nullable, and when a is null, a = a is > null, it isn't true in BinaryComparisonExprReducer > -- > > Key: FLINK-32528 > URL: https://issues.apache.org/jira/browse/FLINK-32528 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: LakeShen >Priority: Major > > Now I'am reading flink sql planner's source code,when I saw the > FlinkRexUtil.java, in the > org.apache.flink.table.planner.plan.utils.FlinkRexUtil#simplify method,it > used the BinaryComparisonExprReducer the deal with BINARY_COMPARISON's > operator which the operands are RexInputRef and the operands are same, e.g. a > = a, a <> a,a >=a... > In BinaryComparisonExprReducer, a = a,a <=a,a>=a will be simplified the true > literal,a <> a,a < a, a > a will be simplified the false literal. > if a's datatype is nullable, and when a is null, a = a is null, a <> a is > null. In BinaryComparisonExprReducer's logic,It does not consider the case of > the nullable data type. -- This message was sent by Atlassian Jira (v8.20.10#820010)
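The nullability concern in the report can be illustrated with a small sketch of SQL three-valued logic, in which a Java `null` stands for SQL NULL/UNKNOWN. This is not Flink's `BinaryComparisonExprReducer`; the class and method names are hypothetical and only model the behavior described in the issue text.

```java
/** Hypothetical sketch of SQL three-valued logic; a Java null stands for SQL NULL/UNKNOWN. */
final class NullableComparisonSketch {

    /** SQL-style equality: the result is UNKNOWN (null) if either operand is NULL. */
    static Boolean sqlEquals(Integer left, Integer right) {
        if (left == null || right == null) {
            return null;
        }
        return left.equals(right);
    }

    /** Reduction as described in the report: `a = a` always becomes the literal TRUE. */
    static Boolean reduceIgnoringNulls(Integer a) {
        return Boolean.TRUE;
    }

    /** Null-aware reduction: TRUE for a non-null operand, UNKNOWN for a NULL operand. */
    static Boolean reduceNullAware(Integer a) {
        return a == null ? null : Boolean.TRUE;
    }

    public static void main(String[] args) {
        for (Integer a : new Integer[] {1, null}) {
            System.out.println(
                    "a=" + a
                            + " sqlEquals=" + sqlEquals(a, a)
                            + " ignoringNulls=" + reduceIgnoringNulls(a)
                            + " nullAware=" + reduceNullAware(a));
        }
    }
}
```

In a `WHERE` clause, UNKNOWN filters a row out just like FALSE, so a query such as `SELECT * FROM t WHERE a = a` would be a natural candidate for the concrete example requested above: it drops rows where `a` is NULL, whereas a filter reduced to the literal TRUE keeps them.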
[jira] [Commented] (FLINK-32560) Properly deprecate all Scala APIs
[ https://issues.apache.org/jira/browse/FLINK-32560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741973#comment-17741973 ] Xintong Song commented on FLINK-32560: -- The help is very much welcomed. Thanks [~rskraba] and [~Sergey Nuyanzin]. > Properly deprecate all Scala APIs > - > > Key: FLINK-32560 > URL: https://issues.apache.org/jira/browse/FLINK-32560 > Project: Flink > Issue Type: Sub-task > Components: API / Scala >Reporter: Xintong Song >Assignee: Ryan Skraba >Priority: Blocker > Fix For: 1.18.0 > > > We agreed to drop Scala API support in FLIP-265 [1], and have tried to > deprecate them in FLINK-29740. Also, both user documentation and roadmap[2] > shows that scala API supports are deprecated. However, none of the APIs in > `flink-streaming-scala` are annotated with `@Deprecated` atm, and only > `ExecutionEnvironment` and `package` are marked `@Deprecated` in > `flink-scala`. > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support > [2] https://flink.apache.org/roadmap/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zentol commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
zentol commented on code in PR #22850: URL: https://github.com/apache/flink/pull/22850#discussion_r1259659718 ## flink-connectors/flink-connector-datagen-test/pom.xml: ## @@ -0,0 +1,89 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.18-SNAPSHOT + + + flink-connector-datagen-tests Review Comment: > Would you like to see this refactoring integrated in order to minimize the dependencies surface regardless of that? If so, I'll open a separate ticket and a PR. Should be a separate ticket :+1: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
afedulov commented on code in PR #22850: URL: https://github.com/apache/flink/pull/22850#discussion_r1259655490 ## flink-connectors/flink-connector-datagen-test/pom.xml: ## @@ -0,0 +1,89 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.18-SNAPSHOT + + + flink-connector-datagen-tests Review Comment: Small question: I already prepared the rework of `flink-architecture-tests` towards using the fully-qualified names instead of these somewhat redundant dependencies for class objects. If we go for option (3) we do not immediately need it here. Would you like to see this refactoring integrated in order to minimize the dependencies surface regardless of that? If so, I'll open a separate ticket and a PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse
WenDing-Y commented on PR #49: URL: https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1630724942

> > One question that comes to my mind now.. This will only work with a 1 server no? if we have a Clickhouse cluster with more than 1 server (that will have a zookeeper envolved) will not work no?
> > if this is true, I think we should have a note on documentation
>
> I tried it, and it works normally in cluster mode. My steps are as follows:
>
> 1. Create the ClickHouse tables:
>
> ```
> create table if not exists default.ck_flink_test_local on cluster clicks_cluster
> (
>     user_id Int32,
>     message String
> )
> engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', '{replica}')
> PRIMARY KEY (user_id);
>
> create table if not exists default.ck_flink_test on cluster clicks_cluster
> (
>     user_id Int32,
>     message String
> )
> engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', user_id);
> ```
>
> 2. In the Flink SQL client, create the table:
>
> ```
> CREATE TABLE ck_flink_test (
>     user_id INTEGER,
>     message STRING
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:clickhouse://xx:xx/default',
>     'table-name' = 'ck_flink_test',
>     'username' = 'xx',
>     'password' = 'xx'
> );
> ```
>
> 3. Execute the insert:
>
> ```
> INSERT INTO ck_flink_test (user_id, message) VALUES
> (101, 'Hello');
> ```
>
> 4. The data can be viewed in the ClickHouse client:
>
> ```
> select * from ck_flink_test;
> ```
>
> 5. The Flink SQL client:
>
> ![image](https://user-images.githubusercontent.com/19184146/252660687-89eca20f-eabf-4090-9460-c46716653afc.png)

@eskabetxe I guess you might have defined the table incorrectly.
[GitHub] [flink-connector-jdbc] WenDing-Y closed pull request #49: [FLINK-32068] connector jdbc support clickhouse
WenDing-Y closed pull request #49: [FLINK-32068] connector jdbc support clickhouse URL: https://github.com/apache/flink-connector-jdbc/pull/49
[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse
WenDing-Y commented on PR #49: URL: https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1630721467

> One question that comes to my mind now.. This will only work with a 1 server no? if we have a Clickhouse cluster with more than 1 server (that will have a zookeeper envolved) will not work no?
>
> if this is true, I think we should have a note on documentation

I tried it, and it works normally in cluster mode. My steps are as follows:

1. Create the ClickHouse tables:

```
create table if not exists default.ck_flink_test_local on cluster clicks_cluster
(
    user_id Int32,
    message String
)
engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', '{replica}')
PRIMARY KEY (user_id);

create table if not exists default.ck_flink_test on cluster clicks_cluster
(
    user_id Int32,
    message String
)
engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', user_id);
```

2. In the Flink SQL client, create the table:

```
CREATE TABLE ck_flink_test (
    user_id INTEGER,
    message STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://xx:xx/default',
    'table-name' = 'ck_flink_test',
    'username' = 'xx',
    'password' = 'xx'
);
```

3. Execute the insert:

```
INSERT INTO ck_flink_test (user_id, message) VALUES
(101, 'Hello');
```

4. The data can be viewed in the ClickHouse client:

```
select * from ck_flink_test;
```

5. The Flink SQL client:

![image](https://github.com/apache/flink-connector-jdbc/assets/19184146/89eca20f-eabf-4090-9460-c46716653afc)
[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
afedulov commented on code in PR #22850: URL: https://github.com/apache/flink/pull/22850#discussion_r1259648022 ## flink-connectors/flink-connector-datagen-test/pom.xml: ## @@ -0,0 +1,89 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.18-SNAPSHOT + + + flink-connector-datagen-tests Review Comment: Perfect, thanks for your input. The "not great, not terrible (c)" option it is then, I also had a slight preference towards it. I'll add comments to the pom to make it clear why this module exists.
[GitHub] [flink] huwh closed pull request #22859: [FLINK-32385][runtime] Introduce SerializedShuffleDescriptorAndIndices to identify a group of ShuffleDescriptorAndIndex
huwh closed pull request #22859: [FLINK-32385][runtime] Introduce SerializedShuffleDescriptorAndIndices to identify a group of ShuffleDescriptorAndIndex URL: https://github.com/apache/flink/pull/22859
[GitHub] [flink] huwh closed pull request #22860: [FLINK-32386][runtime] Introduce ShuffleDescriptorsCache to cache ShuffleDescriptorAndIndex
huwh closed pull request #22860: [FLINK-32386][runtime] Introduce ShuffleDescriptorsCache to cache ShuffleDescriptorAndIndex URL: https://github.com/apache/flink/pull/22860
[GitHub] [flink] xuzifu666 closed pull request #22021: [Hotfix] typo hotfix in TestingDispatcher
xuzifu666 closed pull request #22021: [Hotfix] typo hotfix in TestingDispatcher URL: https://github.com/apache/flink/pull/22021
[GitHub] [flink] xuzifu666 commented on pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend
xuzifu666 commented on PR #22980: URL: https://github.com/apache/flink/pull/22980#issuecomment-1630694842

> Thanks for the improvement, could you squash all commits and rebase this to latest master branch.

done
[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module
[ https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yunhong Zheng updated FLINK-32577:
----------------------------------
    Description: This issue is a sub-issue of FLINK-18356.

(was: This issue is a sub-issue of FLINK-18356.

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, I found that the JVM's heap and non-heap memory were stable and within the normal range. However, the total memory usage ({*}RES{*}) of the fork processes was very high, as shown in the following figure (PIDs 2958793 and 2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I looked deeper into the memory mappings of these two processes:

{code:bash}
pmap -p 2958793 {code}

I found a large number of memory fragments (more than 200), each close to *64MB* in size:

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this is likely the classic memory fragmentation problem caused by *glibc* memory management under *JDK 8*. So we downloaded *libjemalloc* and added the environment variable:

{code:bash}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}

After that, the overall memory of the fork processes became stable and met expectations (5GB):

!image-2023-07-11-19-41-18-626.png|width=488,height=208! !image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution requires modifying the CI execution [Docker image|https://github.com/flink-ci/flink-ci-docker] to replace the *glibc* allocator with *libjemalloc*, as in FLINK-19125, cc [~chesnay]:

{code}
RUN apt-get -y install libjemalloc-dev
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}

I have opened a new Jira (FLINK-32577) to track and fix this issue. cc [~mapohl] [~jark].
)

> Avoid memory fragmentation when running CI for flink-table-planner module
> -------------------------------------------------------------------------
>
> Key: FLINK-32577
> URL: https://issues.apache.org/jira/browse/FLINK-32577
> Project: Flink
> Issue Type: Improvement
> Components: Build System / CI, Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Yunhong Zheng
> Priority: Major
> Fix For: 1.18.0
>
> This issue is a sub-issue of FLINK-18356.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xintongsong commented on a diff in pull request #22975: [FLINK-31643][network] Introduce the configurations for tiered storage
xintongsong commented on code in PR #22975: URL: https://github.com/apache/flink/pull/22975#discussion_r1259626675

## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java: ##
@@ -49,7 +55,9 @@ public class NettyShuffleMaster implements ShuffleMaster
     private final int networkBufferSize;

-    // TODO, WIP: create tiered internal shuffle master after enabling the tiered storage.
+    @Nullable private TieredInternalShuffleMaster tieredInternalShuffleMaster;
+
+    @Nullable private TieredStorageConfiguration tieredStorageConfiguration;

Review Comment: Should be final.

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java: ##
@@ -40,7 +41,9 @@ public class TieredInternalShuffleMaster {
     public TieredInternalShuffleMaster(Configuration conf) {
         TieredStorageConfiguration tieredStorageConfiguration =
-                TieredStorageConfiguration.fromConfiguration(conf);
+                TieredStorageConfiguration.builder(
+                                conf.getString(NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH))
+                        .build();

Review Comment: This change seems to belong to the first commit. Otherwise, there will be a compile error.

## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ##
@@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions {
                             + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                             + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");

+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    @Internal
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The option is used to enable tiered storage architecture for hybrid shuffle mode.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    @Internal
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The base home path of remote storage for remote tier. If the option is configured, "
+                                    + "Hybrid Shuffle will use the remote storage path as a supplement to the"
+                                    + "local disks. If not configured, the remote storage will not be used.");

Review Comment:
1. This is not internal.
2. JavaDoc is missing.
3. Should we expose the concept of a `remote tier` to users at all? What is the minimum that a user needs to understand?

## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ##
@@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions {
                             + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                             + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");

+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    @Internal
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The option is used to enable tiered storage architecture for hybrid shuffle mode.");

Review Comment:
1. This is not internal.
2. JavaDoc is missing.
3. What is "tiered storage architecture" from the user's perspective? How is it different from the original architecture? What does the user need to know about this?
4. Why is the default value `false`? How can a user decide which mode to use? Would anyone enable it at all?

## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ##
@@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions {
                             + "tasks have occupied all the buffers and the downstream tasks are waiting for the
[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module
[ https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yunhong Zheng updated FLINK-32577:
----------------------------------
    Description: This issue is a sub-issue of FLINK-18356.

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, I found that the JVM's heap and non-heap memory were stable and within the normal range. However, the total memory usage ({*}RES{*}) of the fork processes was very high, as shown in the following figure (PIDs 2958793 and 2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I looked deeper into the memory mappings of these two processes:

{code:bash}
pmap -p 2958793 {code}

I found a large number of memory fragments (more than 200), each close to *64MB* in size:

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this is likely the classic memory fragmentation problem caused by *glibc* memory management under *JDK 8*. So we downloaded *libjemalloc* and added the environment variable:

{code:bash}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}

After that, the overall memory of the fork processes became stable and met expectations (5GB):

!image-2023-07-11-19-41-18-626.png|width=488,height=208! !image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution requires modifying the CI execution [Docker image|https://github.com/flink-ci/flink-ci-docker] to replace the *glibc* allocator with *libjemalloc*, as in FLINK-19125, cc [~chesnay]:

{code}
RUN apt-get -y install libjemalloc-dev
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}

I have opened a new Jira (FLINK-32577) to track and fix this issue. cc [~mapohl] [~jark].

> Avoid memory fragmentation when running CI for flink-table-planner module
> -------------------------------------------------------------------------
>
> Key: FLINK-32577
> URL: https://issues.apache.org/jira/browse/FLINK-32577
> Project: Flink
> Issue Type: Improvement
> Components: Build System / CI, Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Yunhong Zheng
> Priority: Major
> Fix For: 1.18.0
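The pmap inspection described in this issue can be sketched in code. The following is a minimal, hypothetical Java helper (not part of Flink or its CI scripts) that counts mappings whose size is close to 64 MB in saved pmap output. It assumes the default procps pmap layout, where each mapping row reads "address sizeK mode mapping"; the class name, method names, and the 4 MB tolerance are assumptions made for illustration, since the issue does not define what "close to 64MB" means.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical helper, not part of Flink: counts ~64 MB mappings in pmap output. */
final class PmapArenaCounter {

    /** 64 MB expressed in KB, the fragment size reported in the issue. */
    static final long TARGET_KB = 64L * 1024;

    /** Assumed tolerance for "close to 64MB"; the issue does not specify one. */
    static final long TOLERANCE_KB = 4L * 1024;

    /** Returns the mapping size in KB, or -1 if the line is not a mapping row. */
    static long sizeInKb(String line) {
        String[] fields = line.trim().split("\\s+");
        // Mapping rows have at least address, size and mode; the header and the
        // trailing "total" row do not match this shape and are skipped.
        if (fields.length < 3 || !fields[1].endsWith("K")) {
            return -1;
        }
        try {
            return Long.parseLong(fields[1].substring(0, fields[1].length() - 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Counts the mapping rows whose size is within the tolerance of 64 MB. */
    static long countNearTarget(List<String> lines) {
        return lines.stream()
                .mapToLong(PmapArenaCounter::sizeInKb)
                .filter(kb -> kb >= 0 && Math.abs(kb - TARGET_KB) <= TOLERANCE_KB)
                .count();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: PmapArenaCounter <file with saved pmap output>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("mappings close to 64MB: " + countNearTarget(lines));
    }
}
```

One way to use it, assuming Java 11 or later for single-file launch, is to save the output with `pmap -p 2958793 > pmap.txt` and run `java PmapArenaCounter.java pmap.txt`. Comparing the count before and after setting LD_PRELOAD would give a rough check of whether the allocator change had the intended effect.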
[jira] [Comment Edited] (FLINK-18356) flink-table-planner Exit code 137 returned from process
[ https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741961#comment-17741961 ]

Yunhong Zheng edited comment on FLINK-18356 at 7/11/23 11:55 AM:
-----------------------------------------------------------------

Hi, all. I think I found the root cause of the table-planner exit code 137 error, under the guidance of [~lincoln.86xy]. This error is similar to FLINK-19125: both are caused by the way {*}glibc{*} manages memory fragmentation, as it does not return memory to the kernel gracefully (refer to [glibc bugzilla|https://sourceware.org/bugzilla/show_bug.cgi?id=15321] and [glibc manual|https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc]).

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, I found that the JVM's heap and non-heap memory were stable and within the normal range. However, the total memory usage ({*}RES{*}) of the fork processes was very high, as shown in the following figure (PIDs 2958793 and 2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I looked deeper into the memory mappings of these two processes:

{code:bash}
pmap -p 2958793 {code}

I found a large number of memory fragments (more than 200), each close to *64MB* in size:

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this is likely the classic memory fragmentation problem caused by *glibc* memory management under *JDK 8*. So we downloaded *libjemalloc* and added the environment variable:

{code:bash}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}

After that, the overall memory of the fork processes became stable and met expectations (5GB):

!image-2023-07-11-19-41-18-626.png|width=488,height=208! !image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution requires modifying the CI execution [Docker image|https://github.com/flink-ci/flink-ci-docker] to replace the *glibc* allocator with *libjemalloc*, as in FLINK-19125, cc [~chesnay].

{code}
RUN apt-get -y install libjemalloc-dev
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}

I have opened a new Jira (FLINK-32577) to track and fix this issue. cc [~mapohl] [~jark].

> flink-table-planner Exit code 137 returned from process
> -------------------------------------------------------
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
> Issue Type: Bug
> Components: Build System / Azure Pipelines, Tests
> Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
> Reporter: