[GitHub] [flink] huwh commented on pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times

2023-07-11 Thread via GitHub


huwh commented on PR #22861:
URL: https://github.com/apache/flink/pull/22861#issuecomment-1631882530

   @wanglijie95, thanks for the review. I made the changes as you suggested. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint

2023-07-11 Thread via GitHub


masteryhx commented on code in PR #22744:
URL: https://github.com/apache/flink/pull/22744#discussion_r1260611912


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -375,6 +378,49 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
         @Nonnull CheckpointStreamFactory streamFactory,
         @Nonnull CheckpointOptions checkpointOptions)
         throws Exception {
+
+    if (checkpointOptions.getCheckpointType().isSavepoint()) {
+        SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+                checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+        if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) {
+            // For NO_SHARING native savepoint, trigger delegated one
+            RunnableFuture<SnapshotResult<KeyedStateHandle>> delegatedSnapshotResult =
+                    keyedStateBackend.snapshot(
+                            checkpointId, timestamp, streamFactory, checkpointOptions);

Review Comment:
   Thanks for the reply.
   Maybe I missed something.
   Currently, `materializationId` is a dummy one while triggering a native savepoint.
   Do you mean that we should use the current real `materializationId`?






[GitHub] [flink] masteryhx commented on a diff in pull request #22801: [FLINK-23484][benchmark] Supports base benchmark for ChangelogStateBackend

2023-07-11 Thread via GitHub


masteryhx commented on code in PR #22801:
URL: https://github.com/apache/flink/pull/22801#discussion_r1260563718


##
flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/RescalingBenchmark.java:
##
@@ -16,7 +16,7 @@
  * limitations under the License
  */
 
-package org.apache.flink.contrib.streaming.state.benchmark;
+package org.apache.flink.state.benchmark;

Review Comment:
   Sure. PR Link: https://github.com/apache/flink-benchmarks/pull/75






[GitHub] [flink-benchmarks] masteryhx commented on a diff in pull request #75: [FLINK-23484] Supports to test ChangelogStateBackend

2023-07-11 Thread via GitHub


masteryhx commented on code in PR #75:
URL: https://github.com/apache/flink-benchmarks/pull/75#discussion_r1260562329


##
src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java:
##
@@ -57,7 +56,7 @@ public class StateBenchmarkBase extends BenchmarkBase {
 static AtomicInteger keyIndex;
 final ThreadLocalRandom random = ThreadLocalRandom.current();
 
-@Param({"HEAP", "ROCKSDB"})
+@Param({"HEAP", "ROCKSDB", "HEAP_CHANGELOG", "ROCKSDB_CHANGELOG"})

Review Comment:
   Thanks for the advice.
   I agree that we should focus on testing the production environment.
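   For context, a rough sketch of how such a benchmark parameter could be mapped to the state
   backend under test (illustrative only; the helper below and its wiring are assumptions, not
   the actual flink-benchmarks code):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

/** Hypothetical helper: interpret the @Param value of the benchmark. */
final class BenchmarkBackendParams {

    /** The delegated backend that actually holds the state. */
    static StateBackend createDelegatedBackend(String param) {
        switch (param) {
            case "HEAP":
            case "HEAP_CHANGELOG":
                return new HashMapStateBackend();
            case "ROCKSDB":
            case "ROCKSDB_CHANGELOG":
                return new EmbeddedRocksDBStateBackend();
            default:
                throw new IllegalArgumentException("Unknown backend: " + param);
        }
    }

    /**
     * The *_CHANGELOG variants additionally enable state changelog (FLIP-158), i.e. the
     * delegated backend is wrapped by ChangelogStateBackend when the job is set up.
     */
    static boolean changelogEnabled(String param) {
        return param.endsWith("_CHANGELOG");
    }
}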






[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore

2023-07-11 Thread via GitHub


FangYongs commented on code in PR #22937:
URL: https://github.com/apache/flink/pull/22937#discussion_r1260545206


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Map;
+
+/**
+ * A factory to create configured catalog store instances based on string-based properties. See
+ * also {@link Factory} for more information.
+ *
+ * This factory is specifically designed for the Flink SQL gateway scenario, where different
+ * catalog stores need to be created for different sessions.
+ *
+ * If the CatalogStore is implemented using JDBC, this factory can be used to create a JDBC
+ * connection pool in the open method. This connection pool can then be reused for subsequent
+ * catalog store creations.
+ *
+ * The following is an example implementation of CatalogStoreFactory using JDBC.
+ *
+ * {@code
+ * public class JdbcCatalogStore implements CatalogStore {
+ *
+ * private JdbcConnectionPool jdbcConnectionPool;
+ * public JdbcCatalogStore(JdbcConnectionPool jdbcConnectionPool) {
+ * this.jdbcConnectionPool = jdbcConnectionPool;
+ * }
+ * ...
+ * }
+ *
+ * public class JdbcCatalogStoreFactory implements CatalogStoreFactory {
+ *
+ * private JdbcConnectionPool jdbcConnectionPool;
+ *
+ * @Override
+ * public CatalogStore createCatalogStore(Context context) {
+ * return new JdbcCatalogStore(jdbcConnectionPool);
+ * }
+ *
+ * @Override
+ * public void open(Context context) throws CatalogException {
+ * // initialize the thread pool using options from context
+ * jdbcConnectionPool = initializeJdbcConnectionPool(context);
+ * }
+ * ...

Review Comment:
   Add `close` for JdbcCatalogStoreFactory? I think `close` is especially important for a JDBC catalog store factory.
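   For illustration, `close` could release the shared pool created in `open`. A minimal sketch
   based on the javadoc example above (`JdbcConnectionPool` and `initializeJdbcConnectionPool`
   are the placeholder names from that example, and a `close()` method on the factory interface
   is assumed here):

public class JdbcCatalogStoreFactory implements CatalogStoreFactory {

    private JdbcConnectionPool jdbcConnectionPool;

    @Override
    public void open(Context context) throws CatalogException {
        // Initialize the pool once; it is shared by every catalog store created later.
        jdbcConnectionPool = initializeJdbcConnectionPool(context);
    }

    @Override
    public CatalogStore createCatalogStore(Context context) {
        return new JdbcCatalogStore(jdbcConnectionPool);
    }

    @Override
    public void close() throws CatalogException {
        // Release the pool so open JDBC connections are not leaked when the session ends.
        if (jdbcConnectionPool != null) {
            jdbcConnectionPool.close();
            jdbcConnectionPool = null;
        }
    }
}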






[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore

2023-07-11 Thread via GitHub


FangYongs commented on code in PR #22937:
URL: https://github.com/apache/flink/pull/22937#discussion_r1260543524


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/CatalogStoreFactoryTest.java:
##
@@ -0,0 +1,9 @@
+package org.apache.flink.table.factories;
+
+import org.junit.jupiter.api.Test;
+

Review Comment:
   Add license and doc



##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogStoreFactory.java:
##
@@ -0,0 +1,87 @@
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+

Review Comment:
   Add license and doc






[jira] [Closed] (FLINK-32549) Tiered storage memory manager supports ownership transfer for buffers

2023-07-11 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan closed FLINK-32549.
-
Fix Version/s: 1.18.0
   Resolution: Fixed

> Tiered storage memory manager supports ownership transfer for buffers
> -
>
> Key: FLINK-32549
> URL: https://issues.apache.org/jira/browse/FLINK-32549
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, the accumulator is responsible for requesting all buffers, leading 
> to an inaccurate number of requested buffers for each tier. 
> To address this issue, buffer ownership must be transferred from the 
> accumulator to the tiers when writing them, which will enable the memory 
> manager to maintain a correct number of requested buffers for different 
> owners.
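A generic sketch of what the ownership transfer amounts to for the per-owner accounting
(hypothetical types, not the actual tiered storage memory manager API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative per-owner accounting of requested buffers, with ownership transfer. */
final class BufferOwnershipAccounting {

    private final Map<Object, Integer> numRequestedBuffersPerOwner = new ConcurrentHashMap<>();

    /** Called when the accumulator (or any other owner) requests a buffer. */
    void onBufferRequested(Object owner) {
        numRequestedBuffersPerOwner.merge(owner, 1, Integer::sum);
    }

    /** Move one buffer's accounting from the accumulator to the tier that now owns it. */
    void transferOwnership(Object oldOwner, Object newOwner) {
        numRequestedBuffersPerOwner.merge(oldOwner, -1, Integer::sum);
        numRequestedBuffersPerOwner.merge(newOwner, 1, Integer::sum);
    }

    /** With transfers applied, this count stays accurate per tier, not only for the accumulator. */
    int numRequestedBuffers(Object owner) {
        return numRequestedBuffersPerOwner.getOrDefault(owner, 0);
    }
}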



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs commented on a diff in pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore

2023-07-11 Thread via GitHub


FangYongs commented on code in PR #22937:
URL: https://github.com/apache/flink/pull/22937#discussion_r1260537362


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreTest.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link GenericInMemoryCatalogStore}. */
+public class GenericInMemoryCatalogStoreTest {
+
+@Test
+void testStoreAndGet() {
+CatalogStore catalogStore = new GenericInMemoryCatalogStore();
+catalogStore.open();
+
+catalogStore.storeCatalog(
+"catalog1", CatalogDescriptor.of("catalog1", new 
Configuration()));
+assertThat(catalogStore.getCatalog("catalog1").isPresent()).isTrue();
+assertThat(catalogStore.contains("catalog1")).isTrue();
+
+catalogStore.removeCatalog("catalog1", true);
+assertThat(catalogStore.contains("catalog1")).isFalse();
+
+catalogStore.close();

Review Comment:
   It's a little strange that `CatalogStore` can still be used after it is closed. Can we restrict this behavior? For example, `CatalogManager` should throw an exception when it accesses a closed `CatalogStore`.
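   One way to get the behavior suggested above is a simple open/closed guard inside the store
   implementation. A minimal sketch, assuming `CatalogStore` exposes `open()`/`close()` exactly
   as the test uses them (the guard method name is made up):

/** Illustrative base class: rejects access after close() instead of silently working on. */
public abstract class AbstractCatalogStore implements CatalogStore {

    private boolean isOpen;

    @Override
    public void open() {
        isOpen = true;
    }

    @Override
    public void close() {
        isOpen = false;
    }

    /** To be called at the top of storeCatalog/getCatalog/removeCatalog/contains. */
    protected void checkOpenState() {
        if (!isOpen) {
            throw new IllegalStateException("CatalogStore is not opened yet or has been closed.");
        }
    }
}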






[GitHub] [flink] wanglijie95 commented on pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on PR #22966:
URL: https://github.com/apache/flink/pull/22966#issuecomment-1631802657

   Thanks for the review @lsyldliu @swuferhong. All comments have been addressed or replied to. PTAL.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260529924


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Planner program that tries to inject runtime filter for suitable join to 
improve join
+ * performance.
+ *
+ * We build the runtime filter in a two-phase manner: First, each subtask 
on the build side
+ * builds a local filter based on its local data, and sends the built filter 
to a global aggregation
+ * node. Then the global aggregation node aggregates the received filters into 
a global filter, and
+ * sends the global filter to all probe side subtasks. Therefore, we will add 
{@link
+ * BatchPhysicalLocalRuntimeFilterBuilder}, {@link 
BatchPhysicalGlobalRuntimeFilterBuilder} and
+ * {@link BatchPhysicalRuntimeFilter} into the physical plan.
+ *
+ * For example, for the following query:
+ *
+ * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2}
+ *
+ * The original physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *+- Exchange(distribution=[hash[x]])
+ *   +- Calc(select=[x, y], where=[=(z, 2)])
+ *  +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ * }
+ *
+ * This optimized physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- RuntimeFilter(select=[a])
+ *: :- Exchange(distribution=[broadcast])
+ *: 

[GitHub] [flink] xuzifu666 commented on a diff in pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend

2023-07-11 Thread via GitHub


xuzifu666 commented on code in PR #22980:
URL: https://github.com/apache/flink/pull/22980#discussion_r1260521926


##
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java:
##
@@ -364,6 +364,7 @@ public Collection<String> getAllHandles() throws Exception {
     return client.getChildren().forPath(path);
 } catch (KeeperException.NoNodeException ignored) {
     // Concurrent deletion, retry
+    LOG.debug("Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})", ignored.getMessage());

Review Comment:
   Done, the format has been changed @rkhachatryan 



##
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java:
##
@@ -364,6 +364,7 @@ public Collection<String> getAllHandles() throws Exception {
     return client.getChildren().forPath(path);
 } catch (KeeperException.NoNodeException ignored) {
     // Concurrent deletion, retry
+    LOG.debug("Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})", ignored.getMessage());

Review Comment:
   Done, the format has been changed @rkhachatryan 






[GitHub] [flink] flinkbot commented on pull request #22982: [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too large

2023-07-11 Thread via GitHub


flinkbot commented on PR #22982:
URL: https://github.com/apache/flink/pull/22982#issuecomment-1631791507

   
   ## CI report:
   
   * 96c12bd159b3cfe3f9785557badba75928e79803 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260518408


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -165,6 +167,51 @@ public class OptimizerConfigOptions {
 "When it is true, the optimizer will try to push 
dynamic filtering into scan table source,"
 + " the irrelevant partitions or input 
data will be filtered to reduce scan I/O in runtime.");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption<Boolean> TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED =
+        key("table.optimizer.runtime-filter.enabled")
+                .booleanType()
+                .defaultValue(false)
+                .withDescription(
+                        "A flag to enable or disable the runtime filter. "
+                                + "When it is true, the optimizer will try to inject a runtime filter for eligible join.");
+
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption<MemorySize> TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE =
+        key("table.optimizer.runtime-filter.max-build-data-size")
+                .memoryType()
+                .defaultValue(MemorySize.parse("10m"))

Review Comment:
   The best default value is still being tested. I will try to find the best default value based on the performance results of TPCDS-10T.






[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260517558


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Planner program that tries to inject runtime filter for suitable join to 
improve join
+ * performance.
+ *
+ * We build the runtime filter in a two-phase manner: First, each subtask 
on the build side
+ * builds a local filter based on its local data, and sends the built filter 
to a global aggregation
+ * node. Then the global aggregation node aggregates the received filters into 
a global filter, and
+ * sends the global filter to all probe side subtasks. Therefore, we will add 
{@link
+ * BatchPhysicalLocalRuntimeFilterBuilder}, {@link 
BatchPhysicalGlobalRuntimeFilterBuilder} and
+ * {@link BatchPhysicalRuntimeFilter} into the physical plan.
+ *
+ * For example, for the following query:
+ *
+ * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2}
+ *
+ * The original physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *+- Exchange(distribution=[hash[x]])
+ *   +- Calc(select=[x, y], where=[=(z, 2)])
+ *  +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ * }
+ *
+ * This optimized physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- RuntimeFilter(select=[a])
+ *: :- Exchange(distribution=[broadcast])
+ *: 

[jira] [Updated] (FLINK-32576) ProducerMergedPartitionFileIndex supports spilling to file

2023-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32576:
---
Labels: pull-request-available  (was: )

> ProducerMergedPartitionFileIndex supports spilling to file
> --
>
> Key: FLINK-32576
> URL: https://issues.apache.org/jira/browse/FLINK-32576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> When running a very large-scale job, ProducerMergedPartitionFileIndex may 
> occupy too much heap memory and cause OOM.
> To resolve the issue, ProducerMergedPartitionFileIndex should support 
> spilling to file to release the occupied memory when necessary.





[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22982: [FLINK-32576][network] ProducerMergedPartitionFileIndex supports caching regions and spilling regions to file when the cache is too lar

2023-07-11 Thread via GitHub


TanYuxin-tyx opened a new pull request, #22982:
URL: https://github.com/apache/flink/pull/22982

   
   
   ## What is the purpose of the change
   
   *ProducerMergedPartitionFileIndex supports caching regions and spilling 
regions to file when the cache is too large*
   
   
   ## Brief change log
   Use the `HsFileDataIndexSpilledRegionManager` and `HsFileDataIndexCache` to cache or spill the `ProducerMergedPartitionFileIndex` (a generic sketch of the cache-and-spill idea follows the change log below).
 - *[**commit 1**] Introduce a new parent region class HsBaseRegion*
 - *[**commit 2**] Use the HsBaseRegion to replace InternalRegion in 
HsFileDataIndexSpilledRegionManager*
 - *[**commit 3**] Rename the Segment in 
HsFileDataIndexSpilledRegionManagerImpl to RegionGroup*
 - *[**commit 4**] Introduce a new API HsRegionFileOperation. And 
ProducerMergedPartitionFileIndex supports caching regions and spilling regions 
to file when the cache is too large* 
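   A generic sketch of the cache-and-spill idea (hypothetical names; the actual change reuses
   the Hs* index classes listed above):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative region cache: keeps recent regions in memory, spills the eldest to a file. */
final class SpillableRegionCache<K, R> {

    /** Hypothetical callback that knows how to write one region to the spill file. */
    interface RegionWriter<R> {
        void write(R region, FileChannel channel) throws IOException;
    }

    private final int maxCachedRegions;
    private final FileChannel spillChannel;
    private final RegionWriter<R> writer;
    // Access-ordered map so the iteration order is least-recently-used first.
    private final LinkedHashMap<K, R> cache = new LinkedHashMap<>(16, 0.75f, true);

    SpillableRegionCache(int maxCachedRegions, FileChannel spillChannel, RegionWriter<R> writer) {
        this.maxCachedRegions = maxCachedRegions;
        this.spillChannel = spillChannel;
        this.writer = writer;
    }

    void put(K key, R region) throws IOException {
        cache.put(key, region);
        if (cache.size() > maxCachedRegions) {
            // Spill the least-recently-used region to the file and drop it from memory.
            Map.Entry<K, R> eldest = cache.entrySet().iterator().next();
            writer.write(eldest.getValue(), spillChannel);
            cache.remove(eldest.getKey());
        }
    }
}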
   
   
   ## Verifying this change
   
   This change will add unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   





[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260516947


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before

Review Comment:
   We have to use junit4 here, because the base class `TableTestBase` is based 
on junit4.






[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260516252


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before
+public void setup() {
+catalog.open();
+util.tableEnv().registerCatalog("testCatalog", catalog);
+util.tableEnv().useCatalog("testCatalog");
+TableConfig tableConfig = util.tableEnv().getConfig();
+
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, 
true);
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
+MemorySize.parse("10m"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE,
+MemorySize.parse("10g"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5);

Review Comment:
   I explicitly set the runtime-filter related config options here, due to the 
default value may be changed later(The best default values are still in 
testing). We explicitly set them here, even if the default values are changed 
later, current test will not be affected.






[jira] [Commented] (FLINK-32549) Tiered storage memory manager supports ownership transfer for buffers

2023-07-11 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742244#comment-17742244
 ] 

Weijie Guo commented on FLINK-32549:


master(1.18) via 2dfff436c09821fb658bf8d289206b9ef85bb25b.

> Tiered storage memory manager supports ownership transfer for buffers
> -
>
> Key: FLINK-32549
> URL: https://issues.apache.org/jira/browse/FLINK-32549
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the accumulator is responsible for requesting all buffers, leading 
> to an inaccurate number of requested buffers for each tier. 
> To address this issue, buffer ownership must be transferred from the 
> accumulator to the tiers when writing them, which will enable the memory 
> manager to maintain a correct number of requested buffers for different 
> owners.





[GitHub] [flink] reswqa merged pull request #22960: [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers

2023-07-11 Thread via GitHub


reswqa merged PR #22960:
URL: https://github.com/apache/flink/pull/22960





[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260510568


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before
+public void setup() {
+catalog.open();
+util.tableEnv().registerCatalog("testCatalog", catalog);
+util.tableEnv().useCatalog("testCatalog");
+TableConfig tableConfig = util.tableEnv().getConfig();
+
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, 
true);
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
+MemorySize.parse("10m"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE,
+MemorySize.parse("10g"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);

Review Comment:
   Yes, you are right. The `TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD` and 
`TABLE_OPTIMIZER_AGG_PHASE_STRATEGY` are used to let the build side is a direct 
Agg or Calc (without Exchange). I move these 2 options to the test 
`testBuildSideIsAggWithoutExchange` and `testBuildSideIsCalcWithoutExchange`






[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260509411


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before
+public void setup() {
+catalog.open();
+util.tableEnv().registerCatalog("testCatalog", catalog);
+util.tableEnv().useCatalog("testCatalog");
+TableConfig tableConfig = util.tableEnv().getConfig();
+
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, 
true);
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
+MemorySize.parse("10m"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE,
+MemorySize.parse("10g"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+
+// row avg size is 48
+String dimDdl =
+"create table dim (\n"
++ "  id BIGINT,\n"
++ "  male BOOLEAN,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  dim_date_sk BIGINT\n"
++ ")  with (\n"
++ " 'connector' = 'values',\n"
++ " 'runtime-source' = 'NewSource',\n"
++ " 'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(dimDdl);
+
+// row avg size is 60
+String factDdl =
+"create table fact (\n"
++ "  id BIGINT,\n"
++ "  name STRING,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  fact_date_sk BIGINT\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'runtime-source' = 'NewSource',\n"
++ "  'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(factDdl);
+}
+
+@Test
+public void testSimpleInnerJoin() throws Exception {
+// runtime filter will succeed
+setupSuitableTableStatistics();
+String query = "select * 

[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260508642


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before
+public void setup() {
+catalog.open();
+util.tableEnv().registerCatalog("testCatalog", catalog);
+util.tableEnv().useCatalog("testCatalog");
+TableConfig tableConfig = util.tableEnv().getConfig();
+
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, 
true);
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
+MemorySize.parse("10m"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE,
+MemorySize.parse("10g"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+
+// row avg size is 48
+String dimDdl =
+"create table dim (\n"
++ "  id BIGINT,\n"
++ "  male BOOLEAN,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  dim_date_sk BIGINT\n"
++ ")  with (\n"
++ " 'connector' = 'values',\n"
++ " 'runtime-source' = 'NewSource',\n"
++ " 'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(dimDdl);
+
+// row avg size is 60
+String factDdl =
+"create table fact (\n"
++ "  id BIGINT,\n"
++ "  name STRING,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  fact_date_sk BIGINT\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'runtime-source' = 'NewSource',\n"
++ "  'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(factDdl);
+}
+
+@Test
+public void testSimpleInnerJoin() throws Exception {
+// runtime filter will succeed
+setupSuitableTableStatistics();
+String query = "select * 

[jira] [Created] (FLINK-32581) Add document for atomic CTAS

2023-07-11 Thread tartarus (Jira)
tartarus created FLINK-32581:


 Summary: Add document for atomic CTAS
 Key: FLINK-32581
 URL: https://issues.apache.org/jira/browse/FLINK-32581
 Project: Flink
  Issue Type: Sub-task
Reporter: tartarus
 Fix For: 1.18.0


add docs for atomic CTAS





[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260504822


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Test for {@link FlinkRuntimeFilterProgram}. */
+public class FlinkRuntimeFilterProgramTest extends TableTestBase {
+// 128L * 1024L * 48 = 6MB
+private static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;
+// 1024L * 1024L * 1024L * 60 = 60GB
+private static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;
+
+private final BatchTableTestUtil util = 
batchTestUtil(TableConfig.getDefault());
+private final TestValuesCatalog catalog =
+new TestValuesCatalog("testCatalog", "test_database", true);
+
+@Before
+public void setup() {
+catalog.open();
+util.tableEnv().registerCatalog("testCatalog", catalog);
+util.tableEnv().useCatalog("testCatalog");
+TableConfig tableConfig = util.tableEnv().getConfig();
+
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, 
true);
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
+MemorySize.parse("10m"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE,
+MemorySize.parse("10g"));
+tableConfig.set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_FILTER_RATIO, 0.5);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
+util.getTableEnv()
+.getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+
+// row avg size is 48
+String dimDdl =
+"create table dim (\n"
++ "  id BIGINT,\n"
++ "  male BOOLEAN,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  dim_date_sk BIGINT\n"
++ ")  with (\n"
++ " 'connector' = 'values',\n"
++ " 'runtime-source' = 'NewSource',\n"
++ " 'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(dimDdl);
+
+// row avg size is 60
+String factDdl =
+"create table fact (\n"
++ "  id BIGINT,\n"
++ "  name STRING,\n"
++ "  amount BIGINT,\n"
++ "  price BIGINT,\n"
++ "  fact_date_sk BIGINT\n"
++ ") with (\n"
++ "  'connector' = 'values',\n"
++ "  'runtime-source' = 'NewSource',\n"
++ "  'bounded' = 'true'\n"
++ ")";
+util.tableEnv().executeSql(factDdl);
+}
+
+@Test
+public void testSimpleInnerJoin() throws Exception {
+// runtime filter will succeed
+setupSuitableTableStatistics();
+String query = "select * 

[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260503747


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Planner program that tries to inject runtime filter for suitable join to 
improve join
+ * performance.
+ *
+ * We build the runtime filter in a two-phase manner: First, each subtask 
on the build side
+ * builds a local filter based on its local data, and sends the built filter 
to a global aggregation
+ * node. Then the global aggregation node aggregates the received filters into 
a global filter, and
+ * sends the global filter to all probe side subtasks. Therefore, we will add 
{@link
+ * BatchPhysicalLocalRuntimeFilterBuilder}, {@link 
BatchPhysicalGlobalRuntimeFilterBuilder} and
+ * {@link BatchPhysicalRuntimeFilter} into the physical plan.
+ *
+ * For example, for the following query:
+ *
+ * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2}
+ *
+ * The original physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *+- Exchange(distribution=[hash[x]])
+ *   +- Calc(select=[x, y], where=[=(z, 2)])
+ *  +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ * }
+ *
+ * This optimized physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- RuntimeFilter(select=[a])
+ *: :- Exchange(distribution=[broadcast])
+ *: 
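The two-phase approach described above can be illustrated with a minimal, self-contained
sketch (purely illustrative; this is not the actual Flink operator code, and all class and
method names here are made up):

{code:java}
import java.util.BitSet;
import java.util.List;

/** Minimal sketch of a two-phase runtime filter: local build -> global merge -> probe. */
final class SimpleRuntimeFilter {
    private final BitSet bits;
    private final int numBits;

    SimpleRuntimeFilter(int numBits) {
        this.numBits = numBits;
        this.bits = new BitSet(numBits);
    }

    /** Local phase: each build-side subtask adds the join keys it has seen. */
    void add(Object joinKey) {
        int h = joinKey.hashCode();
        bits.set(Math.floorMod(h, numBits));
        bits.set(Math.floorMod(h * 31 + 17, numBits));
    }

    /** Global phase: the global builder ORs all local filters into one. */
    void merge(SimpleRuntimeFilter other) {
        bits.or(other.bits);
    }

    /** Probe phase: rows whose key cannot be on the build side are skipped. */
    boolean mightContain(Object joinKey) {
        int h = joinKey.hashCode();
        return bits.get(Math.floorMod(h, numBits))
                && bits.get(Math.floorMod(h * 31 + 17, numBits));
    }

    static SimpleRuntimeFilter global(List<SimpleRuntimeFilter> localFilters, int numBits) {
        SimpleRuntimeFilter merged = new SimpleRuntimeFilter(numBits);
        localFilters.forEach(merged::merge);
        return merged;
    }
}
{code}

The actual operators (BatchPhysicalLocalRuntimeFilterBuilder, BatchPhysicalGlobalRuntimeFilterBuilder
and BatchPhysicalRuntimeFilter) play the three roles sketched here.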

[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260503527


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Planner program that tries to inject runtime filter for suitable join to 
improve join
+ * performance.
+ *
+ * We build the runtime filter in a two-phase manner: First, each subtask 
on the build side
+ * builds a local filter based on its local data, and sends the built filter 
to a global aggregation
+ * node. Then the global aggregation node aggregates the received filters into 
a global filter, and
+ * sends the global filter to all probe side subtasks. Therefore, we will add 
{@link
+ * BatchPhysicalLocalRuntimeFilterBuilder}, {@link 
BatchPhysicalGlobalRuntimeFilterBuilder} and
+ * {@link BatchPhysicalRuntimeFilter} into the physical plan.
+ *
+ * For example, for the following query:
+ *
+ * {@code SELECT * FROM fact, dim WHERE x = a AND z = 2}
+ *
+ * The original physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *+- Exchange(distribution=[hash[x]])
+ *   +- Calc(select=[x, y], where=[=(z, 2)])
+ *  +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ * }
+ *
+ * This optimized physical plan:
+ *
+ * {@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *:- Exchange(distribution=[hash[a]])
+ *:  +- RuntimeFilter(select=[a])
+ *: :- Exchange(distribution=[broadcast])
+ *: 

[jira] [Updated] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-07-11 Thread tartarus (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tartarus updated FLINK-32349:
-
Parent: FLINK-32580
Issue Type: Sub-task  (was: New Feature)

> Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> -
>
> Key: FLINK-32349
> URL: https://issues.apache.org/jira/browse/FLINK-32349
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For detailed information, see FLIP-305
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260501061


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -165,6 +167,51 @@ public class OptimizerConfigOptions {
 "When it is true, the optimizer will try to push 
dynamic filtering into scan table source,"
 + " the irrelevant partitions or input 
data will be filtered to reduce scan I/O in runtime.");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption 
TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED =
+key("table.optimizer.runtime-filter.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable the runtime filter. "
++ "When it is true, the optimizer will try 
to inject a runtime filter for eligible join.");
+
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption
+TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE =
+key("table.optimizer.runtime-filter.max-build-data-size")
+.memoryType()
+.defaultValue(MemorySize.parse("10m"))
+.withDescription(
+"Data volume threshold of the runtime 
filter build side. "
++ "Estimated data volume needs to 
be under this value to try to inject runtime filter.");
+
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption
+TABLE_OPTIMIZER_RUNTIME_FILTER_MIN_PROBE_DATA_SIZE =
+key("table.optimizer.runtime-filter.min-probe-data-size")
+.memoryType()
+.defaultValue(MemorySize.parse("10g"))
+.withDescription(
+Description.builder()
+.text(
+"Data volume threshold of 
the runtime filter probe side. "

Review Comment:
   Fixed
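
   The gist of the eligibility check that these three options drive can be sketched as
   follows (simplified and illustrative; not the planner's actual code, which also looks at
   join type, key types and plan shape):

{code:java}
// Illustrative only; sizes are in bytes.
final class RuntimeFilterEligibility {
    static boolean worthInjecting(
            long estimatedBuildBytes,   // estimated data volume of the build side
            long estimatedProbeBytes,   // estimated data volume of the probe side
            double estimatedFilterRatio,
            long maxBuildDataSize,      // table.optimizer.runtime-filter.max-build-data-size
            long minProbeDataSize,      // table.optimizer.runtime-filter.min-probe-data-size
            double minFilterRatio) {    // table.optimizer.runtime-filter.min-filter-ratio
        return estimatedBuildBytes <= maxBuildDataSize
                && estimatedProbeBytes >= minProbeDataSize
                && estimatedFilterRatio >= minFilterRatio;
    }
}
{code}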



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260500938


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -165,6 +167,51 @@ public class OptimizerConfigOptions {
 "When it is true, the optimizer will try to push 
dynamic filtering into scan table source,"
 + " the irrelevant partitions or input 
data will be filtered to reduce scan I/O in runtime.");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption 
TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED =
+key("table.optimizer.runtime-filter.enabled")
+.booleanType()
+.defaultValue(false)

Review Comment:
   We hope to enable this feature by default in future releases. But in 1.18, 
it will be disabled by default.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -165,6 +167,51 @@ public class OptimizerConfigOptions {
 "When it is true, the optimizer will try to push 
dynamic filtering into scan table source,"
 + " the irrelevant partitions or input 
data will be filtered to reduce scan I/O in runtime.");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption 
TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED =
+key("table.optimizer.runtime-filter.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable the runtime filter. "
++ "When it is true, the optimizer will try 
to inject a runtime filter for eligible join.");
+
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption
+TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE =
+key("table.optimizer.runtime-filter.max-build-data-size")
+.memoryType()
+.defaultValue(MemorySize.parse("10m"))
+.withDescription(
+"Data volume threshold of the runtime 
filter build side. "

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32580) FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)

2023-07-11 Thread tartarus (Jira)
tartarus created FLINK-32580:


 Summary: FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
 Key: FLINK-32580
 URL: https://issues.apache.org/jira/browse/FLINK-32580
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: tartarus
 Fix For: 1.18.0


FLIP-305 Support atomic for CREATE TABLE AS SELECT(CTAS) statement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260499124


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##
@@ -165,6 +167,51 @@ public class OptimizerConfigOptions {
 "When it is true, the optimizer will try to push 
dynamic filtering into scan table source,"
 + " the irrelevant partitions or input 
data will be filtered to reduce scan I/O in runtime.");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption 
TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED =
+key("table.optimizer.runtime-filter.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable the runtime filter. "
++ "When it is true, the optimizer will try 
to inject a runtime filter for eligible join.");
+
+@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+public static final ConfigOption
+TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE =
+key("table.optimizer.runtime-filter.max-build-data-size")
+.memoryType()
+.defaultValue(MemorySize.parse("10m"))

Review Comment:
   I've added comments for the 3 related config options, PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32157) Replaces LeaderConnectionInfo with LeaderInformation

2023-07-11 Thread Jiadong Lu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742235#comment-17742235
 ] 

Jiadong Lu commented on FLINK-32157:


hi [~mapohl] , i would like to address this issue, would you mind assigning it 
to me ?

> Replaces LeaderConnectionInfo with LeaderInformation
> 
>
> Key: FLINK-32157
> URL: https://issues.apache.org/jira/browse/FLINK-32157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: starter
>
> {{LeaderConnectionInfo}} and {{LeaderInformation}} have the same purpose. 
> {{LeaderInformation}} could substitute any occurrences of 
> {{LeaderConnectionInfo}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260498717


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalGlobalRuntimeFilterBuilder.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter.BatchExecGlobalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkRuntimeFilterProgram;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/**
+ * Batch physical RelNode responsible for aggregating all received filters 
into a global filter. See
+ * {@link FlinkRuntimeFilterProgram} for more info.
+ */
+public class BatchPhysicalGlobalRuntimeFilterBuilder extends SingleRel 
implements BatchPhysicalRel {

Review Comment:
   For simplicity, I prefer to do it in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Tartarus0zm commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-07-11 Thread via GitHub


Tartarus0zm commented on PR #22839:
URL: https://github.com/apache/flink/pull/22839#issuecomment-1631766550

   > Please don't forget to create a jira to add a doc for the atomic CTAS 
statement and put the jira and 
[FLINK-32349](https://issues.apache.org/jira/browse/FLINK-32349) under an 
umbrella jira.
   
   @luoyuxia thanks for the reminder


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a diff in pull request #22966: [FLINK-32492][table-planner] Introduce FlinkRuntimeFilterProgram to apply the runtime filter optimization.

2023-07-11 Thread via GitHub


wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1260497685


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+
+/** Batch {@link ExecNode} for local runtime filter builder. */
+public class BatchExecLocalRuntimeFilterBuilder extends ExecNodeBase
+implements BatchExecNode {
+private final int[] buildIndices;
+private final int estimatedRowCount;
+private final int maxRowCount;
+
+public BatchExecLocalRuntimeFilterBuilder(
+ReadableConfig tableConfig,
+List inputProperties,
+LogicalType outputType,
+String description,
+int[] buildIndices,
+int estimatedRowCount,
+int maxRowCount) {
+super(
+ExecNodeContext.newNodeId(),
+
ExecNodeContext.newContext(BatchExecLocalRuntimeFilterBuilder.class),
+ExecNodeContext.newPersistedConfig(
+BatchExecLocalRuntimeFilterBuilder.class, tableConfig),
+inputProperties,
+outputType,
+description);
+this.buildIndices = buildIndices;
+this.estimatedRowCount = estimatedRowCount;
+this.maxRowCount = maxRowCount;
+}
+
+@Override
+@SuppressWarnings("unchecked")
+protected Transformation translateToPlanInternal(
+PlannerBase planner, ExecNodeConfig config) {
+ExecEdge inputEdge = getInputEdges().get(0);

Review Comment:
   Fixed



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch.runtimefilter;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;

[jira] [Created] (FLINK-32579) The filter criteria on the lookup table of Lookup join has no effect

2023-07-11 Thread jasonliangyc (Jira)
jasonliangyc created FLINK-32579:


 Summary: The filter criteria on the lookup table of Lookup join 
has no effect 
 Key: FLINK-32579
 URL: https://issues.apache.org/jira/browse/FLINK-32579
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.17.1, 1.17.0
Reporter: jasonliangyc
 Attachments: image-2023-07-12-09-31-18-261.png, 
image-2023-07-12-09-42-59-231.png, image-2023-07-12-09-47-31-397.png

*1.* I joined two tables using a lookup join with the query below in sql-client. 
The filter criterion (p.name = '??') didn't show up in the execution detail, and 
rows were returned based on only one condition (cdc.product_id = p.id).
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON p.name = '??' and 
cdc.product_id = p.id
; {code}
!image-2023-07-12-09-31-18-261.png|width=657,height=132!

 

*2.* It showed weird results when I changed the query as below, because there is 
no data in the table (products) where the value of column 'name' is '??', and the 
execution detail didn't show the where criterion.
{code:java}
select
cdc.order_id,
cdc.order_date,
cdc.customer_name,
cdc.price,
p.name
FROM orders AS cdc
left JOIN products 
FOR SYSTEM_TIME AS OF cdc.proc_time as p ON cdc.product_id = p.id
where p.name = '??'
; {code}
!image-2023-07-12-09-42-59-231.png|width=684,height=102!

!image-2023-07-12-09-47-31-397.png|width=685,height=120!

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xuzifu666 commented on pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend

2023-07-11 Thread via GitHub


xuzifu666 commented on PR #22980:
URL: https://github.com/apache/flink/pull/22980#issuecomment-1631732406

   @rkhachatryan I have changed the format style, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-07-11 Thread via GitHub


luoyuxia commented on PR #22839:
URL: https://github.com/apache/flink/pull/22839#issuecomment-1631729536

   Please don't forget to create a jira to add a doc for the atomic CTAS 
statement and put the jira and FLINK-32349 under an umbrella jira.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28460) Flink SQL supports atomic CREATE TABLE AS SELECT(CTAS)

2023-07-11 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia closed FLINK-28460.

Resolution: Fixed

> Flink SQL supports atomic CREATE TABLE AS SELECT(CTAS)
> --
>
> Key: FLINK-28460
> URL: https://issues.apache.org/jira/browse/FLINK-28460
> Project: Flink
>  Issue Type: Sub-task
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>
> Enable support for atomic CTAS in stream and batch mode via option.
> Active deletion of the created target table when the job fails or is 
> cancelled, but requires Catalog can be serialized directly via Java 
> Serialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-07-11 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-32349.
--
Resolution: Fixed

master:

e6f77bea70682d1f2d708abee75a0dc33de16ee7

> Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> -
>
> Key: FLINK-32349
> URL: https://issues.apache.org/jira/browse/FLINK-32349
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For detailed information, see FLIP-305
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia merged pull request #22839: [FLINK-32349][table] Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-07-11 Thread via GitHub


luoyuxia merged PR #22839:
URL: https://github.com/apache/flink/pull/22839


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever

2023-07-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-32578:

Component/s: Table SQL / Runtime
 (was: Table SQL / Planner)

> Cascaded group by window time columns on a proctime window aggregate may 
> result hang for ever
> -
>
> Key: FLINK-32578
> URL: https://issues.apache.org/jira/browse/FLINK-32578
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
> Fix For: 1.18.0, 1.17.2
>
>
> Currently, grouping by the window time columns of a proctime window aggregate 
> result produces a wrong plan, which may cause the job to hang forever at runtime.
> For such a query:
> {code}
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>  SELECT
> a,
> b,
> window_start,
> window_end,
> count(*) as cnt,
> sum(d) as sum_d,
> max(d) as max_d
>  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>  GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end
> {code}
> the inner proctime window works fine, but the outer one doesn't work due to a 
> wrong plan which is translated to an unexpected event-time mode window operator:
> {code}
> Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS 
> c])
>+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) 
> AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
>   +- Exchange(distribution=[hash[a]])
>  +- Calc(select=[a, window_start, window_end, cnt])
> +- WindowAggregate(groupBy=[a, b], 
> window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS 
> cnt, start('w$) AS window_start, end('w$) AS window_end])
>+- Exchange(distribution=[hash[a, b]])
>   +- Calc(select=[a, b, d, PROCTIME() AS proctime])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098
 ] 

Martijn Visser edited comment on FLINK-30998 at 7/11/23 11:11 PM:
--

Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43
apache/flink-connector-opensearch:v1.0 a7f0ade240dbba04a41ae793684ede4285ca959b


was (Author: martijnvisser):
Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43
apache/flink-connector-opensearch:v1.0 TODO

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.0.2
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.
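
A rough sketch of what such a hook could look like (names and signature here are
hypothetical, for illustration only; the interface actually added by this change may differ):

{code:java}
// Hypothetical handler shape: let the client decide whether a bulk item failure
// is fatal or can be ignored (e.g. rejections caused by duplicate document IDs).
@FunctionalInterface
interface BulkFailureHandler extends java.io.Serializable {
    void onFailure(Throwable failure);
}

final class IgnoreDuplicateIdsHandler implements BulkFailureHandler {
    @Override
    public void onFailure(Throwable failure) {
        String msg = String.valueOf(failure.getMessage());
        // Duplicate-ID rejections surface as version conflicts; ignore those,
        // fail the job for anything else.
        if (!msg.contains("version_conflict_engine_exception")) {
            throw new org.apache.flink.util.FlinkRuntimeException(failure);
        }
    }
}
{code}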



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-30998.
--
Resolution: Fixed

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.0.2
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] MartijnVisser merged pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch

2023-07-11 Thread via GitHub


MartijnVisser merged PR #30:
URL: https://github.com/apache/flink-connector-opensearch/pull/30


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-11 Thread via GitHub


rkhachatryan commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1260137154


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -67,10 +70,12 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup {
 private final Meter mailboxThroughput;
 private final Histogram mailboxLatency;
 private final SizeGauge mailboxSize;
+private final Gauge initializingTime;

Review Comment:
   Thanks! 
   I see in the latest update that the type of the field is still gauge and not 
counter. Is it intentional?
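
   For context, the practical difference between the two metric types under discussion, as a
   minimal sketch (not the actual TaskIOMetricGroup code; field and metric names are illustrative):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class RestoreMetricsSketch {
    private volatile long initializingTimeMs; // set once, when initialization finishes
    private Counter bytesRestored;            // incremented by the task itself

    void register(MetricGroup group) {
        // A Gauge is polled and reports the current value of a variable.
        group.gauge("initializingTimeMs", (Gauge<Long>) () -> initializingTimeMs);
        // A Counter is a monotonically increasing value updated via inc().
        bytesRestored = group.counter("bytesRestored");
    }
}
{code}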



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint

2023-07-11 Thread via GitHub


rkhachatryan commented on code in PR #22744:
URL: https://github.com/apache/flink/pull/22744#discussion_r1260099896


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -375,6 +378,49 @@ public RunnableFuture> 
snapshot(
 @Nonnull CheckpointStreamFactory streamFactory,
 @Nonnull CheckpointOptions checkpointOptions)
 throws Exception {
+
+if (checkpointOptions.getCheckpointType().isSavepoint()) {
+SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+if (sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING) {
+// For NO_SHARING native savepoint, trigger delegated one
+RunnableFuture> 
delegatedSnapshotResult =
+keyedStateBackend.snapshot(
+checkpointId, timestamp, streamFactory, 
checkpointOptions);

Review Comment:
   I think using `streamFactory` passed to this method is correct because 
savepoint should be stored in a separate path.
   
   I have another question: can't `checkpointId` interfere with 
`materializationId` (used in `initMaterialization`)?
   For example, in local recovery paths.



##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -375,6 +378,49 @@ public RunnableFuture> 
snapshot(
 @Nonnull CheckpointStreamFactory streamFactory,
 @Nonnull CheckpointOptions checkpointOptions)
 throws Exception {
+
+if (checkpointOptions.getCheckpointType().isSavepoint()) {
+SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+if (sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING) {
+// For NO_SHARING native savepoint, trigger delegated one
+RunnableFuture> 
delegatedSnapshotResult =
+keyedStateBackend.snapshot(
+checkpointId, timestamp, streamFactory, 
checkpointOptions);
+return new FutureTask>(
+() -> {
+SnapshotResult result =
+
FutureUtils.runIfNotDoneAndGet(delegatedSnapshotResult);
+return castSnapshotResult(
+buildSnapshotResult(
+checkpointId,
+SnapshotResult.empty(),
+new ChangelogSnapshotState(
+
getMaterializedResult(result;
+}) {
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return 
delegatedSnapshotResult.cancel(mayInterruptIfRunning)
+&& super.cancel(mayInterruptIfRunning);
+}
+
+@Override
+public boolean isCancelled() {
+return delegatedSnapshotResult.isCancelled() && 
super.isCancelled();
+}
+
+@Override
+public boolean isDone() {
+return delegatedSnapshotResult.isDone() && 
super.isDone();
+}
+};
+} else {
+throw new UnsupportedOperationException(

Review Comment:
   :+1: I think you're correct @masteryhx , these are orthogonal, and for 
savepoints we always use `NO_SHARING`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch

2023-07-11 Thread via GitHub


reta commented on PR #30:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/30#issuecomment-1631239695

   > > @MartijnVisser I think we could backport to 1.0, low risk change, thank 
you
   > 
   > Fine with me, will you fix the CI?
   
   Yes!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] afedulov opened a new pull request, #629: [hotfix][docs] Add a 'Build from Source' page

2023-07-11 Thread via GitHub


afedulov opened a new pull request, #629:
URL: https://github.com/apache/flink-kubernetes-operator/pull/629

   
   
   ## What is the purpose of the change
   
   Adds a page with instructions and requirements for building from source 
similar to Flink's 
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / **no**)
 - Core observer or reconciler logic that is regularly executed: (yes / 
**no**)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32348) MongoDB tests are flaky and time out

2023-07-11 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742108#comment-17742108
 ] 

Jiabao Sun commented on FLINK-32348:


[~martijnvisser] Sorry for the late reply.

The root cause of this error is that the reader is not removed from 
readersAwaitingSplit when an idle reader is closed.
This resulted in splits being incorrectly assigned to readers that did not 
complete when resuming tasks from checkpoints.

1. readersAwaitingSplit: [0]
2. signalNoMoreSplits but not remove 0 from readersAwaitingSplit
3. TaskManager failover
4. split request from reader 1 -> readersAwaitingSplit: [0, 1]
5. but actually assigns split to reader 0.

The PR is ready, could you help review it?
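
A simplified sketch of the fix idea (illustrative only; not the actual MongoSplitEnumerator
code, and the real change may be structured differently):

{code:java}
import java.util.TreeSet;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;

class EnumeratorSketch {
    private final TreeSet<Integer> readersAwaitingSplit = new TreeSet<>();
    private final SplitEnumeratorContext<?> context;

    EnumeratorSketch(SplitEnumeratorContext<?> context) {
        this.context = context;
    }

    void signalNoMoreSplits(int subtaskId) {
        context.signalNoMoreSplits(subtaskId);
        // The missing step described above: forget the reader, so that after a
        // failover new split requests are not answered with splits meant for it.
        readersAwaitingSplit.remove(subtaskId);
    }
}
{code}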

> MongoDB tests are flaky and time out
> 
>
> Key: FLINK-32348
> URL: https://issues.apache.org/jira/browse/FLINK-32348
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Martijn Visser
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32348) MongoDB tests are flaky and time out

2023-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32348:
---
Labels: pull-request-available test-stability  (was: test-stability)

> MongoDB tests are flaky and time out
> 
>
> Key: FLINK-32348
> URL: https://issues.apache.org/jira/browse/FLINK-32348
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Martijn Visser
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] rkhachatryan commented on a diff in pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend

2023-07-11 Thread via GitHub


rkhachatryan commented on code in PR #22980:
URL: https://github.com/apache/flink/pull/22980#discussion_r1260009895


##
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java:
##
@@ -364,6 +364,7 @@ public Collection getAllHandles() throws Exception {
 return client.getChildren().forPath(path);
 } catch (KeeperException.NoNodeException ignored) {
 // Concurrent deletion, retry
+LOG.debug("Unable to get all handles, retrying (ZNode was 
likely deleted concurrently: {})", ignored.getMessage());

Review Comment:
   I think formatting is broken:
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51183=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=2892
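   
   One way to wrap the statement so it satisfies the project's formatting check
   (illustrative; the final formatting in the PR may differ):
   
{code:java}
LOG.debug(
        "Unable to get all handles, retrying (ZNode was likely deleted concurrently: {})",
        ignored.getMessage());
{code}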
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32563) Allow connectors CI to specify the main supported Flink version

2023-07-11 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742101#comment-17742101
 ] 

Etienne Chauchot commented on FLINK-32563:
--

[~martijnvisser] I had in mind being slightly more coercive with connector 
authors, so that they run CI tests on the last 2 versions and specify which of the 
2 is the main supported one (for running archunit but also other things). I was 
thinking of something like this (in _testing.yml style):
{code:java}
jobs:
  compile_and_test:
strategy:
  matrix:
include:
  - flink: 1.16.2
main_version: false
  - flink: 1.17.1
main_version: true
uses: ./.github/workflows/ci.yml
with:
  connector_branch: ci_utils
  flink_version: ${{ matrix.flink }}
  main_flink_version: ${{ matrix.main_version }}
{code}
{code:java}
inputs:
main_flink_version:
description: "Is the input Flink version, the main version that the 
connector supports."
required: false  // to avoid break the existing connectors
type: boolean
default: false
{code}
Do you prefer something like this?
{code:java}
jobs:
  enable-archunit-tests:
uses: ./.github/workflows/ci.yml
with:
  flink_version: 1.17.1
  connector_branch: ci_utils
  run_archunit_tests: true
{code}
{code:java}
inputs:
  run_archunit_tests:
description: "Whether to run the archunit tests"
required: false // to avoid breaking the existing connectors
type: boolean
default: false 
{code}

> Allow connectors CI to specify the main supported Flink version
> ---
>
> Key: FLINK-32563
> URL: https://issues.apache.org/jira/browse/FLINK-32563
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System / CI
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>
> As part of [this 
> discussion|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3] 
> , the need for connectors to specify the main flink version that a connector 
> supports has arisen. 
> This CI variable will allow to configure the build and tests differently 
> depending on this version. This parameter would be optional.
> The first use case is to run archunit tests only on the main supported 
> version as discussed in the above thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098
 ] 

Martijn Visser edited comment on FLINK-30998 at 7/11/23 4:48 PM:
-

Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43
apache/flink-connector-opensearch:v1.0 TODO


was (Author: martijnvisser):
Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43
apache/flink-connector-opensearch:v3.0 TODO

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.0.2
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742098#comment-17742098
 ] 

Martijn Visser edited comment on FLINK-30998 at 7/11/23 4:48 PM:
-

Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43
apache/flink-connector-opensearch:v3.0 TODO


was (Author: martijnvisser):
Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.0.2
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30998:
---
Fix Version/s: opensearch-1.0.2
   (was: opensearch-1.1.0)

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.0.2
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reopened FLINK-30998:


> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.1.0
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] MartijnVisser commented on pull request #30: [Backport] [1.0] [FLINK-30998] Apply adding failureHandler on top of current apache:main branch

2023-07-11 Thread via GitHub


MartijnVisser commented on PR #30:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/30#issuecomment-1631158945

   > @MartijnVisser I think we could backport to 1.0, low risk change, thank you
   
   Fine with me, will you fix the CI?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] boring-cyborg[bot] commented on pull request #11: [FLINK-30998] Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread via GitHub


boring-cyborg[bot] commented on PR #11:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/11#issuecomment-1631144197

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-30998.
--
Fix Version/s: opensearch-1.1.0
   Resolution: Fixed

Fixed in:

apache/flink-connector-opensearch:main d853e3d6be3e0f15e25c1220800b7d5fcf152c43

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.1.0
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elasticsearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-30998:
--

Assignee: Leonid Ilyevsky

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Assignee: Leonid Ilyevsky
>Priority: Major
>  Labels: pull-request-available
> Fix For: opensearch-1.1.0
>
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest to add an option to set a failure handler, similar to the way it is 
> done in elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is the use case example when it will be very useful. We are using 
> streams on Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this way it works for us with Elastisearch).
> However, with opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32564) Support cast from BYTES to BIGINT

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-32564:
--

Assignee: Hanyu Zheng

> Support cast from BYTES to BIGINT
> -
>
> Key: FLINK-32564
> URL: https://issues.apache.org/jira/browse/FLINK-32564
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> We are dealing with a task that requires casting from the BYTES type to 
> BIGINT. Specifically, we have a string '00T1p'. Our approach is to convert 
> this string to BYTES and then cast the result to BIGINT with the following 
> SQL query:
> {code:java}
> SELECT CAST((CAST('00T1p' as BYTES)) as BIGINT);{code}
> However, an issue arises when executing this query, likely due to an error in 
> the conversion between BYTES and BIGINT. We aim to identify and rectify this 
> issue so our query can run correctly. The tasks involved are:
>  # Investigate and identify the specific reason for the failure of conversion 
> from BYTES to BIGINT.
>  # Design and implement a solution to ensure our query can function correctly.
>  # Test this solution across all required scenarios to guarantee its 
> functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32565) Support cast from BYTES to DOUBLE

2023-07-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-32565:
--

Assignee: Hanyu Zheng

> Support cast from BYTES to DOUBLE
> -
>
> Key: FLINK-32565
> URL: https://issues.apache.org/jira/browse/FLINK-32565
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> We are undertaking a task that requires casting from the BYTES type to 
> DOUBLE. In particular, we have a string '00T1p'. Our current approach is to 
> convert this string to BYTES and then cast the result to DOUBLE using the 
> following SQL query:
> {code:java}
> SELECT CAST((CAST('00T1p' as BYTES)) as DOUBLE);{code}
> However, we encounter an issue when executing this query, potentially due to 
> an error in the conversion between BYTES and DOUBLE. Our goal is to identify 
> and correct this issue so that our query can execute successfully. The tasks 
> involved are:
>  # Investigate and pinpoint the specific reason for the conversion failure 
> from BYTES to DOUBLE.
>  # Design and implement a solution that enables our query to function 
> correctly.
>  # Test this solution across all required scenarios to ensure its robustness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-07-11 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742095#comment-17742095
 ] 

Jing Ge commented on FLINK-30238:
-

[~ConradJam] would you like to describe the issue you encountered? We'd like to have 
the most up-to-date status of the issue. Thanks!

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * Last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state, which means that stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and preventing 
> open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32458) support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with other aggregate functions

2023-07-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-32458:
---

Assignee: Yunhong Zheng

> support mixed use of JSON_OBJECTAGG & JSON_ARRAYAGG with other aggregate 
> functions
> --
>
> Key: FLINK-32458
> URL: https://issues.apache.org/jira/browse/FLINK-32458
> Project: Flink
>  Issue Type: Sub-task
>Reporter: lincoln lee
>Assignee: Yunhong Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever

2023-07-11 Thread lincoln lee (Jira)
lincoln lee created FLINK-32578:
---

 Summary: Cascaded group by window time columns on a proctime 
window aggregate may result hang for ever
 Key: FLINK-32578
 URL: https://issues.apache.org/jira/browse/FLINK-32578
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: lincoln lee
Assignee: lincoln lee
 Fix For: 1.18.0, 1.17.2


Currently, grouping by window time columns on a proctime window aggregate 
produces a wrong plan, which may cause the job to hang forever at runtime.

For such a query:
{code}
insert into s1
SELECT
  window_start,
  window_end,
  sum(cnt),
  count(*)
FROM (
 SELECT
a,
b,
window_start,
window_end,
count(*) as cnt,
sum(d) as sum_d,
max(d) as max_d
 FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
 GROUP BY a, window_start, window_end, b
)
GROUP BY a, window_start, window_end
{code}
the inner proctime window works fine, but the outer one doesn't work due to a 
wrong plan which will be translated to an unexpected event-time mode window operator:
{code}
Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) 
AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
  +- Exchange(distribution=[hash[a]])
 +- Calc(select=[a, window_start, window_end, cnt])
+- WindowAggregate(groupBy=[a, b], 
window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS 
cnt, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a, b]])
  +- Calc(select=[a, b, d, PROCTIME() AS proctime])
 +- TableSourceScan(table=[[default_catalog, 
default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32501) Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation

2023-07-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-32501:

Fix Version/s: (was: 1.17.2)

> Wrong execution plan of a proctime window aggregation generated due to 
> incorrect cost evaluation
> 
>
> Key: FLINK-32501
> URL: https://issues.apache.org/jira/browse/FLINK-32501
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.2, 1.17.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, when a window aggregation refers to a windowing TVF with a 
> filter condition, it may produce a wrong plan which can hang forever at 
> runtime (the window aggregate operator never emits output)
> for such a case:
> {code}
> insert into sink
> select
> window_start,
> window_end,
> b,
> COALESCE(sum(case
> when a = 11
> then 1
> end), 0) c
> from
> TABLE(
> TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
> )
> where
> a in (1, 5, 7, 9, 11)
> GROUP BY
> window_start, window_end, b
> {code}
> generates a wrong plan which doesn't combine the proctime WindowTableFunction 
> into the WindowAggregate (so when translated to the execution plan, the WindowAggregate 
> will wrongly recognize the window as an event-time window, and the 
> WindowAggregateOperator will neither receive watermarks nor set up timers to fire 
> any windows at runtime)
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>+- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) 
> AS window_start, end('w$) AS window_end])
>   +- Exchange(distribution=[hash[b]])
>  +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, 
> null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
> +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], 
> size=[10 s])])
>+- Calc(select=[a, b, PROCTIME() AS proctime])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}
> expected plan:
> {code}
> Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
>+- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], 
> size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, 
> end('w$) AS window_end])
>   +- Exchange(distribution=[hash[b]])
>  +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, 
> PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
> +- TableSourceScan(table=[[default_catalog, default_database, 
> source, project=[a, b], metadata=[]]], fields=[a, b])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize

2023-07-11 Thread via GitHub


flinkbot commented on PR #22981:
URL: https://github.com/apache/flink/pull/22981#issuecomment-1630953554

   
   ## CI report:
   
   * 4a09b912e7aaa9acdc874b6232551c9b63fa188f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize

2023-07-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28743:
---
Labels: pull-request-available  (was: )

> Support validating the determinism for StreamPhysicalMatchRecognize
> ---
>
> Key: FLINK-28743
> URL: https://issues.apache.org/jira/browse/FLINK-28743
> Project: Flink
>  Issue Type: Sub-task
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> MatchRecognize has complex expressions and is not commonly used in 
> traditional SQLs, so mark this as a minor issue (for 1.16)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lincoln-lil opened a new pull request, #22981: [FLINK-28743][table-planner] Supports validating the determinism for StreamPhysicalMatchRecognize

2023-07-11 Thread via GitHub


lincoln-lil opened a new pull request, #22981:
URL: https://github.com/apache/flink/pull/22981

   ## What is the purpose of the change
   This is a minor addition to the FLINK-27849 implementation to support the 
analysis of the `MatchRecognize` node.
   Since the `MatchRecognize` will not support updating inputs in the 
foreseeable future, here we keep the analytic logic as a relatively simple one.
   
   ## Brief change log
   add the analytic logic of `MatchRecognize` node in 
`StreamNonDeterministicUpdatePlanVisitor`
   
   ## Verifying this change
   added test cases into `NonDeterministicDagTest`
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
@Public(Evolving): (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize

2023-07-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-28743:
---

Assignee: lincoln lee

> Support validating the determinism for StreamPhysicalMatchRecognize
> ---
>
> Key: FLINK-28743
> URL: https://issues.apache.org/jira/browse/FLINK-28743
> Project: Flink
>  Issue Type: Sub-task
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
> Fix For: 1.18.0
>
>
> MatchRecognize has complex expressions and is not commonly used in 
> traditional SQLs, so mark this as a minor issue (for 1.16)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #22932: [FLINK-26549][table][tests] Add tests for INSERT INTO with VALUES leads to wrong type inference with nested types

2023-07-11 Thread via GitHub


XComp commented on code in PR #22932:
URL: https://github.com/apache/flink/pull/22932#discussion_r1259561126


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.java:
##
@@ -166,6 +172,48 @@ void testCanonizeType() {
 .isNotEqualTo(typeFactory.builder().add("f0", 
genericRelType3).build());
 }
 
+static Stream testLeastRestrictive() {
+return Stream.of(

Review Comment:
   Could you add a comment to the code on why these different cases were 
selected for the test?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/InsertIntoValuesTest.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/** Plan test for INSERT INTO. */
+public class InsertIntoValuesTest extends TableTestBase {
+private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+@Test
+public void testTypeInferenceWithNestedTypes() {
+util.tableEnv()
+.executeSql(
+"CREATE TABLE t1 ("
++ "  `mapWithStrings` MAP,"
++ "  `mapWithBytes` MAP"
++ ") WITH ("
++ "   'connector' = 'values'"
++ ")");
+
+util.verifyExecPlanInsert(
+"INSERT INTO t1 VALUES "
++ "(MAP['a', '123', 'b', '123456'], MAP['k1', 
X'C0FFEE', 'k2', X'BABE']), "
++ "(CAST(NULL AS MAP), CAST(NULL AS 
MAP)), "
++ "(MAP['a', '1', 'b', '1'], MAP['k1', X'10', 'k2', 
X'20'])");

Review Comment:
   Here it might be useful to add a comment that Calcite 1.26 relied on the 
last value to derive the type. That way, readers of the code would understand 
why you decided on this specific test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on pull request #39: [FLINK-32453] Make main and v3.0 branch build against Flink 1.18-SNAPSHOT

2023-07-11 Thread via GitHub


tzulitai commented on PR #39:
URL: 
https://github.com/apache/flink-connector-kafka/pull/39#issuecomment-1630892082

   @XComp there isn't Kafka-connector-specific release documentation, as 
far as I'm aware. So far, the generic doc was good enough, at least for the 
first `v3.0.0` release of the connector. Adding steps for generating the 
snapshot test files to the readme sounds good for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32560) Properly deprecate all Scala APIs

2023-07-11 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742015#comment-17742015
 ] 

Ryan Skraba commented on FLINK-32560:
-

OK -- taking a quick look at the current work, I'll take a look at all modules 
outside of table-planner and add the following annotation and comment *every 
single time*:

{code}
/**
 * @deprecated
 *   All Flink Scala APIs are deprecated and will be removed in a future Flink version. You
 *   can still build your application in Scala, but you should move to the Java 
version of either
 *   the DataStream and/or Table API.
 * @see
 *   <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">
 *   FLIP-265 Deprecate and remove Scala API support</a>
 */
@Deprecated
{code}

There's also some {{@Public}} scala code in flink-hadoop-compatibility, and 
even if {{table-planner}} isn't in scope yet, the {{flink-table-api-scala}} and 
{{flink-table-api-scala-bridge}} modules are likely also candidates for clean-up.

> Properly deprecate all Scala APIs
> -
>
> Key: FLINK-32560
> URL: https://issues.apache.org/jira/browse/FLINK-32560
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Scala
>Reporter: Xintong Song
>Assignee: Ryan Skraba
>Priority: Blocker
> Fix For: 1.18.0
>
>
> We agreed to drop Scala API support in FLIP-265 [1], and have tried to 
> deprecate them in FLINK-29740. Also, both user documentation and roadmap[2] 
> shows that scala API supports are deprecated. However, none of the APIs in 
> `flink-streaming-scala` are annotated with `@Deprecated` atm, and only 
> `ExecutionEnvironment` and `package` are marked `@Deprecated` in 
> `flink-scala`.
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
> [2] https://flink.apache.org/roadmap/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-11 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741999#comment-17741999
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

[~renqs] I'm merging my hotfix PR now which should unblock things for the time 
being. The PR has already been reviewed and approved by [~gaoyunhaii].

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{fink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-11 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741997#comment-17741997
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

[~gaoyunhaii] that plan sounds good to me. Are you planning to do steps 1. and 2. 
already for 1.18? If yes, I can probably revert the "quick" fix PR I did in the 
Flink Kafka connector repo.

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{fink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] tzulitai commented on pull request #39: [FLINK-32453] Make main and v3.0 branch build against Flink 1.18-SNAPSHOT

2023-07-11 Thread via GitHub


tzulitai commented on PR #39:
URL: 
https://github.com/apache/flink-connector-kafka/pull/39#issuecomment-1630788865

   re: @XComp 
   
   > Would it make sense to create a new testbase class for connectors in 
flink-connector-common that implements the test data generation and mark it as 
@PublicEvolving? To make it more obvious that it's used in the external 
repositories?
   
   I was thinking along the same lines, although it should just be a 
public-facing test utility that Flink distributes. I can imagine any user 
implementing implementing their own `TypeSerializer` who might want to write 
migration tests for them using the base class.
   
   > Additionally, where is the release documentation kept for the Kafka 
connector.
   
   There's this: 
https://cwiki.apache.org/confluence/display/FLINK/Creating+a+flink-connector+release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2023-07-11 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741989#comment-17741989
 ] 

Ahmed Hamdy commented on FLINK-27756:
-

[~Sergey Nuyanzin] thanks for reopening, I will try to take a look at it this 
week.

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.15.0, 1.17.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] tzulitai commented on pull request #7: [FLINK-31408] Add support for EOS delivery-guarantee in upsert-kafka

2023-07-11 Thread via GitHub


tzulitai commented on PR #7:
URL: 
https://github.com/apache/flink-connector-kafka/pull/7#issuecomment-1630767966

   @Ge as discussed offline, I think there really isn't a better way around 
the current PR approach. I'll address the comments on the doc and proceed to 
merge this.
   
   Thank you for your contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28743) Support validating the determinism for StreamPhysicalMatchRecognize

2023-07-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741983#comment-17741983
 ] 

lincoln lee commented on FLINK-28743:
-

[~twalthr] Sorry for the late reply. Yes, I also confirmed with [~dianfu] 
offline that the MatchRecognize won't support updating inputs in the 
foreseeable future either, so the analytic support for this operator can be 
relatively simple, and I'm working on a pr that should be ready in time for the 
1.18 release.

> Support validating the determinism for StreamPhysicalMatchRecognize
> ---
>
> Key: FLINK-28743
> URL: https://issues.apache.org/jira/browse/FLINK-28743
> Project: Flink
>  Issue Type: Sub-task
>Reporter: lincoln lee
>Priority: Minor
> Fix For: 1.18.0
>
>
> MatchRecognize has complex expressions and is not commonly used in 
> traditional SQLs, so mark this as a minor issue (for 1.16)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32528) The RexCall a = a,if a's datatype is nullable, and when a is null, a = a is null, it isn't true in BinaryComparisonExprReducer

2023-07-11 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741974#comment-17741974
 ] 

Yunhong Zheng commented on FLINK-32528:
---

Hi [~shenlang], can you provide a specific SQL query as an example? I don't 
quite understand this: 'if a's datatype is nullable, and when a is null, a = a 
is null, a <> a is null'. Thank you!

> The RexCall a = a,if a's datatype is nullable, and when a is null, a = a is 
> null, it isn't true in BinaryComparisonExprReducer
> --
>
> Key: FLINK-32528
> URL: https://issues.apache.org/jira/browse/FLINK-32528
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: LakeShen
>Priority: Major
>
> I'm reading the Flink SQL planner's source code. In the 
> org.apache.flink.table.planner.plan.utils.FlinkRexUtil#simplify method, the 
> BinaryComparisonExprReducer is used to handle BINARY_COMPARISON operators whose 
> operands are the same RexInputRef, e.g. a = a, a <> a, a >= a...
> In BinaryComparisonExprReducer, a = a, a <= a, a >= a are simplified to the TRUE 
> literal, and a <> a, a < a, a > a are simplified to the FALSE literal.
> If a's data type is nullable and a is null, then a = a is null and a <> a is 
> null. BinaryComparisonExprReducer's logic does not consider the case of 
> nullable data types.
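For illustration (hypothetical table and data, not taken from the report): under standard SQL three-valued logic, a row where a IS NULL makes a = a evaluate to NULL rather than TRUE, so blindly reducing a = a to TRUE changes what a filter returns.

{code:sql}
-- Hypothetical table t with a nullable column a containing the values (1, NULL).

-- Returns only the row with a = 1, because NULL = NULL evaluates to NULL;
-- simplifying a = a to TRUE would also return the NULL row.
SELECT * FROM t WHERE a = a;

-- Likewise a <> a evaluates to NULL (not FALSE) for the NULL row,
-- so simplifying it to FALSE is only safe when a is non-nullable.
SELECT * FROM t WHERE a <> a;
{code}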



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32560) Properly deprecate all Scala APIs

2023-07-11 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741973#comment-17741973
 ] 

Xintong Song commented on FLINK-32560:
--

The help is very much welcomed. Thanks [~rskraba] and [~Sergey Nuyanzin].

> Properly deprecate all Scala APIs
> -
>
> Key: FLINK-32560
> URL: https://issues.apache.org/jira/browse/FLINK-32560
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Scala
>Reporter: Xintong Song
>Assignee: Ryan Skraba
>Priority: Blocker
> Fix For: 1.18.0
>
>
> We agreed to drop Scala API support in FLIP-265 [1], and have tried to 
> deprecate them in FLINK-29740. Also, both user documentation and roadmap[2] 
> shows that scala API supports are deprecated. However, none of the APIs in 
> `flink-streaming-scala` are annotated with `@Deprecated` atm, and only 
> `ExecutionEnvironment` and `package` are marked `@Deprecated` in 
> `flink-scala`.
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
> [2] https://flink.apache.org/roadmap/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-07-11 Thread via GitHub


zentol commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1259659718


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   > Would you like to see this refactoring integrated in order to minimize the 
dependencies surface regardless of that? If so, I'll open a separate ticket and 
a PR.
   
   Should be a separate ticket :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-07-11 Thread via GitHub


afedulov commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1259655490


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Small question: I already prepared the rework of `flink-architecture-tests` 
towards using the fully-qualified names instead of these somewhat redundant 
dependencies for class objects. If we go for option (3) we do not immediately 
need it here. Would you like to see this refactoring integrated in order to 
minimize the dependencies surface regardless of that? If so, I'll open a 
separate ticket and a PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-07-11 Thread via GitHub


afedulov commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1259655490


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Small question: I already prepared the rework of `flink-architecture-tests` 
towards using the fully-qualified names instead of these somewhat redundant 
dependencies for class objects. If we go for option (3) we do not immediately 
need it here. Would you like to see this refactoring integrated in order to 
minimize the dependencies surface regardless? If so, I'll open a separate 
ticket and a PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-07-11 Thread via GitHub


afedulov commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1259655490


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Small question: I already prepared the rework of `flink-architecture-tests` 
towards using the fully-qualified names instead of these somewhat redundant 
dependencies for class objects. If we go for option (3) we do not immediately 
need it here. Would you like to see this refactoring integrated in order to 
minimize the dependencies surface nonetheless? If so, I'll open a separate 
ticket and a PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-07-11 Thread via GitHub


WenDing-Y commented on PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1630724942

   > > One question that comes to my mind now.. This will only work with a 1 
server no? if we have a Clickhouse cluster with more than 1 server (that will 
have a zookeeper envolved) will not work no?
   > > if this is true, I think we should have a note on documentation
   > 
   > I tried to work normally in cluster mode
   > 
   > My steps are as follows 1.create ck table
   > 
   > ```
   > create table if not exists default.ck_flink_test_local on cluster 
clicks_cluster
   > (
   > user_id Int32,
   > message String
   > )
   > engine = 
ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', 
'{replica}')
   > PRIMARY KEY (user_id)
   > 
   > 
   > 
   > 
   > create table if not exists default.ck_flink_test on cluster clicks_cluster
   > (
   > user_id Int32,
   > message String
   > )
   > engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', 
user_id);
   > ```
   > 
   > 2. in flink sql client
   > 
   > ```
   > 
   > CREATE TABLE ck_flink_test(
   >user_id INTEGER,
   >   message String
   > ) WITH (
   >   'connector' = 'jdbc',
   >   'url' = 'jdbc:clickhouse://xx:xx/default',
   >  'table-name' = 'ck_flink_test',
   >  'username'='xx',
   >  'password'='xx'
   > );
   > ```
   > 
   > 3. exec insert sql
   > 
   > ```
   > INSERT INTO ck_flink_test (user_id, message) VALUES
   > (101, 'Hello');
   > ```
   > 
   > 4. the data can be viewed on the ck client
   > 
   > ```
   > select * from ck_flink_test;
   > ```
   > 
   > 5.the flink sql client
   > 
   > 
![image](https://user-images.githubusercontent.com/19184146/252660687-89eca20f-eabf-4090-9460-c46716653afc.png)
   
   
   @eskabetxe I guess you might have defined the table incorrectly
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y closed pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-07-11 Thread via GitHub


WenDing-Y closed pull request #49: [FLINK-32068] connector jdbc support 
clickhouse
URL: https://github.com/apache/flink-connector-jdbc/pull/49


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-07-11 Thread via GitHub


WenDing-Y commented on PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1630722843

   > > One question that comes to my mind now.. This will only work with a 1 
server no? if we have a Clickhouse cluster with more than 1 server (that will 
have a zookeeper envolved) will not work no?
   > > if this is true, I think we should have a note on documentation
   > 
   > I tried to work normally in cluster mode
   > 
   > My steps are as follows 1.create ck table
   > 
   > ```
   > create table if not exists default.ck_flink_test_local on cluster 
clicks_cluster
   > (
   > user_id Int32,
   > message String
   > )
   > engine = 
ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', 
'{replica}')
   > PRIMARY KEY (user_id)
   > 
   > 
   > 
   > 
   > create table if not exists default.ck_flink_test on cluster clicks_cluster
   > (
   > user_id Int32,
   > message String
   > )
   > engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', 
user_id);
   > ```
   > 
   > 2. in flink sql client
   > 
   > ```
   > 
   > CREATE TABLE ck_flink_test(
   >user_id INTEGER,
   >   message String
   > ) WITH (
   >   'connector' = 'jdbc',
   >   'url' = 'jdbc:clickhouse://xx:xx/default',
   >  'table-name' = 'ck_flink_test',
   >  'username'='xx',
   >  'password'='xx'
   > );
   > ```
   > 
   > 3. exec insert sql
   > 
   > ```
   > INSERT INTO ck_flink_test (user_id, message) VALUES
   > (101, 'Hello');
   > ```
   > 
   > 4. the data can be viewed on the ck client
   > 
   > ```
   > select * from ck_flink_test;
   > ```
   > 
   > 5.the flink sql client
   > 
   > 
![image](https://user-images.githubusercontent.com/19184146/252660687-89eca20f-eabf-4090-9460-c46716653afc.png)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-07-11 Thread via GitHub


WenDing-Y commented on PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1630721467

   > One question that comes to my mind now.. This will only work with a 1 
server no? if we have a Clickhouse cluster with more than 1 server (that will 
have a zookeeper envolved) will not work no?
   > 
   > if this is true, I think we should have a note on documentation
   
   I tried to work normally in cluster mode
   
   
   
   My steps are as follows
   1.create ck table 
   ```
   create table if not exists default.ck_flink_test_local on cluster 
clicks_cluster
   (
   user_id Int32,
   message String
   )
   engine = 
ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', 
'{replica}')
   PRIMARY KEY (user_id)
   
   
   
   
   create table if not exists default.ck_flink_test on cluster clicks_cluster
   (
   user_id Int32,
   message String
   )
   engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', 
user_id);
   ```
   
   2. in flink sql client 
   
   ```
   
   CREATE TABLE ck_flink_test(
  user_id INTEGER,
 message String
   ) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:clickhouse://xx:xx/default',
'table-name' = 'ck_flink_test',
   'username'='xx',
   'password'='xx'
   );
   
   
   ```
   3. exec insert sql
   
   ```
   INSERT INTO ck_flink_test (user_id, message) VALUES
   (101, 'Hello');
   
   
   
   ```
   
   4. the data can be viewed on the ck client 
   
   ```
   select * from ck_flink_test;
   ```
   5.the flink sql client 
   
   
![image](https://github.com/apache/flink-connector-jdbc/assets/19184146/89eca20f-eabf-4090-9460-c46716653afc)
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #22850: [FLINK-28229][streaming-java] Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-07-11 Thread via GitHub


afedulov commented on code in PR #22850:
URL: https://github.com/apache/flink/pull/22850#discussion_r1259648022


##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Perfect, thanks for you input. The "not great, not terrible (c)" option it 
is then, I also had a slight preference towards it. I'll add comments to the 
pom to make it clear why this module exists.



##
flink-connectors/flink-connector-datagen-test/pom.xml:
##
@@ -0,0 +1,89 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.18-SNAPSHOT
+   
+
+   flink-connector-datagen-tests

Review Comment:
   Perfect, thanks for your input. The "not great, not terrible (c)" option it 
is then, I also had a slight preference towards it. I'll add comments to the 
pom to make it clear why this module exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh closed pull request #22859: [FLINK-32385][runtime] Introduce SerializedShuffleDescriptorAndIndices to identify a group of ShuffleDescriptorAndIndex

2023-07-11 Thread via GitHub


huwh closed pull request #22859: [FLINK-32385][runtime] Introduce 
SerializedShuffleDescriptorAndIndices to identify a group of 
ShuffleDescriptorAndIndex
URL: https://github.com/apache/flink/pull/22859


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh closed pull request #22860: [FLINK-32386][runtime] Introduce ShuffleDescriptorsCache to cache ShuffleDescriptorAndIndex

2023-07-11 Thread via GitHub


huwh closed pull request #22860: [FLINK-32386][runtime] Introduce 
ShuffleDescriptorsCache to cache ShuffleDescriptorAndIndex
URL: https://github.com/apache/flink/pull/22860


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xuzifu666 closed pull request #22021: [Hotfix] typo hotfix in TestingDispatcher

2023-07-11 Thread via GitHub


xuzifu666 closed pull request #22021: [Hotfix] typo hotfix in TestingDispatcher 
URL: https://github.com/apache/flink/pull/22021


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xuzifu666 commented on pull request #22980: [FLINK-31035] add warn info to user when NoNodeException happend

2023-07-11 Thread via GitHub


xuzifu666 commented on PR #22980:
URL: https://github.com/apache/flink/pull/22980#issuecomment-1630694842

   > Thanks for the improvement, could you squash all commits and rebase this 
to latest master branch.
   
   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module

2023-07-11 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-32577:
--
Description: This issue is a sub-issue of FLINK-18356.  (was: This issue is 
a sub-issue of FLINK-18356.

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, 
I found that the heap and non-heap memory of the JVM were stable and within 
the normal range. However, the total memory usage ({*}RES{*}) of the fork 
process was very high, as shown in the following figure (PID: 2958793 and 
2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I tried to delve deeper into the specific memory allocation of these two 
processes:

 
{code:java}
pmap -p 2958793 {code}
I found a lot of memory fragments here, each with a size close to 
*64MB* (more than 200 such fragments):

 

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this issue likely triggers the classic problem 
of incorrect memory fragmentation management by *glibc* under JDK8. So we 
downloaded *libjemalloc* and added the environment variable:

 
{code:java}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}
After that, the overall memory of the fork process has become stable and meets 
expectations (5GB):

 

!image-2023-07-11-19-41-18-626.png|width=488,height=208!

!image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution to this problem requires modifying the CI execution 
[Docker image|https://github.com/flink-ci/flink-ci-docker], replacing 
*glibc* with *libjemalloc* as in FLINK-19125, cc [~chesnay]:
{code:java}
apt-get -y install libjemalloc-dev

ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}
I have opened a new Jira (FLINK-32577) to track and fix this issue. cc 
[~mapohl]  [~jark].

 )

> Avoid memory fragmentation when running CI for flink-table-planner module
> -
>
> Key: FLINK-32577
> URL: https://issues.apache.org/jira/browse/FLINK-32577
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.18.0
>
>
> This issue is a sub-issue of FLINK-18356.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong commented on a diff in pull request #22975: [FLINK-31643][network] Introduce the configurations for tiered storage

2023-07-11 Thread via GitHub


xintongsong commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1259626675


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##
@@ -49,7 +55,9 @@ public class NettyShuffleMaster implements 
ShuffleMaster
 
 private final int networkBufferSize;
 
-// TODO, WIP: create tiered internal shuffle master after enabling the 
tiered storage.
+@Nullable private TieredInternalShuffleMaster tieredInternalShuffleMaster;
+
+@Nullable private TieredStorageConfiguration tieredStorageConfiguration;

Review Comment:
   Should be final.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##
@@ -40,7 +41,9 @@ public class TieredInternalShuffleMaster {
 
 public TieredInternalShuffleMaster(Configuration conf) {
 TieredStorageConfiguration tieredStorageConfiguration =
-TieredStorageConfiguration.fromConfiguration(conf);
+TieredStorageConfiguration.builder(
+
conf.getString(NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH))
+.build();

Review Comment:
   This change seems to belong to the first commit. Otherwise, there will be a 
compilation error.



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions {
 + "tasks have occupied all the buffers and 
the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
 + "the tie by failing the request of 
exclusive buffers and ask users to increase the number of total buffers.");
 
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+@Experimental
+@Internal
+public static final ConfigOption 
NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE =
+
ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"The option is used to enable tiered storage 
architecture for hybrid shuffle mode.");
+
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+@Experimental
+@Internal
+public static final ConfigOption 
NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+key("taskmanager.network.hybrid-shuffle.remote.path")
+.stringType()
+.noDefaultValue()
+.withDescription(
+"The base home path of remote storage for remote 
tier. If the option is configured, "
++ "Hybrid Shuffle will use the remote 
storage path as a supplement to the"
++ "local disks. If not configured, the 
remote storage will not be used.");

Review Comment:
   1. This is not internal
   2. JavaDoc is missing
   3. Shall we expose the concept `remote tier` to users at all? What are the 
minimum necessary things that the user needs to understand?



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions {
 + "tasks have occupied all the buffers and 
the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
 + "the tie by failing the request of 
exclusive buffers and ask users to increase the number of total buffers.");
 
+@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+@Experimental
+@Internal
+public static final ConfigOption 
NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE =
+
ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"The option is used to enable tiered storage 
architecture for hybrid shuffle mode.");

Review Comment:
   1. This is not internal.
   2. JavaDoc is missing.
   3. What is "tiered storage architecture" from the user's perspective? How is 
it different from the original architecture? What does the user need to know 
about this?
   4. Why is the default value `false`? How can a user decide which mode to 
use? Would anyone enable it at all?
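
   For orientation, a minimal usage sketch (not from this PR; it assumes the two 
option keys shown in the diff above are merged unchanged, and the remote path 
value is purely illustrative) of how a user could set these options on a Flink 
`Configuration`:

   ```java
   import org.apache.flink.configuration.Configuration;

   public class HybridShuffleTieredStorageConfigSketch {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // Opt in to the tiered storage code path for hybrid shuffle
           // (the proposed default in the diff above is false).
           conf.setBoolean("taskmanager.network.hybrid-shuffle.enable-tiered-storage", true);
           // Optional remote base path; per the option description it only
           // supplements the local disks and is unused when not configured.
           conf.setString("taskmanager.network.hybrid-shuffle.remote.path", "hdfs:///flink/hybrid-shuffle");
           System.out.println(conf);
       }
   }
   ```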




[jira] [Updated] (FLINK-32577) Avoid memory fragmentation when running CI for flink-table-planner module

2023-07-11 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-32577:
--
Description: 
This issue is a sub-issue of FLINK-18356.

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, 
I found that the heap and non-heap memory of the JVM were stable and within 
the normal range. However, the total memory usage ({*}RES{*}) of the fork 
process was very high, as shown in the following figure (PID: 2958793 and 
2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I tried to delve deeper into the specific memory allocation of these two 
processes:

 
{code:java}
pmap -p 2958793 {code}
I found a lot of memory fragments here, each with a size close to 
*64MB* (more than 200 such fragments):

 

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this issue likely triggers the classic problem 
of incorrect memory fragmentation management by *glibc* under JDK8. So we 
downloaded *libjemalloc* and added the environment variable:

 
{code:java}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}
After that, the overall memory of the fork process has become stable and meets 
expectations (5GB):

 

!image-2023-07-11-19-41-18-626.png|width=488,height=208!

!image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution to this problem requires modifying the CI execution 
[Docker image|https://github.com/flink-ci/flink-ci-docker], replacing 
*glibc* with *libjemalloc* as in FLINK-19125, cc [~chesnay]:
{code:java}
apt-get -y install libjemalloc-dev

ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}
I have opened a new Jira (FLINK-32577) to track and fix this issue. cc 
[~mapohl]  [~jark].
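
For reference, a minimal sketch that reproduces the fragment count seen in the pmap output above (an assumption-laden illustration: Linux-only, and the 60–64MB window is based on glibc's per-thread arena size rather than anything in this report):
{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Counts memory mappings close to 64MB for a given PID by parsing
// /proc/<pid>/maps, which shows roughly the same regions as pmap.
public class ArenaCounter {
    public static void main(String[] args) throws Exception {
        String pid = args.length > 0 ? args[0] : "self";
        List<String> lines = Files.readAllLines(Paths.get("/proc/" + pid + "/maps"));
        long count = lines.stream()
                .map(l -> l.split("\\s+")[0].split("-"))   // "start-end" address range
                .mapToLong(a -> Long.parseUnsignedLong(a[1], 16)
                        - Long.parseUnsignedLong(a[0], 16))
                .filter(size -> size >= 60L * 1024 * 1024 && size <= 64L * 1024 * 1024)
                .count();
        System.out.println("Mappings close to 64MB: " + count);
    }
}
{code}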

 

> Avoid memory fragmentation when running CI for flink-table-planner module
> -
>
> Key: FLINK-32577
> URL: https://issues.apache.org/jira/browse/FLINK-32577
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.18.0
>
>
> This issue is a sub-issue of FLINK-18356.
> When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, 
> I found that the heap and non-heap memory of the JVM were stable and 
> within the normal range. However, the total memory usage ({*}RES{*}) of the 
> fork process was very high, as shown in the following figure (PID: 2958793 and 
> 2958794):
> !image-2023-07-11-19-28-52-851.png|width=537,height=245!
> I tried to delve deeper into the specific memory allocation of these two 
> processes:
>  
> {code:java}
> pmap -p 2958793 {code}
> I found a lot of memory fragments here, each with a size close 
> to *64MB* (more than 200 such fragments):
>  
> !image-2023-07-11-19-35-54-530.png|width=237,height=413!
> Based on past experience, this issue likely triggers the classic problem 
> of incorrect memory fragmentation management by *glibc* under JDK8. So we 
> downloaded *libjemalloc* and added the environment variable:
>  
> {code:java}
> export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}
> After that, the overall memory of the fork process has become stable and 
> meets expectations (5GB):
>  
> !image-2023-07-11-19-41-18-626.png|width=488,height=208!
> !image-2023-07-11-19-41-37-105.png|width=228,height=287!
> The solution to this problem requires modifying the CI execution 
> [Docker image|https://github.com/flink-ci/flink-ci-docker], replacing 
> *glibc* with *libjemalloc* as in FLINK-19125, cc [~chesnay]:
> {code:java}
> apt-get -y install libjemalloc-dev
> ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}
> I have opened a new Jira (FLINK-32577) to track and fix this issue. cc 
> [~mapohl]  [~jark].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-18356) flink-table-planner Exit code 137 returned from process

2023-07-11 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741961#comment-17741961
 ] 

Yunhong Zheng edited comment on FLINK-18356 at 7/11/23 11:55 AM:
-

Hi all, I think I found the root cause of the table-planner exit 137 error under 
the guidance of [~lincoln.86xy]. This error is similar to FLINK-19125; 
both are caused by incorrect memory fragmentation management by {*}glibc{*}, 
which does not return memory to the kernel gracefully (refer to the [glibc 
bugzilla|https://sourceware.org/bugzilla/show_bug.cgi?id=15321] and the [glibc 
manual|https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc]).

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, 
I found that the heap and non-heap memory of the JVM were stable and within 
the normal range. However, the total memory usage ({*}RES{*}) of the fork 
process was very high, as shown in the following figure (PID: 2958793 and 
2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I tried to delve deeper into the specific memory allocation of these two 
processes:

 
{code:java}
pmap -p 2958793 {code}
I found a lot of memory fragments here, each with a size close to 
*64MB* (more than 200 such fragments):

 

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this issue likely triggers the classic problem 
of incorrect memory fragmentation management by *glibc* under JDK8. So we 
downloaded *libjemalloc* and added the environment variable:

 
{code:java}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}
After that, the overall memory of the fork process has become stable and meets 
expectations (5GB):

 

!image-2023-07-11-19-41-18-626.png|width=488,height=208!

!image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution to this problem requires modifying the CI execution 
[Docker image|https://github.com/flink-ci/flink-ci-docker], replacing 
*glibc* with *libjemalloc* as in FLINK-19125, cc [~chesnay]:
{code:java}
apt-get -y install libjemalloc-dev

ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}
I have opened a new Jira (FLINK-32577) to track and fix this issue. cc 
[~mapohl]  [~jark].
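
As a sanity check, a minimal sketch (assuming Linux and the standard libjemalloc file name; not taken from the CI setup itself) that verifies the preload actually reaches a forked JVM such as the surefire fork:
{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;

// Scans the current JVM's memory map for a loaded jemalloc, a quick way
// to confirm that LD_PRELOAD took effect inside the forked test process.
public class JemallocCheck {
    public static void main(String[] args) throws Exception {
        boolean loaded;
        try (java.util.stream.Stream<String> maps = Files.lines(Paths.get("/proc/self/maps"))) {
            loaded = maps.anyMatch(line -> line.contains("libjemalloc"));
        }
        System.out.println(loaded
                ? "libjemalloc is mapped into this JVM"
                : "libjemalloc NOT found; LD_PRELOAD may not have been applied");
    }
}
{code}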

 


was (Author: JIRAUSER287975):
Hi all, I think I found the root cause of the table-planner exit 137 error under 
the guidance of [~lincoln.86xy]. This error is similar to 
[FLINK-19125|https://issues.apache.org/jira/browse/FLINK-19125]; both are 
caused by incorrect memory fragmentation management by {*}glibc{*}, which does 
not return memory to the kernel gracefully (refer to the [glibc 
bugzilla|https://sourceware.org/bugzilla/show_bug.cgi?id=15321] and the [glibc 
manual|https://www.gnu.org/software/libc/manual/html_mono/libc.html#Freeing-after-Malloc]).

When I ran mvn verify for flink-table-planner in Azure CI and on my own machine, 
I found that the heap and non-heap memory of the JVM were stable and within 
the normal range. However, the total memory usage ({*}RES{*}) of the fork 
process was very high, as shown in the following figure (PID: 2958793 and 
2958794):

!image-2023-07-11-19-28-52-851.png|width=537,height=245!

I tried to delve deeper into the specific memory allocation of these two 
processes:

 
{code:java}
pmap -p 2958793 {code}
I found a lot of memory fragments here, each with a size close to 
*64MB* (more than 200 such fragments):

 

!image-2023-07-11-19-35-54-530.png|width=237,height=413!

Based on past experience, this issue likely triggers the classic problem 
of incorrect memory fragmentation management by *glibc* under JDK8. So we 
downloaded *libjemalloc* and added the environment variable:

 
{code:java}
export LD_PRELOAD=${JAVA_HOME}/lib/amd64/libjemalloc.so.2{code}
After that, the overall memory of the fork process has become stable and meets 
expectations (5GB):

 

!image-2023-07-11-19-41-18-626.png|width=488,height=208!

!image-2023-07-11-19-41-37-105.png|width=228,height=287!

The solution to this problem requires modifying the CI execution 
[Docker image|https://github.com/flink-ci/flink-ci-docker], replacing 
*glibc* with *libjemalloc* as in FLINK-19125, cc [~chesnay]:
{code:java}
apt-get -y install libjemalloc-dev

ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so {code}
I have opened a new Jira (FLINK-32577) to track and fix this issue. cc 
[~mapohl]  [~jark].

 

> flink-table-planner Exit code 137 returned from process
> ---
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>Reporter: 
