[jira] [Assigned] (FLINK-34258) Incorrect example of accumulator usage within emitUpdateWithRetract for TableAggregateFunction

2024-01-28 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-34258:
-

Assignee: (was: Jane Chan)

> Incorrect example of accumulator usage within emitUpdateWithRetract for 
> TableAggregateFunction
> --
>
> Key: FLINK-34258
> URL: https://issues.apache.org/jira/browse/FLINK-34258
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Jane Chan
>Priority: Minor
>
> The 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#retraction-example]
>  provides an example of using `emitUpdateWithRetract`. However, the example 
> is misleading as it incorrectly suggests that the accumulator can be updated 
> within the `emitUpdateWithRetract` method. In reality, the order of 
> invocation is to first call `getAccumulator` and then 
> `emitUpdateWithRetract`, which means that updating the accumulator within 
> `emitUpdateWithRetract` will not take effect. Please see 
> [GroupTableAggFunction#L141|https://github.com/apache/flink/blob/20450485b20cb213b96318b0c3275e42c0300e15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java#L141]
>  ~ 
> [GroupTableAggFunction#L146|https://github.com/apache/flink/blob/20450485b20cb213b96318b0c3275e42c0300e15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java#L146]
>  for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34052) Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository

2024-01-28 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-34052.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

master (1.19) via 324a5e45c80464335a95cad7fbccfd531d0b098b.

> Missing TopSpeedWindowing and SessionWindowing JARs in Flink Maven Repository
> -
>
> Key: FLINK-34052
> URL: https://issues.apache.org/jira/browse/FLINK-34052
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Examples
>Affects Versions: 1.18.0
>Reporter: Junrui Li
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> As a result of the changes implemented in FLINK-32821, the build process no 
> longer produces artifacts with the names 
> flink-examples-streaming-1.x-TopSpeedWindowing.jar and 
> flink-examples-streaming-1.x-SessionWindowing.jar. This has led to the 
> absence of these specific JAR files in the Maven repository 
> (https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.18.0/).
> These artifacts were previously available and may still be expected by users 
> as part of their application dependencies. Their removal could potentially 
> break existing build pipelines and applications that depend on these example 
> JARs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34052][examples] Install the shaded streaming example jars in the maven repo [flink]

2024-01-28 Thread via GitHub


reswqa merged PR #24206:
URL: https://github.com/apache/flink/pull/24206


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34258) Incorrect example of accumulator usage within emitUpdateWithRetract for TableAggregateFunction

2024-01-28 Thread Jane Chan (Jira)
Jane Chan created FLINK-34258:
-

 Summary: Incorrect example of accumulator usage within 
emitUpdateWithRetract for TableAggregateFunction
 Key: FLINK-34258
 URL: https://issues.apache.org/jira/browse/FLINK-34258
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Affects Versions: 1.18.1, 1.19.0
Reporter: Jane Chan
Assignee: Jane Chan


The 
[documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#retraction-example]
 provides an example of using `emitUpdateWithRetract`. However, the example is 
misleading as it incorrectly suggests that the accumulator can be updated 
within the `emitUpdateWithRetract` method. In reality, the order of invocation 
is to first call `getAccumulator` and then `emitUpdateWithRetract`, which means 
that updating the accumulator within `emitUpdateWithRetract` will not take 
effect. Please see 
[GroupTableAggFunction#L141|https://github.com/apache/flink/blob/20450485b20cb213b96318b0c3275e42c0300e15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java#L141]
 through 
[GroupTableAggFunction#L146|https://github.com/apache/flink/blob/20450485b20cb213b96318b0c3275e42c0300e15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java#L146]
 for more details.
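
For illustration, a minimal sketch of the anti-pattern (Top2-style, following 
the docs' retraction example; `Top2Accum` and its fields are hypothetical names, 
not the documented code verbatim):
{code:java}
public void emitUpdateWithRetract(
        Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
    if (!acc.first.equals(acc.oldFirst)) {
        // retract the previously emitted result before emitting the new one
        if (acc.oldFirst != null) {
            out.retract(Tuple2.of(acc.oldFirst, 1));
        }
        out.collect(Tuple2.of(acc.first, 1));
        // This mutation is LOST: the runtime reads the accumulator state before
        // invoking emitUpdateWithRetract and does not write it back afterwards,
        // so oldFirst still holds the stale value on the next invocation.
        acc.oldFirst = acc.first;
    }
}
{code}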




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test [flink]

2024-01-28 Thread via GitHub


flinkbot commented on PR #24214:
URL: https://github.com/apache/flink/pull/24214#issuecomment-1914082399

   
   ## CI report:
   
   * f62f6452c985a79242c1d9fc3ec57bc1ca13 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34115][table-planner] Fix TableAggregateITCase unstable test [flink]

2024-01-28 Thread via GitHub


LadyForest opened a new pull request, #24214:
URL: https://github.com/apache/flink/pull/24214

   ## What is the purpose of the change
   
   This PR tries to fix the unstable ITCase `TableAggregateITCase`. 
   
   
   ## Brief change log
   
   Replace the table source so that the test's input order is deterministic.
   
   
   ## Verifying this change
   - Plan before fix
   ```sql
   Calc(select=[f0 AS top_price, f1 AS rank], changelogMode=[I,UA,D])
   +- GroupTableAggregate(select=[incrementalTop2(price) AS (f0, f1)], 
changelogMode=[I,UA,D])
  +- Exchange(distribution=[single], changelogMode=[I])
 +- Union(all=[true], union=[id, name, price], changelogMode=[I])
:- Calc(select=[CAST(1 AS INTEGER) AS id, 
CAST(_UTF-16LE'Latte':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(6 AS INTEGER) AS 
price], changelogMode=[I])
:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
:- Calc(select=[CAST(2 AS INTEGER) AS id, 
CAST(_UTF-16LE'Milk':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(3 AS INTEGER) AS 
price], changelogMode=[I])
:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
:- Calc(select=[CAST(3 AS INTEGER) AS id, 
CAST(_UTF-16LE'Breve':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(5 AS INTEGER) AS 
price], changelogMode=[I])
:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
:- Calc(select=[CAST(4 AS INTEGER) AS id, 
CAST(_UTF-16LE'Mocha':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(8 AS INTEGER) AS 
price], changelogMode=[I])
:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
+- Calc(select=[CAST(5 AS INTEGER) AS id, 
CAST(_UTF-16LE'Tea':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(4 AS INTEGER) AS 
price], changelogMode=[I])
   +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
   ```
   
   
   - Plan after fix
   ```sql
   Calc(select=[f0 AS top_price, f1 AS rank])
   +- GroupTableAggregate(select=[incrementalTop2(price) AS (f0, f1)])
  +- Exchange(distribution=[single])
 +- TableSourceScan(table=[[default_catalog, default_database, 
myTable]], fields=[id, name, price])
   ```
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34238) In streaming mode, redundant exchange nodes can be optimally deleted in some cases

2024-01-28 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811750#comment-17811750
 ] 

dalongliu commented on FLINK-34238:
---

+1, [~libenchao]. Looking forward to your team's contribution, if possible.

> In streaming mode, redundant exchange nodes can be optimally deleted in some 
> cases
> --
>
> Key: FLINK-34238
> URL: https://issues.apache.org/jira/browse/FLINK-34238
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: xuyang
>Priority: Minor
>
> Take the following plan as an example:
> {code:java}
> Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
> +- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start], 
> win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) 
> AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) 
> AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS 
> window_end])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS 
> $f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
>  +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
> min], partition keys=[a])])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
> 1000:INTERVAL SECOND)])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) {code}
> If the `WindowTableFunction`, `Calc`, and `WindowAggregate` nodes can 
> eventually be chained, the `Exchange` between `Calc` and `WindowAggregate` 
> can be removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34222) Supports mini-batch for streaming regular join

2024-01-28 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811749#comment-17811749
 ] 

dalongliu commented on FLINK-34222:
---

Merged in master: 20450485b20cb213b96318b0c3275e42c0300e15

> Supports mini-batch for streaming regular join
> --
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join end-to-end, including both the plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34222) Supports mini-batch for streaming regular join

2024-01-28 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu closed FLINK-34222.
-
Resolution: Fixed

> Supports mini-batch for streaming regular join
> --
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join end-to-end, including both the plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34222][table-planner] Supports mini-batch for streaming regular join [flink]

2024-01-28 Thread via GitHub


lsyldliu closed pull request #24161: [FLINK-34222][table-planner] Supports 
mini-batch for streaming regular join
URL: https://github.com/apache/flink/pull/24161


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34222) Supports mini-batch for streaming regular join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Summary: Supports mini-batch for streaming regular join  (was: End to end 
implementation of minibatch join)

> Supports mini-batch for streaming regular join
> --
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join end-to-end, including both the plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469124707


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +106,33 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
-internalLeaderElector =
-new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+this.executorService = executorService;
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+cancelCurrentLeaderElectionSession();
+
+currentLeaderElectionSession =
+currentLeaderElectionSession.thenCompose(

Review Comment:
   It seems that `currentLeaderElectionSession.cancel(true)` does not cancel 
the future of `LeaderElector#start()`. Then `KubernetesLeaderElector#stop()` 
will not trigger a leadership revocation.
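
   A standalone sketch of that `CompletableFuture` behavior (plain JDK 
semantics, illustrative only, not the PR's code):
   ```java
   import java.util.concurrent.CompletableFuture;

   public class CancelDemo {
       public static void main(String[] args) {
           // stands in for the future returned by LeaderElector#start()
           CompletableFuture<Void> session = new CompletableFuture<>();
           // stands in for currentLeaderElectionSession.thenCompose(...)
           CompletableFuture<Void> chained =
                   session.thenCompose(CompletableFuture::completedFuture);

           chained.cancel(true);                      // mayInterruptIfRunning is ignored
           System.out.println(chained.isCancelled()); // true
           System.out.println(session.isCancelled()); // false: cancellation does not
                                                      // propagate to the source future
       }
   }
   ```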



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification. [flink]

2024-01-28 Thread via GitHub


flinkbot commented on PR #24213:
URL: https://github.com/apache/flink/pull/24213#issuecomment-1914044726

   
   ## CI report:
   
   * 6862748059d638419ac98ecfbd67643cb540c0ce UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34222) End to end implementation of minibatch join

2024-01-28 Thread Shuai Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-34222:
-
Description: Implement minibatch join end-to-end, including both the plan and 
runtime parts.  (was: Get minibatch join operator involved in which includes 
both plan and operator. Implement minibatch join in E2E.)
Summary: End to end implementation of minibatch join  (was: Get 
minibatch join operator involved)

> End to end implementation of minibatch join
> ---
>
> Key: FLINK-34222
> URL: https://issues.apache.org/jira/browse/FLINK-34222
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Implement minibatch join end-to-end, including both the plan and runtime parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469121295


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -55,17 +59,32 @@ public class KubernetesLeaderElector {
 
 private final Object lock = new Object();
 
-private final ExecutorService executorService =
-Executors.newFixedThreadPool(
-3, new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
+private final NamespacedKubernetesClient kubernetesClient;
+private final LeaderElectionConfig leaderElectionConfig;
+private final ExecutorService executorService;
 
-private final LeaderElector internalLeaderElector;
+private CompletableFuture currentLeaderElectionSession = 
FutureUtils.completedVoidFuture();
 
 public KubernetesLeaderElector(
 NamespacedKubernetesClient kubernetesClient,
 KubernetesLeaderElectionConfiguration leaderConfig,
 LeaderCallbackHandler leaderCallbackHandler) {
-final LeaderElectionConfig leaderElectionConfig =
+this(
+kubernetesClient,
+leaderConfig,
+leaderCallbackHandler,
+Executors.newFixedThreadPool(
+3, new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));

Review Comment:
   Single thread pool is enough for now.



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +106,33 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
-internalLeaderElector =
-new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+this.executorService = executorService;
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+cancelCurrentLeaderElectionSession();
+
+currentLeaderElectionSession =
+currentLeaderElectionSession.thenCompose(

Review Comment:
   It seems that `currentLeaderElectionSession.cancel(true)` will not cancel 
the future of `LeaderElector#start()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34257) Update Flink YAML Parser to Support YAML 1.2 Specification

2024-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34257:
---
Labels: pull-request-available  (was: )

>  Update Flink YAML Parser to Support YAML 1.2 Specification
> ---
>
> Key: FLINK-34257
> URL: https://issues.apache.org/jira/browse/FLINK-34257
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> FLINK-33364 and FLINK-33577 added snakeyaml and pyyaml dependencies to 
> support a standard YAML parser. However, these parsers support the YAML 1.1 
> specification, not the YAML 1.2 specification. Therefore, we need to replace 
> these dependencies with ones that support YAML 1.2.
> The updated dependencies are as follows:
> 1. For Java: change from snakeyaml to snakeyaml-engine
> 2. For Python: change from pyyaml to ruamel.yaml



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification. [flink]

2024-01-28 Thread via GitHub


JunRuiLee opened a new pull request, #24213:
URL: https://github.com/apache/flink/pull/24213

   
   
   
   
   ## What is the purpose of the change
   
   Change the standard YAML parser to support YAML 1.2.
   
   
   ## Brief change log
   
 - Change snakeyaml to snakeyaml-engine 2.6 to support YAML 1.2
 - Change pyyaml to ruamel.yaml 0.18.4 to support YAML 1.2
   
   
   
   ## Verifying this change
   
   
   
   This change added tests and can be verified by 
YamlParserUtilsTest#testYaml12Features
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft][configuration] Change standard YAML parser to support YAML 1.2. [flink]

2024-01-28 Thread via GitHub


JunRuiLee closed pull request #24199: [Draft][configuration] Change standard 
YAML parser to support YAML 1.2.
URL: https://github.com/apache/flink/pull/24199


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34257) Update Flink YAML Parser to Support YAML 1.2 Specification

2024-01-28 Thread Junrui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Li updated FLINK-34257:
--
Description: 
FLINK-33364 and FLINK-33577 added snakeyaml and pyyaml dependencies to support 
a standard YAML parser. However, these parsers support the YAML 1.1 
specification, not the YAML 1.2 specification. Therefore, we need to replace 
these dependencies with ones that support YAML 1.2.

The updated dependencies are as follows:

1. For Java: change from snakeyaml to snakeyaml-engine
2. For Python: change from pyyaml to ruamel.yaml

  was:
FLINK-33297 and FLINK-33577 added snakeyaml and pyyaml dependencies to support 
a standard YAML parser. However, these parsers support the YAML 1.1 
specification, not the YAML 1.2 specification. Therefore, we need to replace 
these dependencies with ones that support YAML 1.2.

The updated dependencies are as follows:

1. For Java: change from snakeyaml to snakeyaml-engine
2. For Python: change from pyyaml to ruamel.yaml


>  Update Flink YAML Parser to Support YAML 1.2 Specification
> ---
>
> Key: FLINK-34257
> URL: https://issues.apache.org/jira/browse/FLINK-34257
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: Junrui Li
>Priority: Major
> Fix For: 1.19.0
>
>
> FLINK-33364 and FLINK-33577 added snakeyaml and pyyaml dependencies to 
> support a standard YAML parser. However, these parsers support the YAML 1.1 
> specification, not the YAML 1.2 specification. Therefore, we need to replace 
> these dependencies with ones that support YAML 1.2.
> The updated dependencies are as follows:
> 1. For Java: change from snakeyaml to snakeyaml-engine
> 2. For Python: change from pyyaml to ruamel.yaml



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34257) Update Flink YAML Parser to Support YAML 1.2 Specification

2024-01-28 Thread Junrui Li (Jira)
Junrui Li created FLINK-34257:
-

 Summary:  Update Flink YAML Parser to Support YAML 1.2 
Specification
 Key: FLINK-34257
 URL: https://issues.apache.org/jira/browse/FLINK-34257
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: Junrui Li
 Fix For: 1.19.0


FLINK-33297 and FLINK-33577 added snakeyaml and pyyaml dependencies to support 
a standard YAML parser. However, these parsers support the YAML 1.1 
specification, not the YAML 1.2 specification. Therefore, we need to replace 
these dependencies with ones that support YAML 1.2.

The updated dependencies are as follows:

1. For Java: change from snakeyaml to snakeyaml-engine
2. For Python: change from pyyaml to ruamel.yaml
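
For illustration, a minimal sketch of one behavioral difference between the two 
specs, using the snakeyaml-engine API (an assumed usage based on its docs, not 
Flink code):
{code:java}
import org.snakeyaml.engine.v2.api.Load;
import org.snakeyaml.engine.v2.api.LoadSettings;

public class Yaml12Demo {
    public static void main(String[] args) {
        Load load = new Load(LoadSettings.builder().build());
        // Under YAML 1.2, unquoted `on`, `off`, `yes`, `no` are plain strings,
        // whereas a YAML 1.1 parser such as snakeyaml resolves them to booleans.
        Object parsed = load.loadFromString("some-flag: on");
        System.out.println(parsed); // {some-flag=on} -- the value stays a String
    }
}
{code}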



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join

2024-01-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34256:


 Summary: Add a documentation section for minibatch join
 Key: FLINK-34256
 URL: https://issues.apache.org/jira/browse/FLINK-34256
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Shuai Xu


We should add a minibatch join section to the Performance Tuning page to 
explain the usage and principles of minibatch join.
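
For reference, a sketch of the usage side such a section would likely cover, 
based on the existing table.exec.mini-batch.* options (whether minibatch join 
needs any additional switches is exactly what the new section should clarify):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // the general minibatch switches from the Performance Tuning page
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
    }
}
{code}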



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix] Update copyright year to 2024 [flink-connector-pulsar]

2024-01-28 Thread via GitHub


ruanhang1993 opened a new pull request, #82:
URL: https://github.com/apache/flink-connector-pulsar/pull/82

   
   
   ## Purpose of the change
   
   Update copyright year to 2024.
   
   ## Brief change log
   
   Update copyright year to 2024 in NOTICE.
   
   ## Verifying this change
   
   This change is a doc work without any test coverage.
   
   ## Significant changes
   
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
   - If yes, how is this documented? (not applicable / docs / JavaDocs / 
not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33728] Do not rewatch when KubernetesResourceManagerDriver wat… [flink]

2024-01-28 Thread via GitHub


xintongsong commented on PR #24163:
URL: https://github.com/apache/flink/pull/24163#issuecomment-1914013482

   @zhougit86 ,
   If you need any help in understanding the comments, feel free to reach out 
to my email (tonysong...@gmail.com). We can exchange WeChat IDs and have an 
offline phone call if you'd like.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33728] Do not rewatch when KubernetesResourceManagerDriver wat… [flink]

2024-01-28 Thread via GitHub


xintongsong commented on code in PR #24163:
URL: https://github.com/apache/flink/pull/24163#discussion_r1469101401


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
 assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
 }
 
+@Test
+void testWatchPodsAndDoCallbackFail() throws Exception {
+mockPodEventWithLabelsFail(
+NAMESPACE, TASKMANAGER_POD_NAME, 
KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+TestingWatchCallbackHandler watchCallbackHandler =
+TestingWatchCallbackHandler.builder().build();
+Long watchStartTime = System.currentTimeMillis();
+assertThatThrownBy(
+() ->
+this.flinkKubeClient.watchPodsAndDoCallback(
+TESTING_LABELS, watchCallbackHandler));

Review Comment:
   It is unclear to me whether `watchPodsAndDoCallback` is supposed to succeed 
or fail. There's no verification of the error being thrown.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -183,6 +183,29 @@ protected void mockPodEventWithLabels(
 .once();
 }
 
+protected void mockPodEventWithLabelsFail(
+String namespace, String podName, String resourceVersion, 
Map labels) {

Review Comment:
   `podName` is never used.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
 assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
 }
 
+@Test
+void testWatchPodsAndDoCallbackFail() throws Exception {
+mockPodEventWithLabelsFail(
+NAMESPACE, TASKMANAGER_POD_NAME, 
KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+TestingWatchCallbackHandler watchCallbackHandler =
+TestingWatchCallbackHandler.builder().build();
+Long watchStartTime = System.currentTimeMillis();
+assertThatThrownBy(
+() ->
+this.flinkKubeClient.watchPodsAndDoCallback(
+TESTING_LABELS, watchCallbackHandler));
+Long watchEndTime = System.currentTimeMillis();
+Long watchDuration = watchEndTime - watchStartTime;
+assertThat(watchDuration > 100);

Review Comment:
   Relying on the duration to decide whether the retry has happened can be 
unstable. I'd suggest mocking the server so that it replies with failure 
messages a certain number of times, followed by a success message.



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -247,11 +259,12 @@ public KubernetesWatch watchPodsAndDoCallback(
 new 
KubernetesPodsWatcher(

 podCallbackHandler))),
 kubeClientExecutorService),
-maxRetryAttempts,
+new ExponentialBackoffRetryStrategy(
+maxRetryAttempts, initialRetryInterval, 
maxRetryInterval),
 t ->
 ExceptionUtils.findThrowable(t, 
KubernetesClientException.class)
 .isPresent(),
-kubeClientExecutorService)
+new 
ScheduledExecutorServiceAdapter(kubeClientExecutorService))
 .get();

Review Comment:
   This means `watchPodsAndDoCallback` is blocking and can wait for minutes to 
return. It should no longer be called on the RPC main thread.
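
   A sketch of the concern (hypothetical names, not the PR's code): a call that 
now blocks on the retried future has to be dispatched to an I/O executor 
instead of being invoked on the RPC main thread:
   ```java
   // watchPodsAndDoCallback now blocks until the retry strategy succeeds or is
   // exhausted, so wrap it instead of calling it directly from the main thread.
   CompletableFuture
           .supplyAsync(
                   () -> flinkKubeClient.watchPodsAndDoCallback(labels, handler),
                   ioExecutor) // may block for minutes while retrying
           .thenAcceptAsync(
                   watch -> { /* store the watch handle */ },
                   mainThreadExecutor); // resume on the RPC main thread
   ```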



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
 assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
 }
 
+@Test
+void testWatchPodsAndDoCallbackFail() throws Exception {

Review Comment:
   `Exception` is never thrown.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -183,6 +183,29 @@ protected void mockPodEventWithLabels(
 .once();
 }
 
+protected void mockPodEventWithLabelsFail(
+String namespace, String podName, String resourceVersion, 
Map labels) {
+final Pod pod =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace(namespace)
+.withName(podName)
+.withLabels(labels)
+

[jira] [Created] (FLINK-34255) FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-28 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34255:
---

 Summary: FLIP-406: Reorganize State & Checkpointing & Recovery 
Configuration
 Key: FLINK-34255
 URL: https://issues.apache.org/jira/browse/FLINK-34255
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 2.0.0


The FLIP: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560

 

Currently, the configuration options pertaining to checkpointing, recovery, and 
state management are primarily grouped under the following prefixes:
 * *state.backend.** : configurations related to state accessing and 
checkpointing, as well as specific options for individual state backends
 * *execution.checkpointing.** : configurations associated with checkpoint 
execution and recovery
 * *execution.savepoint.**: configurations for recovery from savepoint

In addition, there are several individual options such as 
_{{state.checkpoint-storage}}_ and _{{state.checkpoints.dir}}_ that fall 
outside of these prefixes. The current arrangement of these options, which span 
multiple modules, is somewhat haphazard and lacks a systematic structure. For 
example, the options under {{_CheckpointingOptions_}} and 
{{_ExecutionCheckpointingOptions_}} are related and have no clear boundaries 
from the user's perspective, but there is no unified prefix for them. With the 
upcoming release of Flink 2.0, we have an excellent opportunity to overhaul and 
restructure the configurations related to checkpointing, recovery, and state 
management. This FLIP proposes to reorganize these settings, making them more 
coherent by module, which would significantly lower the barriers to 
understanding and reduce the development costs moving forward.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34219) Introduce a new join operator to support minibatch

2024-01-28 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811741#comment-17811741
 ] 

lincoln lee commented on FLINK-34219:
-

[~xu_shuai_] For this new optimization, can we add some user documentation, for 
example, a new section on this page: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation

> Introduce a new join operator to support minibatch
> --
>
> Key: FLINK-34219
> URL: https://issues.apache.org/jira/browse/FLINK-34219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Shuai Xu
>Assignee: Shuai Xu
>Priority: Major
> Fix For: 1.19.0
>
>
> This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34109) FileSystem sink connector restore job from historical checkpoint failure

2024-01-28 Thread Sergey Paryshev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811732#comment-17811732
 ] 

Sergey Paryshev commented on FLINK-34109:
-

I guess this is the same: 
https://lists.apache.org/thread/2c17wy7nz3jj10mh1jzgwxjhodzv4bwg

> FileSystem sink connector restore job from historical checkpoint failure
> 
>
> Key: FLINK-34109
> URL: https://issues.apache.org/jira/browse/FLINK-34109
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.18.0, 1.16.3, 1.17.2
>Reporter: Sergey Paryshev
>Priority: Minor
>  Labels: pull-request-available
>
> The FileSystem sink connector with the compaction setting enabled can't 
> restore a job from a historical checkpoint (when MAX_RETAINED_CHECKPOINTS > 1 
> and the restoring checkpoint is not the last one)
> {code:java}
> java.io.UncheckedIOException: java.io.FileNotFoundException: File 
> file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
>  does not exist or the user running Flink ('user') has insufficient 
> permissions to access it.
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:165)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.BinPacking.pack(BinPacking.java:40) 
> ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:175)
>  ~[classes/:?]
>     at java.util.HashMap.forEach(HashMap.java:1290) ~[?:1.8.0_312]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:171)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:153)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:143)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:262)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:155)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:554)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:245)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:848)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:797)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933) 
> ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:747) 
> ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
> ~[classes/:?]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> Caused by: java.io.FileNotFoundException: File 
> file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
>  does not exist or the user running Flink ('user') has insufficient 
> permissions to access it.
>     at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:113)
>  ~[classes/:?]
>     at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>  ~[classes/:?]
>     ... 19 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34115) TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails

2024-01-28 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811725#comment-17811725
 ] 

Jane Chan edited comment on FLINK-34115 at 1/29/24 4:02 AM:


The root cause of the unstable case is the source used. `tableEnv.fromValues` 
interprets multiple records as individual Values->Calc branches and unions them 
all; the union uses a global ship strategy, so the input order is not guaranteed.
{code:sql}
== Optimized Physical Plan ==
Calc(select=[f0 AS top_price, f1 AS rank], changelogMode=[I,UA,D])
+- GroupTableAggregate(select=[incrementalTop2(price) AS (f0, f1)], 
changelogMode=[I,UA,D])
   +- Exchange(distribution=[single], changelogMode=[I])
      +- Union(all=[true], union=[id, name, price], changelogMode=[I])
         :- Calc(select=[CAST(1 AS INTEGER) AS id, 
CAST(_UTF-16LE'Latte':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(6 AS INTEGER) AS 
price], changelogMode=[I])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
         :- Calc(select=[CAST(2 AS INTEGER) AS id, 
CAST(_UTF-16LE'Milk':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(3 AS INTEGER) AS 
price], changelogMode=[I])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
         :- Calc(select=[CAST(3 AS INTEGER) AS id, 
CAST(_UTF-16LE'Breve':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(5 AS INTEGER) AS 
price], changelogMode=[I])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
         :- Calc(select=[CAST(4 AS INTEGER) AS id, 
CAST(_UTF-16LE'Mocha':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(8 AS INTEGER) AS 
price], changelogMode=[I])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I])
         +- Calc(select=[CAST(5 AS INTEGER) AS id, 
CAST(_UTF-16LE'Tea':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, CAST(4 AS INTEGER) AS 
price], changelogMode=[I])
            +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], 
changelogMode=[I]) {code}


was (Author: qingyue):
The root cause of the unstable case is the source used. `tableEnv.fromValues` 
interprets multiple records as individual Values->Calc branches and unions them 
all; the union uses a global ship strategy, so the input order is not guaranteed.

> TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails
> --
>
> Key: FLINK-34115
> URL: https://issues.apache.org/jira/browse/FLINK-34115
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0, 1.18.2
>
>
> It failed twice in the same pipeline run:
>  * 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56348=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=11613]
>  * 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56348=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=11963]
> {code:java}
>  Jan 14 01:20:01 01:20:01.949 [ERROR] Tests run: 18, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 29.07 s <<< FAILURE! -- in 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase
> Jan 14 01:20:01 01:20:01.949 [ERROR] 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate
>  -- Time elapsed: 0.518 s <<< FAILURE!
> Jan 14 01:20:01 org.opentest4j.AssertionFailedError: 
> Jan 14 01:20:01 
> Jan 14 01:20:01 expected: List((true,6,1), (false,6,1), (true,6,1), 
> (true,3,2), (false,6,1), (false,3,2), (true,6,1), (true,5,2), (false,6,1), 
> (false,5,2), (true,8,1), (true,6,2), (false,8,1), (false,6,2), (true,8,1), 
> (true,6,2))
> Jan 14 01:20:01  but was: List((true,3,1), (false,3,1), (true,5,1), 
> (true,3,2), (false,5,1), (false,3,2), (true,8,1), (true,5,2), (false,8,1), 
> (false,5,2), (true,8,1), (true,5,2), (false,8,1), (false,5,2), (true,8,1), 
> (true,6,2))
> Jan 14 01:20:01   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 14 01:20:01   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 14 01:20:01   at 
> 

[jira] [Commented] (FLINK-34115) TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails

2024-01-28 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811725#comment-17811725
 ] 

Jane Chan commented on FLINK-34115:
---

The root cause of the unstable case is the source used. `tableEnv.fromValues` 
interprets multiple records as individual Values->Calc branches and unions them 
all; the union uses a global ship strategy, so the input order is not guaranteed.
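
A sketch of the contrast (illustrative only; the rows and names are made up, 
and the actual fix replaces the source used by the ITCase):
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SourceOrderDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // fromValues: each row becomes its own Values->Calc branch under a Union,
        // so the arrival order at the single downstream Exchange is not guaranteed.
        Table fromValues = tEnv.fromValues(Row.of(1, "Latte", 6), Row.of(2, "Milk", 3));

        // A single source emits all rows from one operator instance, which keeps
        // the input order deterministic at parallelism 1.
        DataStream<Row> rows =
                env.fromElements(Row.of(1, "Latte", 6), Row.of(2, "Milk", 3))
                        .returns(Types.ROW(Types.INT, Types.STRING, Types.INT));
        Table singleSource = tEnv.fromDataStream(rows);
    }
}
{code}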

> TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails
> --
>
> Key: FLINK-34115
> URL: https://issues.apache.org/jira/browse/FLINK-34115
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0, 1.18.2
>
>
> It failed twice in the same pipeline run:
>  * 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56348=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=11613]
>  * 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56348=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=11963]
> {code:java}
>  Jan 14 01:20:01 01:20:01.949 [ERROR] Tests run: 18, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 29.07 s <<< FAILURE! -- in 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase
> Jan 14 01:20:01 01:20:01.949 [ERROR] 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate
>  -- Time elapsed: 0.518 s <<< FAILURE!
> Jan 14 01:20:01 org.opentest4j.AssertionFailedError: 
> Jan 14 01:20:01 
> Jan 14 01:20:01 expected: List((true,6,1), (false,6,1), (true,6,1), 
> (true,3,2), (false,6,1), (false,3,2), (true,6,1), (true,5,2), (false,6,1), 
> (false,5,2), (true,8,1), (true,6,2), (false,8,1), (false,6,2), (true,8,1), 
> (true,6,2))
> Jan 14 01:20:01  but was: List((true,3,1), (false,3,1), (true,5,1), 
> (true,3,2), (false,5,1), (false,3,2), (true,8,1), (true,5,2), (false,8,1), 
> (false,5,2), (true,8,1), (true,5,2), (false,8,1), (false,5,2), (true,8,1), 
> (true,6,2))
> Jan 14 01:20:01   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 14 01:20:01   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 14 01:20:01   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 14 01:20:01   at 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.checkRank$1(TableAggregateITCase.scala:122)
> Jan 14 01:20:01   at 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate(TableAggregateITCase.scala:69)
> Jan 14 01:20:01   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 14 01:20:01   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Jan 14 01:20:01   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Jan 14 01:20:01   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Jan 14 01:20:01   at 
> scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26)
> Jan 14 01:20:01   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Jan 14 01:20:01   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Jan 14 01:20:01   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> Jan 14 01:20:01   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> Jan 14 01:20:01   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> Jan 14 01:20:01   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> Jan 14 01:20:01   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> Jan 14 01:20:01   at 
> 

Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469047743


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   If we enable `isReleaseOnCancel` and do not upgrade the fabric8 k8s 
client from v6.6.2 to v6.9.0, which includes the fix 
https://github.com/fabric8io/kubernetes-client/issues/5463, then we have the 
risk that `onStopLeading()` is never executed even though the leadership is 
lost. From Flink's perspective, `revokeLeadership` is never called while a new 
`grantLeadership` happens.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469043488


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   I agree with you that we do not need to add 
`leadershipCallbackHandler.waitForRevokeLeader();` to the ITCase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions

2024-01-28 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34254:
---
Description: 
Add the following test to CalcITCase to reproduce this bug.
{code:java}
@Test
def test(): Unit = {
  tEnv.executeSql(s"""
                     |create catalog `c_new` with (
                     |  'type' = 'generic_in_memory',
                     |  'default-database' = 'my_d'
                     |)
                     |""".stripMargin)

  tEnv
    .executeSql(s"""
                   |show catalogs
                   |""".stripMargin)
    .print

  tEnv
    .executeSql(s"""
                   | describe catalog default_catalog
                   |""".stripMargin)
    .print
}
{code}
Result:
{code:java}
+-----------------+
|    catalog name |
+-----------------+
|           c_new |
| default_catalog |
+-----------------+
2 rows in set
org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in any table
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
    at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)
    at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
    at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
    ... 11 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'default_catalog' not found in any table
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599)
    ... 23 more
{code}
The syntax `DESCRIBE DATABASE xxx` has the same bug.

 

It seems that this FLIP is not finished yet: 
https://issues.apache.org/jira/browse/FLINK-14686

  was:
Add the following test to CalcITCase to reproduce this bug.

 
{code:java}
@Test
  def test(): Unit = {
tEnv.executeSql(s"""

[jira] [Updated] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throw strange exceptions

2024-01-28 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34254:
---
Description: 
Add the following test to CalcITCase to reproduce this bug.

 
{code:java}
@Test
  def test(): Unit = {
tEnv.executeSql(s"""
   |create catalog `c_new` with (
   |  'type' = 'generic_in_memory',
   |  'default-database' = 'my_d'
   |)
   |""".stripMargin)

tEnv
  .executeSql(s"""
 |show catalogs
 |""".stripMargin)
  .print

tEnv
  .executeSql(s"""
 | describe catalog default_catalog
 |""".stripMargin)
  .print

  } {code}
Result:

 

 
{code:java}
+-----------------+
|    catalog name |
+-----------------+
|           c_new |
| default_catalog |
+-----------------+
2 rows in set
 org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in 
any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
 at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) 
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
   at 
org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
  at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)  at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)  at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)  at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused
 by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 19 
to line 2, column 33: Column 'default_catalog' not found in any table  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
  at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
   at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
  at 
org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)at 
org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
... 11 moreCaused by: 
org.apache.calcite.sql.validate.SqlValidatorException: Column 'default_catalog' 
not found in any tableat 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  at 
org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599)   ... 23 more
{code}
The syntax `DESCRIBE DATABASE xxx` has the same bug.

  was:
Add the following test to CalcITCase to reproduce this bug.

 
{code:java}
@Test
  def test(): Unit = {
tEnv.executeSql(s"""
   |create catalog `c_new` with (
   |  'type' = 

[jira] [Updated] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throw strange exceptions

2024-01-28 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34254:
---
Description: 
Add the following test to CalcITCase to reproduce this bug.

 
{code:java}
@Test
  def test(): Unit = {
tEnv.executeSql(s"""
   |create catalog `c_new` with (
   |  'type' = 'generic_in_memory',
   |  'default-database' = 'my_d'
   |)
   |""".stripMargin)

tEnv
  .executeSql(s"""
 |show catalogs
 |""".stripMargin)
  .print

tEnv
  .executeSql(s"""
 | describe catalog default_catalog
 |""".stripMargin)
  .print

  } {code}
Result:

 

 
{code:java}
+-----------------+
|    catalog name |
+-----------------+
|           c_new |
| default_catalog |
+-----------------+
2 rows in set
 org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in 
any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
 at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) 
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
   at 
org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
  at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)  at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)  at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)  at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused
 by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 19 
to line 2, column 33: Column 'default_catalog' not found in any table  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
  at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
   at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
  at 
org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)at 
org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
... 11 moreCaused by: 
org.apache.calcite.sql.validate.SqlValidatorException: Column 'default_catalog' 
not found in any tableat 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  at 
org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599)   ... 23 more
{code}
 

 

> `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throw strange exceptions
> -
>
> Key: FLINK-34254
> URL: https://issues.apache.org/jira/browse/FLINK-34254
> Project: Flink

[jira] [Created] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throw strange exceptions

2024-01-28 Thread xuyang (Jira)
xuyang created FLINK-34254:
--

 Summary: `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throw 
strange exceptions
 Key: FLINK-34254
 URL: https://issues.apache.org/jira/browse/FLINK-34254
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Update copyright year to 2024 [flink-connector-opensearch]

2024-01-28 Thread via GitHub


boring-cyborg[bot] commented on PR #40:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/40#issuecomment-1913891222

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-01-28 Thread via GitHub


flinkbot commented on PR #24212:
URL: https://github.com/apache/flink/pull/24212#issuecomment-1913877869

   
   ## CI report:
   
   * 3d66ebaffc112e0b3777fc3e3841b64f045357bb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34253) Offsets out of range with no configured reset policy for partitions

2024-01-28 Thread Jepson (Jira)
Jepson created FLINK-34253:
--

 Summary: Offsets out of range with no configured reset policy for 
partitions
 Key: FLINK-34253
 URL: https://issues.apache.org/jira/browse/FLINK-34253
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.4
 Environment: flink 1.14.4

 
Reporter: Jepson


java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-table_2.12-1.14.4.jar:1.14.4] at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 [flink-table_2.12-1.14.4.jar:1.14.4] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_241] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_241] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_241] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_241] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241] Caused by: 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: {dp-oracle-sllv-0=12734616



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33981:
---
Labels: pull-request-available  (was: )

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java18162 sa_cluster   30r   DIR  253,1 01311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java18162 sa_cluster   31r   DIR  253,1 01311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java18162 sa_cluster   32r   DIR  253,1 01310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java18162 sa_cluster   33r   DIR  253,1 01310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java18162 sa_cluster   34r   DIR  253,1 01311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java18162 sa_cluster   35r   DIR  253,1 01311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java18162 sa_cluster   36r   DIR  253,1 01311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java18162 sa_cluster   37r   DIR  253,1 01311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java18162 sa_cluster   38r   DIR  253,1 01311979 
> /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
> DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
>   thread.join();
>   System.out.println("done ... " + i);
> }
> 
> // === lsof -p 18162
> 

[PR] [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files [flink]

2024-01-28 Thread via GitHub


fengjiajie opened a new pull request, #24212:
URL: https://github.com/apache/flink/pull/24212

   
   
   ## What is the purpose of the change
   
   When using MiniCluster mode, file descriptors like 
/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.
   
   After executing the reproducing code provided below (after entering the 
sleep), running lsof -p 18162 reveals:
   
   ```
   ...
   java18162 sa_cluster   30r   DIR  253,1 01311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java18162 sa_cluster   31r   DIR  253,1 01311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
   java18162 sa_cluster   32r   DIR  253,1 01310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java18162 sa_cluster   33r   DIR  253,1 01310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
   java18162 sa_cluster   34r   DIR  253,1 01311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java18162 sa_cluster   35r   DIR  253,1 01311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
   java18162 sa_cluster   36r   DIR  253,1 01311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java18162 sa_cluster   37r   DIR  253,1 01311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
   java18162 sa_cluster   38r   DIR  253,1 01311979 
/tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
   ...
   ```
   
   The code used for reproduction is as follows:
   
   ```
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.core.execution.JobClient;
   import org.apache.flink.streaming.api.datastream.DataStreamSource;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
   import org.apache.flink.streaming.api.graph.StreamGraph;
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /**
* javac -cp 'lib/*' TestReleaseFd.java
* java -Xmx600m -cp '.:lib/*' TestReleaseFd
*/
   public class TestReleaseFd {
   
 public static void main(String[] args) throws Exception {
   for (int i = 0; i < 10; ++i) {
 int round = i;
 Thread thread = new Thread(() -> {
   try {
 Configuration configuration = new Configuration();
 final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 env.setParallelism(1);
   
  DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
 longDataStreamSource.addSink(new DiscardingSink<>());
   
 StreamGraph streamGraph = env.getStreamGraph();
 streamGraph.setJobName("test-" + System.nanoTime());
 JobClient jobClient = env.executeAsync(streamGraph);
   
  CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
 JobExecutionResult jobExecutionResult = null;
 while (jobExecutionResult == null) {
   try {
 jobExecutionResult = 
jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
   } catch (TimeoutException timeoutException) {
 // ignore
   }
 }
 System.out.println("finished round: " + round);
 env.close();
   } catch (Exception e) {
 throw new RuntimeException(e);
   }
 });
   
 thread.setDaemon(true);
 thread.start();
 thread.join();
   
 System.out.println("done ... " + i);
   }
   
   // === lsof -p 18162
   Thread.sleep(500_000_000);
 }
   }
   ```
   
   ## Brief change log
   
   Close the DirectoryStream after use; a minimal sketch of the pattern follows below.
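
   A minimal illustration of the pattern (hypothetical class, method, and variable 
names; a sketch, not the exact patch): try-with-resources guarantees the directory 
stream and its underlying file descriptor are released once listing finishes, even 
if iteration throws.

   ```java
   import java.io.IOException;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;

   class LocalStateListing {
       static List<Path> listLocalStateFiles(Path localStateDir) throws IOException {
           List<Path> entries = new ArrayList<>();
           // The open stream pins a file descriptor for the directory; closing it
           // via try-with-resources releases the fd even when iteration throws.
           try (DirectoryStream<Path> stream = Files.newDirectoryStream(localStateDir)) {
               for (Path entry : stream) {
                   entries.add(entry);
               }
           }
           return entries;
       }
   }
   ```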
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths 

[jira] [Resolved] (FLINK-34249) Remove DefaultSlotTracker related logic.

2024-01-28 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-34249.
-
Fix Version/s: 1.19.0
   Resolution: Fixed

> Remove DefaultSlotTracker related logic.
> 
>
> Key: FLINK-34249
> URL: https://issues.apache.org/jira/browse/FLINK-34249
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Task
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Update copyright year to 2024 [flink-connector-mongodb]

2024-01-28 Thread via GitHub


Jiabao-Sun commented on PR #24:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/24#issuecomment-1913860680

   Hi @leonardBang, please take a look when you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34249) Remove DefaultSlotTracker related logic.

2024-01-28 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811711#comment-17811711
 ] 

Rui Fan commented on FLINK-34249:
-

Merged to master (1.19.0) via a24f7717847ce4e4c511070257e99d7c3f948d2a

> Remove DefaultSlotTracker related logic.
> 
>
> Key: FLINK-34249
> URL: https://issues.apache.org/jira/browse/FLINK-34249
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Task
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34249) Remove DefaultSlotTracker related logic.

2024-01-28 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34249:
---

Assignee: RocMarshal

> Remove DefaultSlotTracker related logic.
> 
>
> Key: FLINK-34249
> URL: https://issues.apache.org/jira/browse/FLINK-34249
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Task
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-24223) Client should throw exception to warn users when the configurations set by program options conflict with those set by -D

2024-01-28 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen closed FLINK-24223.
-
Resolution: Later

> Client should throw exception to warn users when the configurations set by 
> program options conflict with those set by -D
> 
>
> Key: FLINK-24223
> URL: https://issues.apache.org/jira/browse/FLINK-24223
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.11.0, 1.12.0, 1.13.0
>Reporter: Zhanghao Chen
>Priority: Minor
>
> h2. Problem
> Currently, program options (e.g. -d, -p) take precedence over 
> configuration options set by -D or -yD on the client side. This may cause 
> confusion for users, especially for those program options without args. For 
> example, if a user sets -Dexecution.attached=false without setting -d (they 
> may not be aware of the existence of this option), they will find that the 
> configuration value does not take effect.
> h2. Proposal
> The client should throw an exception to warn users when the configurations set by 
> program options conflict with those set by -D.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24223) Client should throw exception to warn users when the configurations set by program options conflict with those set by -D

2024-01-28 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811710#comment-17811710
 ] 

Zhanghao Chen commented on FLINK-24223:
---

Closing this as Flink 2.0 plans to refactor the CLI command design

> Client should throw exception to warn users when the configurations set by 
> program options conflict with those set by -D
> 
>
> Key: FLINK-24223
> URL: https://issues.apache.org/jira/browse/FLINK-24223
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.11.0, 1.12.0, 1.13.0
>Reporter: Zhanghao Chen
>Priority: Minor
>
> h2. Problem
> Currently, program options (e.g. -d, -p) take precedence over 
> configuration options set by -D or -yD on the client side. This may cause 
> confusion for users, especially for those program options without args. For 
> example, if a user sets -Dexecution.attached=false without setting -d (they 
> may not be aware of the existence of this option), they will find that the 
> configuration value does not take effect.
> h2. Proposal
> The client should throw an exception to warn users when the configurations set by 
> program options conflict with those set by -D.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34249][runtime] Remove DefaultSlotTracker related logic. [flink]

2024-01-28 Thread via GitHub


1996fanrui merged PR #24204:
URL: https://github.com/apache/flink/pull/24204


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34252][table] Fix lastRecordTime tracking in WatermarkAssignerOperator [flink]

2024-01-28 Thread via GitHub


flinkbot commented on PR #24211:
URL: https://github.com/apache/flink/pull/24211#issuecomment-1913851473

   
   ## CI report:
   
   * 3b898a0c74a5dcb957ab7ab30b79dd82d01e8c0e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34252) WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow

2024-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34252:
---
Labels: pull-request-available  (was: )

> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under 
> continuous data flow
> -
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.3, 1.17.2, 1.18.1
>Reporter: David Christle
>Priority: Major
>  Labels: pull-request-available
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
> an IDLE state even when data is continuously flowing. This behavior, observed 
> under normal operating conditions where the interval between data elements is 
> shorter than the configured idleTimeout, leads to regular transitions between 
> ACTIVE and IDLE states, which are unnecessary.
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the 
> time of the last received data element, is updated only when the 
> WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
> when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
> always become true, and the WatermarkStatus will erroneously be marked IDLE. 
> It is unclear to me if this bug produces any incorrectness downstream, since 
> when the WatermarkStatus is in the IDLE state, the next processElement 
> will cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should 
> eliminate this flip-flop behavior between states.
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 
> s <<< FAILURE! -- in 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR] 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
>  -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
>   [WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE),
> WatermarkStatus(ACTIVE),
> WatermarkStatus(IDLE)]
> not to contain
>   [WatermarkStatus(IDLE)]
> but found
>   [WatermarkStatus(IDLE)]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34252] [table] Fix lastRecordTime tracking in WatermarkAssignerOperator [flink]

2024-01-28 Thread via GitHub


dchristle opened a new pull request, #24211:
URL: https://github.com/apache/flink/pull/24211

   
   
   ## What is the purpose of the change
   
   In the current implementation, the lastRecordTime variable, which tracks the 
time of the last received data element, is updated only when the 
WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
always become true, and the WatermarkStatus will erroneously be marked IDLE.
   
   It is unclear to me if this bug produces any incorrectness downstream, since 
when the WatermarkStatus is in the IDLE state, the next processElement will 
cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should eliminate 
this flip-flop behavior between states.
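
   A self-contained model of the idle bookkeeping (hypothetical names, illustration 
only; the real operator does this inside processElement and its processing-time 
callback):

   ```java
   // Models the fix: advance lastRecordTime on every record, not only on the
   // IDLE -> ACTIVE transition, so a continuous stream never looks idle.
   class IdleTracker {
       private final long idleTimeout;
       private long lastRecordTime;
       private boolean idle;

       IdleTracker(long idleTimeout, long now) {
           this.idleTimeout = idleTimeout;
           this.lastRecordTime = now;
       }

       void onRecord(long now) {
           idle = false;
           lastRecordTime = now; // previously only updated when leaving IDLE
       }

       void onPeriodicCheck(long now) {
           if (!idle && now - lastRecordTime > idleTimeout) {
               idle = true; // now fires only after a genuine gap in the input
           }
       }

       boolean isIdle() {
           return idle;
       }
   }
   ```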
   
   ## Brief change log
   
- Update lastRecordTime in table WatermarkAssignerOperator on each record 
to prevent idle WatermarkStatus
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added test that validates the WatermarkStatus is not set to idle when 
records are sent more frequently than the idleTimeout
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


fsk119 commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1469008084


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
 
 final @Nullable String[] argumentNames;
 
+final @Nullable Boolean[] argumentOptionals;

Review Comment:
   I think it's not related to the InputStrategy here. `FunctionSignatureTemplate` is 
just a POJO that describes the method. The current implementation is 
counterintuitive and contradicts the way users utilize it.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


fsk119 commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1469006005


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -323,6 +327,37 @@ private void verifyMappingForMethod(
 verification.verify(method, signature.toClass(), 
result.toClass()));
 }
 
+private void verifyOptionalOnPrimitiveParameter(
+Method method,
+Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappingsPerMethod) {
+
+collectedMappingsPerMethod.forEach(
+(signature, result) -> {
+Boolean[] argumentOptional = signature.argumentOptionals;
+if (argumentOptional != null
+&& 
Arrays.stream(argumentOptional).anyMatch(Boolean::booleanValue)) {
+// do something check
+FunctionSignatureTemplate functionResultTemplate =

Review Comment:
   `FunctionSignatureTemplate#of` also contains the argument types. I made a typo here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33899][connectors/mongodb] Java 17 and 21 support for mongodb connector [flink-connector-mongodb]

2024-01-28 Thread via GitHub


Jiabao-Sun commented on PR #21:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/21#issuecomment-1913831390

   Thanks @leonardBang for the review.
   Rebased.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33899][connectors/mongodb] Java 17 and 21 support for mongodb connector [flink-connector-mongodb]

2024-01-28 Thread via GitHub


leonardBang commented on PR #21:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/21#issuecomment-1913828923

   @Jiabao-Sun could you help rebase this PR onto the latest master?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34237][connectors/mongodb] Update MongoWriterITCase to be compatible with updated SinkV2 interfaces [flink-connector-mongodb]

2024-01-28 Thread via GitHub


leonardBang merged PR #22:
URL: https://github.com/apache/flink-connector-mongodb/pull/22


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34222][table] End-to-end implementation of minibatch join [flink]

2024-01-28 Thread via GitHub


xishuaidelin commented on PR #24161:
URL: https://github.com/apache/flink/pull/24161#issuecomment-1913822230

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


hackergin commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468995720


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -323,6 +327,37 @@ private void verifyMappingForMethod(
 verification.verify(method, signature.toClass(), 
result.toClass()));
 }
 
+private void verifyOptionalOnPrimitiveParameter(
+Method method,
+Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappingsPerMethod) {
+
+collectedMappingsPerMethod.forEach(
+(signature, result) -> {
+Boolean[] argumentOptional = signature.argumentOptionals;
+if (argumentOptional != null
+&& 
Arrays.stream(argumentOptional).anyMatch(Boolean::booleanValue)) {
+// do something check
+FunctionSignatureTemplate functionResultTemplate =

Review Comment:
   The main purpose here is to determine whether an argument is marked "optional" in 
the FunctionHint while the actual method declares a primitive type for it, so the 
hint must be compared with the FunctionSignature generated from the method.
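
   For illustration, a sketch of what this verification rejects (assuming the 
`@FunctionHint`/`@ArgumentHint` API introduced by this PR; the function and names 
are made up):

   ```java
   import org.apache.flink.table.annotation.ArgumentHint;
   import org.apache.flink.table.annotation.FunctionHint;
   import org.apache.flink.table.functions.ScalarFunction;

   public class TruncateFunction extends ScalarFunction {

       // OK: the optional argument maps to a nullable (boxed) parameter, so a
       // null default can be passed when the caller omits it.
       @FunctionHint(arguments = {
               @ArgumentHint(name = "s", isOptional = false),
               @ArgumentHint(name = "limit", isOptional = true)
       })
       public String eval(String s, Integer limit) {
           return limit == null ? s : s.substring(0, Math.min(limit, s.length()));
       }

       // Rejected by this check: an optional argument on a primitive parameter,
       // e.g. eval(String s, int limit), cannot represent the null default.
   }
   ```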



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


hackergin commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468994767


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
 
 final @Nullable String[] argumentNames;
 
+final @Nullable Boolean[] argumentOptionals;
+
 private FunctionSignatureTemplate(
 List argumentTemplates,
 boolean isVarArgs,
-@Nullable String[] argumentNames) {
+@Nullable String[] argumentNames,
+@Nullable Boolean[] argumentOptionals) {

Review Comment:
   Removed the `nullable` annotation and generated a default optional flag for each parameter; a sketch follows below.
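
   A sketch of the defaulting (hypothetical names, illustration only):

   ```java
   import java.util.Arrays;

   class OptionalDefaults {
       // With the @Nullable removed, every argument gets an explicit flag,
       // defaulting to "not optional" when no hint is provided.
       static Boolean[] defaultOptionals(int argumentCount) {
           Boolean[] optionals = new Boolean[argumentCount];
           Arrays.fill(optionals, Boolean.FALSE);
           return optionals;
       }
   }
   ```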



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


hackergin commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468994153


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -143,6 +144,9 @@ protected Map extractResultMa
 // check if the method can be called
 verifyMappingForMethod(correctMethod, 
collectedMappingsPerMethod, verification);
 
+// check if we declare optional on a primitive type parameter
+verifyOptionalOnPrimitiveParameter(correctMethod, 
collectedMappingsPerMethod);
+
 // check if method strategies conflict with function strategies
 collectedMappingsPerMethod.forEach(
 (signature, result) -> putMapping(collectedMappings, 
signature, result));

Review Comment:
   I think we can skip this check when the user is not using named arguments, so 
we don't need to perform it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


hackergin commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468993636


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
 
 final @Nullable String[] argumentNames;
 
+final @Nullable Boolean[] argumentOptionals;

Review Comment:
   Currently, since optional is not included as part of the InputStrategy, 
argumentNames and argumentOptionals are not involved in the computation of equals 
and hashCode. Therefore, if the optional flags participated in hashCode and equals 
here, it could result in duplicate InputStrategies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


hackergin commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468992167


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala:
##
@@ -237,6 +238,9 @@ object StringCallGen {
 val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
 generateNonNullField(returnType, currentDatabase)
 
+  case DEFAULT =>

Review Comment:
   The DEFAULT function is not registered; it is only used for the codegen check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint [flink]

2024-01-28 Thread via GitHub


swuferhong commented on PR #24077:
URL: https://github.com/apache/flink/pull/24077#issuecomment-1913790623

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34252) WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow

2024-01-28 Thread David Christle (Jira)
David Christle created FLINK-34252:
--

 Summary: WatermarkAssignerOperator should not emit 
WatermarkStatus.IDLE under continuous data flow
 Key: FLINK-34252
 URL: https://issues.apache.org/jira/browse/FLINK-34252
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.1, 1.17.2, 1.16.3
Reporter: David Christle


The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
an IDLE state even when data is continuously flowing. This behavior, observed 
under normal operating conditions where the interval between data elements is 
shorter than the configured idleTimeout, leads to regular transitions between 
ACTIVE and IDLE states, which are unnecessary.

_Detail:_
In the current implementation, the lastRecordTime variable, which tracks the 
time of the last received data element, is updated only when the 
WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated 
when WatermarkStatus is ACTIVE, which means even under continuous data flow, 
the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually 
always become true, and the WatermarkStatus will erroneously be marked IDLE. 

It is unclear to me if this bug produces any incorrectness downstream, since 
when the WatermarkStatus is in the IDLE state, the next processElement will 
cause a WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should eliminate 
this flip-flop behavior between states.

The test I wrote fails without the fix and illustrates the flip-flops:

{noformat}
[ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 s 
<<< FAILURE! -- in 
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
[ERROR] 
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow
 -- Time elapsed: 0.013 s <<< FAILURE!
java.lang.AssertionError:

Expecting
  [WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE),
WatermarkStatus(ACTIVE),
WatermarkStatus(IDLE)]
not to contain
  [WatermarkStatus(IDLE)]
but found
  [WatermarkStatus(IDLE)]
{noformat}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34222][table] End-to-end implementation of minibatch join [flink]

2024-01-28 Thread via GitHub


xishuaidelin commented on PR #24161:
URL: https://github.com/apache/flink/pull/24161#issuecomment-1913631542

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-28 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811659#comment-17811659
 ] 

Feng Jiajie commented on FLINK-33981:
-

I found the issue and will submit a fix later. Please assign the Jira issue to 
me, thanks.

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> ---
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Feng Jiajie
>Priority: Major
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java18162 sa_cluster   30r   DIR  253,1 01311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java18162 sa_cluster   31r   DIR  253,1 01311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java18162 sa_cluster   32r   DIR  253,1 01310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java18162 sa_cluster   33r   DIR  253,1 01310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java18162 sa_cluster   34r   DIR  253,1 01311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java18162 sa_cluster   35r   DIR  253,1 01311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java18162 sa_cluster   36r   DIR  253,1 01311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java18162 sa_cluster   37r   DIR  253,1 01311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java18162 sa_cluster   38r   DIR  253,1 01311979 
> /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
> for (int i = 0; i < 10; ++i) {
>   int round = i;
>   Thread thread = new Thread(() -> {
> try {
>   Configuration configuration = new Configuration();
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>   env.setParallelism(1);
>   DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
>   longDataStreamSource.addSink(new DiscardingSink<>());
>   StreamGraph streamGraph = env.getStreamGraph();
>   streamGraph.setJobName("test-" + System.nanoTime());
>   JobClient jobClient = env.executeAsync(streamGraph);
>   CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>   JobExecutionResult jobExecutionResult = null;
>   while (jobExecutionResult == null) {
> try {
>   jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
> } catch (TimeoutException timeoutException) {
>   // ignore
> }
>   }
>   System.out.println("finished round: " + round);
>   env.close();
> } catch (Exception e) {
>   throw new RuntimeException(e);
> }
>   });
>   thread.setDaemon(true);
>   thread.start();
>   thread.join();
>   System.out.println("done ... " + i);
> }
> 
> // === 

Re: [PR] [FLINK-33728] Do not rewatch when KubernetesResourceManagerDriver wat… [flink]

2024-01-28 Thread via GitHub


zhougit86 commented on PR #24163:
URL: https://github.com/apache/flink/pull/24163#issuecomment-1913601244

   Hi @xintongsong, I have updated my commit; could you please take another look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34158][table] Migrate WindowAggregateReduceFunctionsRule to java [flink]

2024-01-28 Thread via GitHub


JingGe commented on code in PR #24140:
URL: https://github.com/apache/flink/pull/24140#discussion_r1468850983


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rule to convert complex aggregation functions into simpler ones. Have a 
look at
+ * [[AggregateReduceFunctionsRule]] for details.
+ */
+public class WindowAggregateReduceFunctionsRule extends 
AggregateReduceFunctionsRule {
+private static final RelBuilderFactory 
LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE =
+RelBuilder.proto(
+Contexts.of(
+RelFactories.DEFAULT_STRUCT,
+
RelBuilder.Config.DEFAULT.withPruneInputOfAggregate(false)));
+
+public static final WindowAggregateReduceFunctionsRule
+WINDOW_AGGREGATE_REDUCE_FUNCTIONS_INSTANCE =
+new WindowAggregateReduceFunctionsRule(
+Config.DEFAULT
+
.withRelBuilderFactory(LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE)
+.withOperandSupplier(
+b ->
+
b.operand(LogicalWindowAggregate.class)
+.anyInputs())
+.as(Config.class));
+
+protected WindowAggregateReduceFunctionsRule(Config config) {
+super(config);
+}
+
+protected void newAggregateRel(

Review Comment:
   @ Override ?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala:
##
@@ -268,7 +268,7 @@ object FlinkBatchRuleSets {
 
 // reduce aggregate functions like AVG, STDDEV_POP etc.
 CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
-WindowAggregateReduceFunctionsRule.INSTANCE,
+
WindowAggregateReduceFunctionsRule.WINDOW_AGGREGATE_REDUCE_FUNCTIONS_INSTANCE,

Review Comment:
   Would you like share your thoughts about using the new static instance name? 
The new name contains redundant information that was already offered by the 
class name.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+
+import org.apache.calcite.plan.Contexts;

Re: [PR] [FLINK-34222][table] End-to-end implementation of minibatch join [flink]

2024-01-28 Thread via GitHub


xishuaidelin commented on PR #24161:
URL: https://github.com/apache/flink/pull/24161#issuecomment-1913572174

   > @xishuaidelin The failure case is a known issue 
(https://issues.apache.org/jira/browse/FLINK-34206); you can rebase onto the latest 
master and rerun the tests.
   
   @lincoln-lil Thanks for your kind reminder!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34222][table] End-to-end implementation of minibatch join [flink]

2024-01-28 Thread via GitHub


xishuaidelin commented on PR #24161:
URL: https://github.com/apache/flink/pull/24161#issuecomment-1913571547

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34157][table] Migrate FlinkLimit0RemoveRule to java [flink]

2024-01-28 Thread via GitHub


JingGe commented on code in PR #24139:
URL: https://github.com/apache/flink/pull/24139#discussion_r1468823507


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRule.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexLiteral;
+import org.immutables.value.Value;
+
+/** Planner rule that rewrites `limit 0` to empty 
[[org.apache.calcite.rel.core.Values]]. */
+@Value.Enclosing
+public class FlinkLimit0RemoveRule
+extends RelRule {

Review Comment:
   It might be good to describe the reason for extending RelRule: RelOptRule will 
eventually be deprecated in Calcite 2.0, so we should start replacing RelOptRule 
with RelRule. This will help the reviewers get the context, since there is still 
a lot of RelOptRule*** related code in use.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRule.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexLiteral;
+import org.immutables.value.Value;
+
+/** Planner rule that rewrites `limit 0` to empty 
[[org.apache.calcite.rel.core.Values]]. */
+@Value.Enclosing
+public class FlinkLimit0RemoveRule
+extends RelRule {
+
+public static final FlinkLimit0RemoveRule INSTANCE =
+FlinkLimit0RemoveRule.FlinkLimit0RemoveRuleConfig.DEFAULT.toRule();
+
+public FlinkLimit0RemoveRule(FlinkLimit0RemoveRuleConfig config) {

Review Comment:
   Does it make sense to limit the visibility to `private`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


xuyangzhong commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468816825


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##
@@ -1527,11 +1605,37 @@ public String eval(Integer arg1, Integer arg2) {
 @ArgumentHint(name = "in1", type = 
@DataTypeHint("string")),
 @ArgumentHint(name = "in2", type = @DataTypeHint("string"))
 })
+public String eval(String arg1, String arg2) {
+return (arg1 + ":" + arg2);

Review Comment:
   Nit: add a `space` after `:`



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java:
##
@@ -67,27 +70,50 @@ public OperatorBindingCallContext(
 new AbstractList() {
 @Override
 public DataType get(int pos) {
+RelDataType relDataType;
 if (binding instanceof SqlCallBinding) {
 SqlCallBinding sqlCallBinding = (SqlCallBinding) 
binding;
-List operands = sqlCallBinding.operands();
-final RelDataType relDataType =
-sqlCallBinding
-.getValidator()
-.deriveType(
-sqlCallBinding.getScope(), 
operands.get(pos));
-final LogicalType logicalType =
-
FlinkTypeFactory.toLogicalType(relDataType);
-return 
TypeConversions.fromLogicalToDataType(logicalType);
+SqlNode sqlNode = 
sqlCallBinding.operands().get(pos);
+if (sqlNode.getKind() == SqlKind.DEFAULT) {
+RelDataType[] defaultTypes =
+FlinkRexUtil.extractDefaultTypes(
+sqlCallBinding.getOperator(),
+
sqlCallBinding.getTypeFactory());
+relDataType = defaultTypes[pos];
+} else {
+relDataType =
+sqlCallBinding
+.getValidator()
+
.deriveType(sqlCallBinding.getScope(), sqlNode);
+}
+} else if (binding instanceof RexCallBinding) {
+RexCallBinding rexCallBinding = (RexCallBinding) 
binding;
+RexNode rexNode = 
rexCallBinding.operands().get(pos);

Review Comment:
   ditto



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##
@@ -69,6 +72,10 @@ public SqlRexConvertlet get(SqlCall call) {
 return this::convertSetSemanticsWindowTableFunction;
 }
 
+if (isContainsDefaultNode(call)) {

Review Comment:
   Nit: just a little curious, can we correct the type of `default` and replace 
the RexCall here once, so that the same logic does not need to be repeated in 
`CallBindingCallContext` and `OperatorBindingCallContext` again and again?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##
@@ -202,4 +215,24 @@ private static int parseFieldIdx(RexNode e) {
 // should not happen
 throw new TableException("Unsupported partition key with type: " + 
e.getKind());
 }
+
+private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final 
SqlCall call) {

Review Comment:
   Nit: could you add some comments here?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java:
##
@@ -126,19 +129,29 @@ public Operation convertSqlNode(SqlNode sqlNode, 
ConvertContext context) {
 typeInferResult.getOutputDataType());
 }
 
-private List reduceOperands(List sqlNodes, 
ConvertContext context) {
+private List reduceOperands(SqlCallBinding sqlCallBinding, 
ConvertContext context) {
 // we don't really care about the input row type while converting to 
RexNode
 // since call procedure shouldn't refer any inputs.
 // so, construct an empty row for it.
 RelDataType inputRowType =
-LogicalRelDataTypeConverter.toRelDataType(
+toRelDataType(
 DataTypes.ROW().getLogicalType(),
 context.getSqlValidator().getTypeFactory());
 List rexNodes = new ArrayList<>();
-

Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


fsk119 commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468808120


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java:
##
@@ -46,6 +46,8 @@ public final class TypeInference {
 
 private final @Nullable List namedArguments;
 
+private final @Nullable List optionalArguments;

Review Comment:
   I am worried about the current design of function overloading and optional 
parameters.
   
   The current implementation suggests that a UDF can only use either function 
overloading or optional parameters, not both. But in some situations, we may 
need both abilities, e.g. (pseudocode with default parameter values, which Java 
does not support directly):
   
   ```
   class ToBytes extends ScalarFunction {
   
       byte[] eval(String input, String format = "gzip") {}
   
       byte[] eval(int input, String format = "gzip") {}
   }
   ```
   
   Actually, I think the argument optionals and names (aliases) should belong to 
the `InputTypeStrategy`. 
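   
   A rough sketch of what supporting both could look like with the annotations 
from this PR (illustrative only, not the final API; the `arguments` and 
`isOptional` attribute names are assumptions, as is null being passed for an 
omitted optional argument):
   
   ```java
   import org.apache.flink.table.annotation.ArgumentHint;
   import org.apache.flink.table.annotation.DataTypeHint;
   import org.apache.flink.table.annotation.FunctionHint;
   import org.apache.flink.table.functions.ScalarFunction;
   
   public class ToBytes extends ScalarFunction {
   
       @FunctionHint(arguments = {
               @ArgumentHint(name = "input", type = @DataTypeHint("STRING")),
               // Assumption: an omitted optional argument arrives as null.
               @ArgumentHint(name = "format", type = @DataTypeHint("STRING"), isOptional = true)
       })
       public byte[] eval(String input, String format) {
           // format handling elided; a real UDF would branch on the format value
           return input.getBytes();
       }
   }
   ```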



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34123) Introduce built-in serializers for common composited data types

2024-01-28 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811651#comment-17811651
 ] 

Zhanghao Chen commented on FLINK-34123:
---

[~zjureel] May I take this? I'll start working on it after code freeze ends.

> Introduce built-in serializers for common composited data types
> ---
>
> Key: FLINK-34123
> URL: https://issues.apache.org/jira/browse/FLINK-34123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Reporter: Zhanghao Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34123) Introduce built-in serializers for common composited data types

2024-01-28 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-34123:
--
Affects Version/s: 1.19.0

> Introduce built-in serializers for common composited data types
> ---
>
> Key: FLINK-34123
> URL: https://issues.apache.org/jira/browse/FLINK-34123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.19.0
>Reporter: Zhanghao Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34239) Introduce a deep copy method of SerializerConfig for merging with Table configs in org.apache.flink.table.catalog.DataTypeFactoryImpl

2024-01-28 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811650#comment-17811650
 ] 

Zhanghao Chen commented on FLINK-34239:
---

[~mallikarjuna] Welcome to the community! Go ahead~ [~zjureel] Could you help 
assign it to Kumar?

> Introduce a deep copy method of SerializerConfig for merging with Table 
> configs in org.apache.flink.table.catalog.DataTypeFactoryImpl 
> --
>
> Key: FLINK-34239
> URL: https://issues.apache.org/jira/browse/FLINK-34239
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Zhanghao Chen
>Priority: Major
>
> *Problem*
> Currently, 
> org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig
>  will create a deep-copy of the SerializerConfig and merge Table config into 
> it. However, the deep copy is done by manually calling the getter and setter 
> methods of SerializerConfig, which is prone to human error, e.g. missing 
> copying a newly added field in SerializerConfig.
> *Proposal*
> Introduce a deep copy method for SerializerConfig and replace the current implementation 
> in 
> org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig.
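
One possible shape for such a method is a serialization round trip instead of 
field-by-field copying, e.g. via Flink's InstantiationUtil (a sketch assuming 
SerializerConfig is Serializable; the actual proposal may still copy fields 
explicitly):

{code:java}
import java.io.Serializable;

import org.apache.flink.util.InstantiationUtil;

public final class DeepCopy {
    // Sketch: copy by serializing and deserializing, so newly added fields are
    // carried over automatically instead of via hand-maintained getters/setters.
    public static <T extends Serializable> T of(T config) throws Exception {
        return InstantiationUtil.clone(config);
    }
}
{code}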



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34058][table] Support optional parameters for named parameters [flink]

2024-01-28 Thread via GitHub


fsk119 commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1468800471


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
 
 final @Nullable String[] argumentNames;
 
+final @Nullable Boolean[] argumentOptionals;

Review Comment:
   Please also modify the hashCode and equals methods when adding a new member 
field.
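   
   A minimal illustration of folding the new array field into both methods 
(simplified stand-in class; only the field names are taken from this diff):
   
   ```java
   import java.util.Arrays;
   import java.util.Objects;
   
   final class SignatureSketch {
       private final String[] argumentNames;
       private final Boolean[] argumentOptionals;
   
       SignatureSketch(String[] argumentNames, Boolean[] argumentOptionals) {
           this.argumentNames = argumentNames;
           this.argumentOptionals = argumentOptionals;
       }
   
       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (!(o instanceof SignatureSketch)) {
               return false;
           }
           SignatureSketch that = (SignatureSketch) o;
           // Arrays.equals compares contents, not references.
           return Arrays.equals(argumentNames, that.argumentNames)
                   && Arrays.equals(argumentOptionals, that.argumentOptionals);
       }
   
       @Override
       public int hashCode() {
           // Hash the array contents so that equal objects hash equally.
           return Objects.hash(
                   Arrays.hashCode(argumentNames), Arrays.hashCode(argumentOptionals));
       }
   }
   ```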



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -323,6 +327,37 @@ private void verifyMappingForMethod(
 verification.verify(method, signature.toClass(), 
result.toClass()));
 }
 
+private void verifyOptionalOnPrimitiveParameter(
+Method method,
+Map 
collectedMappingsPerMethod) {
+
+collectedMappingsPerMethod.forEach(
+(signature, result) -> {
+Boolean[] argumentOptional = signature.argumentOptionals;
+if (argumentOptional != null
+&& 
Arrays.stream(argumentOptional).anyMatch(Boolean::booleanValue)) {
+// do something check
+FunctionSignatureTemplate functionResultTemplate =
+signatureExtraction.extract(this, method);
+for (int i = 0; i < argumentOptional.length; i++) {
+Class converionClass =
+functionResultTemplate
+.argumentTemplates
+.get(i)
+.dataType
+.getConversionClass();
+if (argumentOptional[i]
+&& converionClass != null
+&& converionClass.isPrimitive()) {
+throw extractionError(
+"Argument at position %d is optional 
but a primitive type parameter.",

Review Comment:
   Argument at position %d is optional, but a primitive type does not accept null 
values.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -323,6 +327,37 @@ private void verifyMappingForMethod(
 verification.verify(method, signature.toClass(), 
result.toClass()));
 }
 
+private void verifyOptionalOnPrimitiveParameter(
+Method method,
+Map 
collectedMappingsPerMethod) {
+

Review Comment:
   nit: remove this empty line.
   
   It's better if we keep a consistent code style.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##
@@ -323,6 +327,37 @@ private void verifyMappingForMethod(
 verification.verify(method, signature.toClass(), 
result.toClass()));
 }
 
+private void verifyOptionalOnPrimitiveParameter(
+Method method,
+Map 
collectedMappingsPerMethod) {
+
+collectedMappingsPerMethod.forEach(
+(signature, result) -> {
+Boolean[] argumentOptional = signature.argumentOptionals;
+if (argumentOptional != null
+&& 
Arrays.stream(argumentOptional).anyMatch(Boolean::booleanValue)) {
+// do something check
+FunctionSignatureTemplate functionResultTemplate =

Review Comment:
   Why not use `signature.argumentTemplates`, which has the data type for every 
argument? If you don't have any concerns, I suggest moving this part into 
`FunctionTemplate` instead.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##
@@ -194,14 +194,17 @@ private static  T defaultAsNull(
 "Argument and input hints cannot be declared in the same 
function hint.");
 }
 
+Boolean[] argumentOptionals = null;
 if (argumentHints != null) {
 argumentHintNames = new String[argumentHints.length];
 argumentHintTypes = new DataTypeHint[argumentHints.length];
+argumentOptionals = new Boolean[argumentHints.length];

Review Comment:
   Is a var-length argument optional by default?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java:
##
@@ -1286,4 +1287,6 @@ public List 
getAuxiliaryFunctions() {
 .operandTypeChecker(OperandTypes.NILADIC)
 .notDeterministic()
 .build();
+
+public static final SqlSpecialOperator DEFAULT 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-28 Thread Yang LI (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811646#comment-17811646
 ] 

Yang LI commented on FLINK-33545:
-

Hello,

If I understand correctly, [~mason6345] 's proposal doesn't necessarily imply 
that we need to track each exact pending record. Instead, we can continue 
utilizing [~aeolus811tw] 's variable to monitor pending records. If this 
variable remains true after the first flush, rather than attempting a second 
flush, we should consider failing the checkpoint. This could compel Flink to 
replay from the last checkpoint.
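
A minimal sketch of that idea (class and field names here are illustrative, not 
the actual KafkaWriter API):

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;

// Illustrative only: fail the checkpoint instead of attempting a second flush
// when records are still pending after the first flush.
class AtLeastOnceWriterSketch {
    private final KafkaProducer<String, String> producer;
    // Incremented on send(), decremented in the producer callback.
    private final AtomicLong pendingRecords = new AtomicLong();

    AtLeastOnceWriterSketch(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    void flushOnCheckpoint() throws IOException {
        producer.flush();
        if (pendingRecords.get() > 0) {
            throw new IOException(
                    "Records still pending after flush; failing the checkpoint to force replay.");
        }
    }
}
{code}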

Thanks

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink, some assumptions 
> were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that checkpoints are properly fenced and that everything it 
> has read up until a checkpoint is initiated will be processed or recorded by 
> operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(10000); // sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-28 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811642#comment-17811642
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~mason6345] 

I'm not sure that's possible without going deeper into KafkaProducer code.

KafkaProducer is only a facade over the actual buffer that holds and sends records 
via the RecordAccumulator class; it then performs a blocking wait on the iterative 
request results, which doesn't really give us any insight into how many records 
have been synced successfully. Tracking the exact pending records as described 
would require going deeper into the Kafka internal classes, which might make this 
more prone to issues.

As per my PR, I have done the following:
 # use a variable to track whether there are pending records
 # if there are, and the guarantee is AT_LEAST_ONCE, perform a second commit / 
follow the EXACTLY_ONCE path to trigger the commit on the producer
 # if there's any failure, throw the error immediately, as per the EXACTLY_ONCE 
route

Since step 3 is the final step before the Flink checkpoint is committed, if this 
happens the retry logic should take place as you described.

Do let me know if this implementation satisfies the proposal: [~yang] 
[~tzulitai] [~martijnvisser] 

Thanks

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink, some assumptions 
> were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that checkpoints are properly fenced and that everything it 
> has read up until a checkpoint is initiated will be processed or recorded by 
> operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(10000); // sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback 

[jira] [Closed] (FLINK-34245) CassandraSinkTest.test_cassandra_sink fails under JDK17 and JDK21 due to InaccessibleObjectException

2024-01-28 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-34245.
---
Fix Version/s: 1.19.0
 Assignee: Junrui Li
   Resolution: Fixed

Fixed via ddbf87f2a7aeeeb20a8590578c6d037b239d5593

> CassandraSinkTest.test_cassandra_sink fails under JDK17 and JDK21 due to 
> InaccessibleObjectException
> 
>
> Key: FLINK-34245
> URL: https://issues.apache.org/jira/browse/FLINK-34245
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Cassandra
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Junrui Li
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56942=logs=b53e1644-5cb4-5a3b-5d48-f523f39bcf06=b68c9f5c-04c9-5c75-3862-a3a27aabbce3]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56942=logs=60960eae-6f09-579e-371e-29814bdd1adc=7a70c083-6a74-5348-5106-30a76c29d8fa=63680]
> {code:java}
> Jan 26 01:29:27 E   py4j.protocol.Py4JJavaError: An error 
> occurred while calling 
> z:org.apache.flink.python.util.PythonConfigUtil.configPythonOperator.
> Jan 26 01:29:27 E   : 
> java.lang.reflect.InaccessibleObjectException: Unable to make field final 
> java.util.Map java.util.Collections$UnmodifiableMap.m accessible: module 
> java.base does not "opens java.util" to unnamed module @17695df3
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.AccessibleObject.throwInaccessibleObjectException(AccessibleObject.java:391)
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:367)
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:315)
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:183)
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.Field.setAccessible(Field.java:177)
> Jan 26 01:29:27 E at 
> org.apache.flink.python.util.PythonConfigUtil.registerPythonBroadcastTransformationTranslator(PythonConfigUtil.java:357)
> Jan 26 01:29:27 E at 
> org.apache.flink.python.util.PythonConfigUtil.configPythonOperator(PythonConfigUtil.java:101)
> Jan 26 01:29:27 E at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> Jan 26 01:29:27 E at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> Jan 26 01:29:27 E at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> Jan 26 01:29:27 E at 
> java.base/java.lang.Thread.run(Thread.java:1583) {code}
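
For context, this kind of InaccessibleObjectException on JDK 17+ is generically 
worked around by opening the offending package to unnamed modules with the 
standard JVM option below; the actual fix here was in how pyflink_gateway_server 
retrieves such options from the flattened YAML config, not in the option itself:

{code}
--add-opens java.base/java.util=ALL-UNNAMED
{code}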



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34245][python] Fix config retrieval logic from nested YAML in pyflink_gateway_server with flattened keys. [flink]

2024-01-28 Thread via GitHub


zhuzhurk closed pull request #24209: [FLINK-34245][python] Fix config retrieval 
logic from nested YAML in pyflink_gateway_server with flattened keys.
URL: https://github.com/apache/flink/pull/24209


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org