[jira] [Comment Edited] (FLINK-25857) Add committer metrics to track the status of committables

2023-11-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789356#comment-17789356
 ] 

Weijie Guo edited comment on FLINK-25857 at 11/24/23 7:53 AM:
--

Thanks [~pvary] for the quick reply! 

Yes, in principle it is indeed not required to bind the connector and Flink 
versions. My main concern is that the change seems to have been made without 
sufficient discussion (like drafting a FLIP; if there is indeed one, I'm sorry 
I missed it). In addition, {{FLIP-321: Introduce an API deprecation process}} 
also seems to have requirements for the deprecation process of `PublicEvolving` 
APIs.




was (Author: weijie guo):
Thanks [~pvary] for the quick reply! 

Yes, in principle it is indeed not required to bind the connector and Flink 
versions. My main concern is that the change seems to have been made without 
sufficient discussion (like drafting a FLIP). In addition, {{FLIP-321: 
Introduce an API deprecation process}} also seems to have requirements for the 
deprecation process of `PublicEvolving` APIs.



> Add committer metrics to track the status of committables
> -
>
> Key: FLINK-25857
> URL: https://issues.apache.org/jira/browse/FLINK-25857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Fabian Paul
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png
>
>
> With Sink V2 we can now track the progress of a committable during committing 
> and show metrics about the committing status (i.e. failed, retried, 
> succeeded).
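
The status tracking described above maps naturally onto counters on the
committer's metric group. A minimal sketch of the idea, assuming hypothetical
metric names (the actual names added by this ticket may differ):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/** Hedged sketch: per-status committable counters; all names are illustrative. */
public class CommitterMetricsSketch {
    private final Counter succeeded;
    private final Counter failed;
    private final Counter retried;

    public CommitterMetricsSketch(MetricGroup group) {
        // counter() registers a new counter under the given name
        this.succeeded = group.counter("committablesSucceeded");
        this.failed = group.counter("committablesFailed");
        this.retried = group.counter("committablesRetried");
    }

    public void onSuccess() { succeeded.inc(); }
    public void onFailure() { failed.inc(); }
    public void onRetry() { retried.inc(); }
}
{code}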



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables

2023-11-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789356#comment-17789356
 ] 

Weijie Guo commented on FLINK-25857:


Thanks [~pvary] for the quick reply! 

Yes, in principle it is indeed not required to bind the connector and Flink 
versions. My main concern is that the change seems to have been made without 
sufficient discussion (like drafting a FLIP). In addition, {{FLIP-321: 
Introduce an API deprecation process}} also seems to have requirements for the 
deprecation process of `PublicEvolving` APIs.



> Add committer metrics to track the status of committables
> -
>
> Key: FLINK-25857
> URL: https://issues.apache.org/jira/browse/FLINK-25857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Fabian Paul
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png
>
>
> With Sink V2 we can now track the progress of a committable during committing 
> and show metrics about the committing status (i.e. failed, retried, 
> succeeded).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-23 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789355#comment-17789355
 ] 

Rui Fan commented on FLINK-27681:
-

{quote}In our production, the underlying environment may produce some errors, 
resulting in corrupted files. In addition, Flink stores local files in the form 
of a single copy. When a problematic file is uploaded to DFS as a checkpoint, 
this checkpoint will be unavailable.
{quote}
Hi [~Ming Li] [~mayuehappy], I did a quick review of this PR, but I still 
don't know why the file is corrupted. Would you mind describing it in detail?

I'm worried that adding a check in Flink can't completely solve the problem of 
file corruption. Is it possible that file corruption occurs after the Flink 
check but before the file is uploaded to HDFS?
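
For reference, a minimal sketch of what such a check could look like with the
RocksJava API, verifying a local sst file before upload (class and method usage
here only illustrates the approach; it is not the PR's actual code):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;

public final class SstChecksumCheckSketch {
    /** Throws RocksDBException if any block of the sst file is corrupted. */
    public static void verify(String sstFilePath) throws RocksDBException {
        try (Options options = new Options();
                SstFileReader reader = new SstFileReader(options)) {
            reader.open(sstFilePath);
            reader.verifyChecksum();
        }
    }
}
{code}

Note that, as the question above points out, such a check still leaves a window
between verification and upload in which corruption can occur.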

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several cases where the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason is generally 
> that there is some problem with the machine where the task is located, which 
> causes the files uploaded to HDFS to be incorrect; but by the time we found 
> the problem, a long time had passed (a dozen minutes to half an hour). I'm not 
> sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> even when the maximum number of retained checkpoints is exceeded, we can only 
> keep using this file until it is no longer referenced. When the job fails, it 
> cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]

2023-11-23 Thread via GitHub


1996fanrui commented on code in PR #23765:
URL: https://github.com/apache/flink/pull/23765#discussion_r1404014532


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java:
##
@@ -86,6 +86,18 @@ public class RocksDBOptions {
                 .withDescription(
                         "The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.");
 
+    /**
+     * Whether to verify the checksum of the incremental sst files during checkpoint in
+     * RocksDBStateBackend.
+     */
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Boolean> CHECKPOINT_VERIFY_CHECKSUM_ENABLE =
+            ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enable")

Review Comment:
   ```suggestion
            ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enabled")
   ```
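
   For context, a Flink `ConfigOption` of this kind is typically completed with
   a type, default value, and description. A minimal sketch under the suggested
   key (the default value here is an assumption, not necessarily the PR's final
   code):

   ```java
   @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
   public static final ConfigOption<Boolean> CHECKPOINT_VERIFY_CHECKSUM_ENABLED =
           ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enabled")
                   .booleanType()
                   .defaultValue(false) // assumed default: verification off
                   .withDescription(
                           "Whether to verify the checksum of incremental sst files "
                                   + "during checkpoints in RocksDBStateBackend.");
   ```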



##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;

Review Comment:
   The new test should use `import org.junit.jupiter.api.Test;` instead of 
`org.junit.Test`.
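
   For illustration, the requested style in Flink's newer tests combines JUnit 5
   with AssertJ; a minimal, hypothetical sketch (the `@TempDir` field replaces a
   JUnit 4 `TemporaryFolder` `@Rule`):

   ```java
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   import java.nio.file.Path;

   import static org.assertj.core.api.Assertions.assertThat;

   class RocksdbStateFileVerifierStyleSketch {

       @TempDir Path tempDir;

       @Test
       void testTempDirIsCreated() {
           assertThat(tempDir).exists();
       }
   }
   ```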



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if Python process startup failed [flink]

2023-11-23 Thread via GitHub


flinkbot commented on PR #23789:
URL: https://github.com/apache/flink/pull/23789#issuecomment-1825239372

   
   ## CI report:
   
   * 30acb658f852496d73028b93467c024f3f4d43a0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33613) Python UDF Runner process leak in Process Mode

2023-11-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-33613:
---

Assignee: Dian Fu

> Python UDF Runner process leak in Process Mode
> --
>
> Key: FLINK-33613
> URL: https://issues.apache.org/jira/browse/FLINK-33613
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.17.0
>Reporter: Yu Chen
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Attachments: ps-ef.txt, streaming_word_count-1.py
>
>
> While working with PyFlink, we found that in Process Mode, the Python UDF 
> processes may leak after a failover of the job. This leads to a rising number of 
> processes, with their threads, on the host machine, which eventually results in 
> a failure to create new threads.
>  
> You can try to reproduce it with the attached test job 
> `streaming_word_count.py`.
> (Note that the job will keep failing over, and you can watch the process 
> leak via `ps -ef` on the TaskManager.)
>  
> Our test environment:
>  * K8S Application Mode
>  * 4 TaskManagers with 12 slots/TM
>  * Job parallelism was set to 48 
> The number of UDF processes (`pyflink.fn_execution.beam.beam_boot`) should 
> match the number of slots of a TM (12), but we found 180 processes on one 
> TaskManager after several failovers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33613) Python UDF Runner process leak in Process Mode

2023-11-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33613:
---
Labels: pull-request-available  (was: )

> Python UDF Runner process leak in Process Mode
> --
>
> Key: FLINK-33613
> URL: https://issues.apache.org/jira/browse/FLINK-33613
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.17.0
>Reporter: Yu Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: ps-ef.txt, streaming_word_count-1.py
>
>
> While working with PyFlink, we found that in Process Mode, the Python UDF 
> processes may leak after a failover of the job. This leads to a rising number of 
> processes, with their threads, on the host machine, which eventually results in 
> a failure to create new threads.
>  
> You can try to reproduce it with the attached test job 
> `streaming_word_count.py`.
> (Note that the job will keep failing over, and you can watch the process 
> leak via `ps -ef` on the TaskManager.)
>  
> Our test environment:
>  * K8S Application Mode
>  * 4 TaskManagers with 12 slots/TM
>  * Job parallelism was set to 48 
> The number of UDF processes (`pyflink.fn_execution.beam.beam_boot`) should 
> match the number of slots of a TM (12), but we found 180 processes on one 
> TaskManager after several failovers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if Python process startup failed [flink]

2023-11-23 Thread via GitHub


dianfu opened a new pull request, #23789:
URL: https://github.com/apache/flink/pull/23789

   
   ## What is the purpose of the change
   
   *This pull request fixes the issue that the Java gRPC server was not shut 
down properly during job cancellation, which may cause the Python process to 
not be shut down properly.*
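
   A minimal sketch of the usual graceful-shutdown pattern for a Java gRPC
   server (this only illustrates the general technique; the actual fix lives in
   PyFlink's Beam-based server setup):

   ```java
   import io.grpc.Server;

   import java.util.concurrent.TimeUnit;

   final class GrpcShutdownSketch {
       // Ask the server to stop accepting new calls, wait briefly for
       // in-flight calls to finish, then force shutdown if they do not.
       static void shutdownGracefully(Server server) throws InterruptedException {
           server.shutdown();
           if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
               server.shutdownNow();
           }
       }
   }
   ```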
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DOC]Update watermark column type [flink]

2023-11-23 Thread via GitHub


flinkbot commented on PR #23788:
URL: https://github.com/apache/flink/pull/23788#issuecomment-1825226910

   
   ## CI report:
   
   * cfb92fb619cfdbaa4b610d4c95dae57922afec18 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31385) Introduce extended Assertj Matchers for completable futures

2023-11-23 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789347#comment-17789347
 ] 

Yun Tang commented on FLINK-31385:
--

Also picked into release-1.17 as 2a83c910eee711f8b5f9dd4697de60221f21fb9d for 
the backport of FLINK-33598.

> Introduce extended Assertj Matchers for completable futures
> ---
>
> Key: FLINK-31385
> URL: https://issues.apache.org/jira/browse/FLINK-31385
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.3
>
>
> Introduce extended Assertj Matchers for completable futures that don't rely 
> on timeouts.
> In general, we want to avoid relying on timeouts in the Flink test suite to 
> get additional context (thread dump) in case something gets stuck.
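
The intended usage is roughly the following sketch (the matcher names are
assumptions based on the ticket's goal of timeout-free future assertions; see
the merged code for the actual API):

{code:java}
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;

class FutureAssertSketch {
    void example() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("done");
        // No explicit timeout: if the future hangs, the test hangs, and the CI
        // watchdog's thread dump then shows where it is stuck.
        assertThatFuture(future).eventuallySucceeds().isEqualTo("done");
    }
}
{code}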



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [DOC]Update watermark column type [flink]

2023-11-23 Thread via GitHub


hehuiyuan opened a new pull request, #23788:
URL: https://github.com/apache/flink/pull/23788

   ## What is the purpose of the change
   
   
   
![image](https://github.com/apache/flink/assets/18002496/9e1e478b-84ac-4930-9a42-6bb91616a3f2)
   
   
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/create/#watermark
   
   Add rowtime column type `TIMESTAMP_LTZ(3)`.
   
   ## Brief change log
   
   
   Add rowtime column type `TIMESTAMP_LTZ(3)`.
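
   In other words, alongside `TIMESTAMP(3)`, a watermark can also be declared on
   a `TIMESTAMP_LTZ(3)` column. A minimal sketch (table name and connector are
   illustrative):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class WatermarkLtzSketch {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // Watermark declared on a TIMESTAMP_LTZ(3) rowtime column.
           tEnv.executeSql(
                   "CREATE TABLE events ("
                           + " id INT,"
                           + " ts TIMESTAMP_LTZ(3),"
                           + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                           + ") WITH ('connector' = 'datagen')");
       }
   }
   ```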
   
   ## Verifying this change
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31385) Introduce extended Assertj Matchers for completable futures

2023-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-31385:
-
Fix Version/s: 1.17.3

> Introduce extended Assertj Matchers for completable futures
> ---
>
> Key: FLINK-31385
> URL: https://issues.apache.org/jira/browse/FLINK-31385
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.3
>
>
> Introduce extended Assertj Matchers for completable futures that don't rely 
> on timeouts.
> In general, we want to avoid relying on timeouts in the Flink test suite to 
> get additional context (thread dump) in case something gets stuck.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33598) Watch HA configmap via name instead of lables to reduce pressure on APIserver

2023-11-23 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang resolved FLINK-33598.
--
Resolution: Fixed

Merged
master: 608546e090f5d41c6a8b9af2c264467279181027 ... 
b7e8b792c086c3c445ee8429fbcfe035097a878c

release-1.18: 6f30c6e427251dd4b2e4ad03f89bed06a519b05f
release-1.17: 18d5a4696eccac3b5e7fe1d579547feef4537c08

> Watch HA configmap via name instead of lables to reduce pressure on APIserver 
> --
>
> Key: FLINK-33598
> URL: https://issues.apache.org/jira/browse/FLINK-33598
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.18.1, 1.17.3
>
>
> As FLINK-24819 described, the k8s API server receives more pressure when 
> HA is enabled, because the configmap watching is achieved via a label filter 
> instead of just querying by configmap name. This becomes possible after 
> FLINK-24038, which reduced the number of configmaps to only one, the 
> {{-cluster-config-map}}.
> This ticket will not touch the {{--config-map}}, which stores 
> the checkpoint information, as that configmap is directly accessed by the JM and 
> not watched by the taskmanagers.
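
For illustration, the difference with the fabric8 client (which Flink's
Kubernetes integration builds on) looks roughly like the following sketch;
the namespace and configmap name are made up:

{code:java}
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class ConfigMapWatchSketch {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Watching a single configmap by name: the API server only tracks
            // one object instead of evaluating a label selector.
            client.configMaps()
                    .inNamespace("flink")
                    .withName("myjob-cluster-config-map")
                    .watch(
                            new Watcher<ConfigMap>() {
                                @Override
                                public void eventReceived(Action action, ConfigMap cm) {}

                                @Override
                                public void onClose(WatcherException cause) {}
                            });
        }
    }
}
{code}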



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33338) Bump FRocksDB version

2023-11-23 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789345#comment-17789345
 ] 

Yue Ma commented on FLINK-33338:


[~pnowojski] Update: now all the changes we need are released in 
*8.9.0 (11/17/2023)*; see 
[HISTORY.md|https://github.com/facebook/rocksdb/blob/main/HISTORY.md].

> Bump FRocksDB version
> -
>
> Key: FLINK-33338
> URL: https://issues.apache.org/jira/browse/FLINK-33338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Priority: Major
>
> We need to bump RocksDB in order to be able to use new IngestDB and ClipDB 
> commands.
> If some of the required changes haven't been merged to Facebook/RocksDB, we 
> should cherry-pick and include them in our FRocksDB fork.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33338) Bump FRocksDB version

2023-11-23 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789345#comment-17789345
 ] 

Yue Ma edited comment on FLINK-33338 at 11/24/23 6:49 AM:
--

[~pnowojski] Update: now all the changes we need are released in 
*8.9.0 (11/17/2023)*; see 
[HISTORY.md|https://github.com/facebook/rocksdb/blob/main/HISTORY.md].


was (Author: mayuehappy):
[~pnowojski] Update:  Now All the changes we need are released in 
{*}8.9.0(11/17/203) 
[HISTORY.md|{*}{*}https://github.com/facebook/rocksdb/blob/main/HISTORY.md{*}{*}]{*}

> Bump FRocksDB version
> -
>
> Key: FLINK-33338
> URL: https://issues.apache.org/jira/browse/FLINK-33338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Priority: Major
>
> We need to bump RocksDB in order to be able to use new IngestDB and ClipDB 
> commands.
> If some of the required changes haven't been merged to Facebook/RocksDB, we 
> should cherry-pick and include them in our FRocksDB fork.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables

2023-11-23 Thread Peter Vary (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789335#comment-17789335
 ] 

Peter Vary commented on FLINK-25857:


For the Iceberg connector, we release separate versions for different versions 
of Flink.

If I understand correctly, this was the case before the separation of the 
connectors too: every Flink version contained different versions of the 
connectors, and the jars might not work across versions.

Also, `TwoPhaseCommittingSink` is marked with the `PublicEvolving` annotation, 
which means it could change between minor versions of Flink. We push these 
changes forward so that the API can soon be changed to `Public`, and we can 
avoid these kinds of disruptions in the future. 

> Add committer metrics to track the status of committables
> -
>
> Key: FLINK-25857
> URL: https://issues.apache.org/jira/browse/FLINK-25857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Fabian Paul
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png
>
>
> With Sink V2 we can now track the progress of a committable during committing 
> and show metrics about the committing status (i.e. failed, retried, 
> succeeded).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement

2023-11-23 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789334#comment-17789334
 ] 

Benchao Li commented on FLINK-33541:


There is no such usage as "RAND(a, 10) and RAND(b, 10)".

The usages for RAND are "RAND()" and "RAND(seed)".

> RAND_INTEGER  can't be existed in a IF statement
> 
>
> Key: FLINK-33541
> URL: https://issues.apache.org/jira/browse/FLINK-33541
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-11-24-13-31-21-209.png
>
>
> The minimal reproduction steps:
> Flink SQL> select if(1=1, rand_integer(100), 0);
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL)
>  
> But we do not see this exception reported in 1.14; not sure in which version 
> this bug was introduced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement

2023-11-23 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789330#comment-17789330
 ] 

xuyang commented on FLINK-33541:


Hi [~libenchao], I checked the code again, and sorry for this mistake. It 
forbids CAST from nullable to NOT NULL in `LogicalTypeCasts#defaultMethod`, and 
I think this is reasonable.

!image-2023-11-24-13-31-21-209.png|width=489,height=325!
{quote}In this case, the argument is literal which is not null, so the result 
type is not null is ok. And I don't see many usages to pass in a nullable field 
as argument.
{quote}
For example, if we have a table with columns "a INT NOT NULL" and "b INT", I 
think the result types of RAND(a, 10) and RAND(b, 10) are different: the former 
is INT NOT NULL and the latter is INT. This difference will, and should, also 
affect the nullability of the result type. 
{quote}Isn't this is the root cause?
{quote}
The inconsistency between the result type derived during validation and the one 
used during code generation for RAND and RAND_INTEGER causes this bug.

 

Looking forward to your thoughts :)

> RAND_INTEGER  can't be existed in a IF statement
> 
>
> Key: FLINK-33541
> URL: https://issues.apache.org/jira/browse/FLINK-33541
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-11-24-13-31-21-209.png
>
>
> The minimal reproduction steps:
> Flink SQL> select if(1=1, rand_integer(100), 0);
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL)
>  
> But we do not see this exception reported in 1.14; not sure in which version 
> this bug was introduced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement

2023-11-23 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-33541:
---
Attachment: image-2023-11-24-13-31-21-209.png

> RAND_INTEGER  can't be existed in a IF statement
> 
>
> Key: FLINK-33541
> URL: https://issues.apache.org/jira/browse/FLINK-33541
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-11-24-13-31-21-209.png
>
>
> The minimal reproduction steps:
> Flink SQL> select if(1=1, rand_integer(100), 0);
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL)
>  
> But we do not see this exception reported in 1.14; not sure in which version 
> this bug was introduced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work

2023-11-23 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789329#comment-17789329
 ] 

Rui Fan commented on FLINK-33565:
-

Some background here: 
[FLIP-364|https://cwiki.apache.org/confluence/x/uJqzDw] is improving the 
restart-strategy. Currently, one pain point of the restart-strategy is that the 
restart-attempt count increases too fast (read more details in part 1.2 of 
FLIP-364).

The core solution is similar and related to concurrentExceptions: we merge some 
exceptions into one restart-attempt.

[~zhuzh] and I would like the restart-attempt to be matched with the 
concurrentExceptions: when we merge some exceptions into one restart-attempt, 
we pick the first exception as the root cause and the rest of the exceptions as 
the concurrent exceptions.

[~mapohl] WDYT?

> The concurrentExceptions doesn't work
> -
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> First of all, thanks to [~mapohl] for helping double-check in advance that 
> this was indeed a bug.
> Displaying the exception history in the WebUI was added in FLINK-6042.
> h1. What's the concurrentExceptions?
> When an execution fails due to an exception, other executions in the same 
> region will also restart, and the first exception is the rootException. If other 
> restarted executions also report exceptions at this time, we hope to collect 
> these exceptions and display them to the user as concurrentExceptions.
> h2. What's this bug?
> The concurrentExceptions list is always empty in production, even if other 
> executions report exceptions at very close times.
> h1. Why doesn't it work?
> If a job has an all-to-all shuffle, the job only has one region, and this 
> region has a lot of executions. If one execution throws an exception:
>  * The JobMaster will mark the state as FAILED for this execution.
>  ** The rest of the executions of this region will be marked as CANCELING.
>  ** This call stack can be found at FLIP-364 
> [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover]
>  
> When these executions throw exceptions as well, the JobMaster will change their 
> state from CANCELING to CANCELED instead of FAILED.
> CANCELED executions won't go through the FAILED logic, so their exceptions are 
> ignored.
> Note: all reports are executed inside the JobMaster RPC thread, which is a 
> single thread, so the reports are processed serially. Hence only one execution 
> is marked FAILED, and the rest of the executions are marked CANCELED later.
> h1. How to fix it?
> After an offline discussion with [~mapohl], we first need to discuss with the 
> community whether we should keep concurrentExceptions at all.
>  * If no, we can remove the related logic directly.
>  * If yes, we will discuss how to fix it later.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33490) Validate the name conflicts when creating view

2023-11-23 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789326#comment-17789326
 ] 

Benchao Li commented on FLINK-33490:


I mostly agree. I don't have a strong opinion on whether to have a FLIP; I'd 
like to hear more from others.

> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standard states,
> if a <view column list> is specified, then:
> i) If any two columns in the table specified by the <query expression> have 
> equivalent <column name>s, or if any column of that table has an 
> implementation-dependent name, then a <view column list> shall be specified.
> ii) Equivalent <column name>s shall not be specified more than once in the 
> <view column list>.
> Many databases also throw an exception when view column names conflict, e.g. 
> MySQL, Postgres.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement

2023-11-23 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789325#comment-17789325
 ] 

Benchao Li commented on FLINK-33541:


{quote}I notice currently the CAST rule is correct. It forbids casting from not 
null to nullable. The root cause is not IF but is RAND_INTEGER
{quote}
I don't agree with this: casting NOT NULL to nullable should be valid; the 
opposite should be invalid.
{quote}Actually, the result type of RAND and RAND_INTEGER should depend on the 
types of arguments.
{quote}
In this case, the argument is a literal which is not null, so it is OK for the 
result type to be NOT NULL. And I don't see many usages that pass a nullable 
field as the argument.

{quote}when codegen operator RAND, the return type of RAND(0) is always 
"nullable" in `RandCallGen`.
{quote}
Isn't this the root cause?

> RAND_INTEGER  can't be existed in a IF statement
> 
>
> Key: FLINK-33541
> URL: https://issues.apache.org/jira/browse/FLINK-33541
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
>
> The minimal reproduction steps:
> Flink SQL> select if(1=1, rand_integer(100), 0);
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL)
>  
> But we do not see this exception reported in 1.14; not sure in which version 
> this bug was introduced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33624) Bump Guava to 32.1.3-jre in flink-table

2023-11-23 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge resolved FLINK-33624.
-
Resolution: Fixed

> Bump Guava to 32.1.3-jre in flink-table
> ---
>
> Key: FLINK-33624
> URL: https://issues.apache.org/jira/browse/FLINK-33624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33624) Bump Guava to 32.1.3-jre in flink-table

2023-11-23 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789324#comment-17789324
 ] 

Jing Ge commented on FLINK-33624:
-

master: 34620fc5c5698d00e64c6b15f8ce84f807a2e0d7

> Bump Guava to 32.1.3-jre in flink-table
> ---
>
> Key: FLINK-33624
> URL: https://issues.apache.org/jira/browse/FLINK-33624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0

2023-11-23 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui closed FLINK-33547.
---
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/FLINK-33523

> SQL primitive array type after upgrading to Flink 1.18.0
> 
>
> Key: FLINK-33547
> URL: https://issues.apache.org/jira/browse/FLINK-33547
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Xingcan Cui
>Priority: Major
>
> We have some Flink SQL UDFs that use object array (Object[]) arguments and 
> take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink 
> 1.18.0, the data created by ARRAY[] SQL function became primitive arrays 
> (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's 
> expected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0

2023-11-23 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789319#comment-17789319
 ] 

Xingcan Cui commented on FLINK-33547:
-

Sure. I'll close this.

> SQL primitive array type after upgrading to Flink 1.18.0
> 
>
> Key: FLINK-33547
> URL: https://issues.apache.org/jira/browse/FLINK-33547
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Xingcan Cui
>Priority: Major
>
> We have some Flink SQL UDFs that use object array (Object[]) arguments and 
> take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink 
> 1.18.0, the data created by ARRAY[] SQL function became primitive arrays 
> (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's 
> expected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0

2023-11-23 Thread Prabhu Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789317#comment-17789317
 ] 

Prabhu Joseph commented on FLINK-33547:
---

[~xccui]  Thanks for the details. This issue seems to be a duplicate of 
[FLINK-33523|https://issues.apache.org/jira/browse/FLINK-33523]. Shall we track 
this issue in FLINK-33523? Thanks.

> SQL primitive array type after upgrading to Flink 1.18.0
> 
>
> Key: FLINK-33547
> URL: https://issues.apache.org/jira/browse/FLINK-33547
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Xingcan Cui
>Priority: Major
>
> We have some Flink SQL UDFs that use object array (Object[]) arguments and 
> take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink 
> 1.18.0, the data created by ARRAY[] SQL function became primitive arrays 
> (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's 
> expected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33523) DataType ARRAY<INT NOT NULL> fails to cast into Object[]

2023-11-23 Thread Prabhu Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789316#comment-17789316
 ] 

Prabhu Joseph commented on FLINK-33523:
---

Thanks [~jeyhun]. Flink 1.17 does not have this enforcement. Flink 1.18 
([FLINK-31835|https://issues.apache.org/jira/browse/FLINK-31835]) added the 
enforcement and broke multiple places, like 
[FLINK-33547|https://issues.apache.org/jira/browse/FLINK-33547] and the 
[Iceberg test cases|https://github.com/apache/iceberg/issues/8930]. 

> DataType ARRAY<INT NOT NULL> fails to cast into Object[]
> 
>
> Key: FLINK-33523
> URL: https://issues.apache.org/jira/browse/FLINK-33523
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> When upgrading Iceberg's Flink version to 1.18, we found the Flink-related 
> unit test cases broken due to this issue. The below code used to work fine in 
> Flink 1.17 but fails after upgrading to 1.18. DataType ARRAY<INT NOT NULL> 
> fails to cast into Object[].
> *Error:*
> {code}
> Exception in thread "main" java.lang.ClassCastException: [I cannot be cast to 
> [Ljava.lang.Object;
> at FlinkArrayIntNotNullTest.main(FlinkArrayIntNotNullTest.java:18)
> {code}
> *Repro:*
> {code}
>   import org.apache.flink.table.data.ArrayData;
>   import org.apache.flink.table.data.GenericArrayData;
>   import org.apache.flink.table.api.EnvironmentSettings;
>   import org.apache.flink.table.api.TableEnvironment;
>   import org.apache.flink.table.api.TableResult;
>   public class FlinkArrayIntNotNullTest {
>     public static void main(String[] args) throws Exception {
>       EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inBatchMode().build();
>       TableEnvironment env = TableEnvironment.create(settings);
>       env.executeSql("CREATE TABLE filesystemtable2 (id INT, data ARRAY<INT 
> NOT NULL>) WITH ('connector' = 'filesystem', 'path' = 
> '/tmp/FLINK/filesystemtable2', 'format'='json')");
>       env.executeSql("INSERT INTO filesystemtable2 VALUES (4, ARRAY[1,2,3])");
>       TableResult tableResult = env.executeSql("SELECT * from 
> filesystemtable2");
>       ArrayData actualArrayData = new GenericArrayData((Object[]) 
> tableResult.collect().next().getField(1));
>     }
>   }
> {code}
> *Analysis:*
> 1. The code works fine with the ARRAY<INT> datatype. The issue happens when 
> using ARRAY<INT NOT NULL>.
> 2. The code works fine when casting into int[] instead of Object[].
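
The second point suggests a workaround until this is resolved: read the column
as a primitive array. A hedged sketch, based on the repro above:

{code:java}
// Instead of casting to Object[], match the primitive type produced for
// ARRAY<INT NOT NULL>:
int[] values = (int[]) tableResult.collect().next().getField(1);
ArrayData actualArrayData = new GenericArrayData(values);
{code}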



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]

2023-11-23 Thread via GitHub


PrabhuJoseph commented on PR #23629:
URL: https://github.com/apache/flink/pull/23629#issuecomment-1825118817

   @1996fanrui Could you help commit this change? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33472) Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem cannot be uploaded

2023-11-23 Thread zhengzhili (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789000#comment-17789000
 ] 

zhengzhili edited comment on FLINK-33472 at 11/24/23 3:40 AM:
--

[~mapohl] This is my pull request: 
[https://github.com/apache/flink/pull/23782]. Could you please take a look? 


was (Author: JIRAUSER302860):
https://github.com/apache/flink/pull/23782

> Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem 
> cannot be uploaded
> 
>
> Key: FLINK-33472
> URL: https://issues.apache.org/jira/browse/FLINK-33472
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.1
>Reporter: zhengzhili
>Assignee: zhengzhili
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2023-11-16-09-56-40-806.png
>
>
> Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem 
> cannot be uploaded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0

2023-11-23 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789311#comment-17789311
 ] 

Xingcan Cui commented on FLINK-33547:
-

Hi [~jeyhun], thanks for your attention!

What you explained makes sense. However, sometimes it's tricky to deal with 
primitive array parameters. They make it harder for users to write generic 
UDFs, e.g., one that takes an arbitrary array and returns the first 3 elements. 
Also, this is a breaking change in 1.18.0: all the old UDFs that used Object[] 
arguments can't directly work with the primitive arrays generated by ARRAY 
functions now.

Type inference and dealing with null values are challenging in Flink SQL. Users 
won't understand why a UDF accepting an ARRAY<INT> argument can't work for an 
ARRAY<INT NOT NULL> parameter. It's also impossible for UDF developers to code 
a bunch of functions taking different primitive arrays. We need some changes 
here.
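
A minimal sketch of the pattern that broke (a hypothetical UDF, not taken from
the ticket): the eval method declares a boxed array, but in 1.18 an ARRAY[...]
over NOT NULL elements arrives as a primitive array, so the argument no longer
matches:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class FirstThreeSketch extends ScalarFunction {
    // Declared against boxed arrays; in 1.18 an ARRAY<FLOAT NOT NULL> value
    // arrives as float[], which cannot be cast to Object[].
    public Object[] eval(Object[] input) {
        Object[] out = new Object[Math.min(3, input.length)];
        System.arraycopy(input, 0, out, 0, out.length);
        return out;
    }
}
{code}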

> SQL primitive array type after upgrading to Flink 1.18.0
> 
>
> Key: FLINK-33547
> URL: https://issues.apache.org/jira/browse/FLINK-33547
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Xingcan Cui
>Priority: Major
>
> We have some Flink SQL UDFs that use object array (Object[]) arguments and 
> take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink 
> 1.18.0, the data created by ARRAY[] SQL function became primitive arrays 
> (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's 
> expected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]

2023-11-23 Thread via GitHub


mayuehappy commented on PR #23765:
URL: https://github.com/apache/flink/pull/23765#issuecomment-1825104163

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]

2023-11-23 Thread via GitHub


mayuehappy commented on PR #23765:
URL: https://github.com/apache/flink/pull/23765#issuecomment-1825103757

   @masteryhx Thanks for the code review, I have updated the PR, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables

2023-11-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789304#comment-17789304
 ] 

Weijie Guo commented on FLINK-25857:


IIUC, this will make our external connector sinks unable to support both 
Flink 1.19 and versions below 1.19 at the same time.

> Add committer metrics to track the status of committables
> -
>
> Key: FLINK-25857
> URL: https://issues.apache.org/jira/browse/FLINK-25857
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Fabian Paul
>Assignee: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png
>
>
> With Sink V2 we can now track the progress of a committable during committing 
> and show metrics about the committing status (i.e. failed, retried, 
> succeeded).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Priority: Blocker  (was: Major)

> Some connectors can not compile in 1.19-SNAPSHOT
> 
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Blocker
>
> The sink API compatibility was broken in FLINK-25857. 
> org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was 
> changed to 
> org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).
> All external connector sinks cannot compile due to this change.
> For example:
> es: 
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421
> aws: 
> https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160
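
Concretely, the incompatibility looks like the following sketch: an
implementation overriding the 1.18 signature no longer overrides anything in
1.19-SNAPSHOT, so it fails to compile there, and vice versa:

{code:java}
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class MySinkSketch implements Sink<String> {
    @Override // compiles against <= 1.18; fails against 1.19-SNAPSHOT, where
    // the parameter type is WriterInitContext instead of InitContext.
    public SinkWriter<String> createWriter(InitContext context) {
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}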



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33636) Implement JdbcAutoScalerEventHandler

2023-11-23 Thread Rui Fan (Jira)
Rui Fan created FLINK-33636:
---

 Summary: Implement JdbcAutoScalerEventHandler
 Key: FLINK-33636
 URL: https://issues.apache.org/jira/browse/FLINK-33636
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789301#comment-17789301
 ] 

Jiabao Sun commented on FLINK-33635:


mongodb: 
https://github.com/apache/flink-connector-mongodb/actions/runs/6917152935/job/18818013536

> Some connectors can not compile in 1.19-SNAPSHOT
> 
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> The sink API compatibility was broken in FLINK-25857. 
> org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was 
> changed to 
> org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).
> All external connector sinks cannot compile due to this change.
> For example:
> es: 
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421
> aws: 
> https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][doc] add repo links of externalized connectors [flink]

2023-11-23 Thread via GitHub


flinkbot commented on PR #23787:
URL: https://github.com/apache/flink/pull/23787#issuecomment-1825089599

   
   ## CI report:
   
   * ebda3f5e2e64929e223823778ca6626435eed26b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Yuxin Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789300#comment-17789300
 ] 

Yuxin Tan commented on FLINK-33635:
---

The failure reason is that the APIs were changed by 
[FLINK-25857|https://issues.apache.org/jira/browse/FLINK-25857]. This change 
may affect all connector implementations. I think we should fix this issue with 
high priority.

> Some connectors can not compile in 1.19-SNAPSHOT
> 
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> The sink API compatibility was broken in FLINK-25857. 
> org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was 
> changed to 
> org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).
> All external connector sinks cannot compile due to this change.
> For example:
> es: 
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421
> aws: 
> https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Description: 
The sink API compatibility was broken in FLINK-25857. 

org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was changed 
to org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).

All external connector sinks cannot compile due to this change.

For example:

es: 
https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421

aws: 
https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160

  
was:https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421


> Some connector can not compile in 1.19-SNAPSHOT
> ---
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> The sink API compatibility was broken in FLINK-25857. 
> org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was 
> changed to 
> org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).
> All external connector sinks cannot compile due to this change.
> For example:
> es: 
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421
> aws: 
> https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Summary: Some connectors can not compile in 1.19-SNAPSHOT  (was: Some 
connector can not compile in 1.19-SNAPSHOT)

> Some connectors can not compile in 1.19-SNAPSHOT
> 
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> The sink API compatibility was broken in FLINK-25857. 
> org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was 
> changed to 
> org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext).
> All external connector sinks cannot compile due to this change.
> For example:
> es: 
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421
> aws: 
> https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33581][core] Deprecate configuration getters/setters that return/set complex Java objects. [flink]

2023-11-23 Thread via GitHub


JunRuiLee commented on code in PR #23758:
URL: https://github.com/apache/flink/pull/23758#discussion_r1403852951


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -676,8 +690,16 @@ public StreamExecutionEnvironment 
setStateBackend(StateBackend backend) {
 /**
  * Gets the state backend that defines how to store and checkpoint state.
  *
+ * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+ * all complex Java objects related to configuration, including their 
getter and setter
+ * methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+ * method will be removed entirely. It is recommended to determine which 
state backend is used via the
+ * state backend ConfigOption. For more details on using ConfigOption 
for state backend
+ * configuration, please refer to the Flink documentation: <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends">state-backends</a>
  * @see #setStateBackend(StateBackend)
  */
+@Deprecated
 @PublicEvolving
 public StateBackend getStateBackend() {
 return defaultStateBackend;

Review Comment:
   done
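
   As an aside for readers, a hedged sketch of the ConfigOption-based replacement this deprecation points users to (the `state.backend.type` key is taken from the linked state-backends documentation, not from this PR):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class ConfigBasedStateBackendExample {
       public static void main(String[] args) {
           // Configure the state backend via ConfigOption instead of
           // env.setStateBackend(...); key assumed from the docs linked above.
           Configuration config = new Configuration();
           config.setString("state.backend.type", "hashmap");
           StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment(config);
       }
   }
   ```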



##
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java:
##
@@ -82,19 +83,26 @@ public static void main(String[] args) throws Exception {
 final ParameterTool params = ParameterTool.fromArgs(args);
 
 // create the environment to create streams and configure execution
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+Configuration configuration = new Configuration();
+final StreamExecutionEnvironment env =

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33581][core] Deprecate configuration getters/setters that return/set complex Java objects. [flink]

2023-11-23 Thread via GitHub


JunRuiLee commented on PR #23758:
URL: https://github.com/apache/flink/pull/23758#issuecomment-1825087740

   Hi @zhuzhurk, Thanks for your review. I've updated this PR, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][doc] add repo links of externalized connectors [flink]

2023-11-23 Thread via GitHub


JingGe opened a new pull request, #23787:
URL: https://github.com/apache/flink/pull/23787

   ## What is the purpose of the change
   
   Add the externalized connector repos to the docs to help developers find the 
related connectors.
   
   ## Verifying this change
   
   This change is a trivial doc update.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Component/s: Connectors / Common
 (was: Connectors / ElasticSearch)

> Some connector can not compile in 1.19-SNAPSHOT
> ---
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Summary: Some connector can not compile in 1.19-SNAPSHOT  (was: Connector 
sink can not compile)

> Some connector can not compile in 1.19-SNAPSHOT
> ---
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT

2023-11-23 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789299#comment-17789299
 ] 

Jiabao Sun commented on FLINK-33635:


It seems to be caused by the refactor of Sink.InitContext. 
https://github.com/apache/flink/pull/23555

I hit the same problem in the MongoDB connector. 



> Some connector can not compile in 1.19-SNAPSHOT
> ---
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33635) Connector sink can not compile

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33635:
---
Summary: Connector sink can not compile  (was: ElasticSearch connector main 
branch can not compile)

> Connector sink can not compile
> --
>
> Key: FLINK-33635
> URL: https://issues.apache.org/jira/browse/FLINK-33635
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>
> https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33635) ElasticSearch connector main branch can not compile

2023-11-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-33635:
--

 Summary: ElasticSearch connector main branch can not compile
 Key: FLINK-33635
 URL: https://issues.apache.org/jira/browse/FLINK-33635
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Reporter: Weijie Guo
Assignee: Weijie Guo


https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33184.
--
Resolution: Fixed

> HybridShuffleITCase fails with exception in resource cleanup of task Map on 
> AZP
> ---
>
> Key: FLINK-33184
> URL: https://issues.apache.org/jira/browse/FLINK-33184
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This build fails 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710
> {noformat} 
> Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task 
>[] - FATAL - exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) 
> ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> 01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Task Sink: 
> Unnamed (3/10)#0 is already in state CANCELING
> 01:17:22,375 [Map (5/10)#0] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - FATAL - 
> exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)

[jira] [Commented] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP

2023-11-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789293#comment-17789293
 ] 

Weijie Guo commented on FLINK-33184:


It seems that this code path (i.e., IllegalStateException: Leaking buffers) has 
been fixed via FLINK-33185. Feel free to open a new ticket if anything related 
to this test still occurs.

> HybridShuffleITCase fails with exception in resource cleanup of task Map on 
> AZP
> ---
>
> Key: FLINK-33184
> URL: https://issues.apache.org/jira/browse/FLINK-33184
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This build fails 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710
> {noformat} 
> Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task 
>[] - FATAL - exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) 
> ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> 01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Task Sink: 
> Unnamed (3/10)#0 is already in state CANCELING
> 01:17:22,375 [Map (5/10)#0] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - FATAL - 
> exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> 

[jira] [Closed] (FLINK-33370) Simplify validateAndParseHostsString in Elasticsearch connecter's configuration

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33370.
--
Fix Version/s: elasticsearch-3.1.0
   Resolution: Fixed

main via b15b6f01d0eb46527cbdf2e41a16db381529f3aa.

> Simplify validateAndParseHostsString in Elasticsearch connecter's 
> configuration
> ---
>
> Key: FLINK-33370
> URL: https://issues.apache.org/jira/browse/FLINK-33370
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0
>
>
> Currently, the validateAndParseHostsString method exists in each 
> configuration file (repeated 3 times), but the method logic is exactly the 
> same. We can simplify it by introducing a common util.
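
A possible shape for such a util, as a hedged sketch (class and method placement are assumptions; HttpHost comes from the Apache HttpComponents dependency the connector already uses):

{code:java}
import org.apache.http.HttpHost;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical common util; names and validation details are assumptions.
public final class HostsParseUtil {
    private HostsParseUtil() {}

    public static List<HttpHost> validateAndParseHostsString(List<String> hosts) {
        return hosts.stream()
                .map(HostsParseUtil::validateAndParseHostString)
                .collect(Collectors.toList());
    }

    private static HttpHost validateAndParseHostString(String host) {
        // HttpHost.create parses "scheme://host:port"; reject incomplete entries.
        HttpHost httpHost = HttpHost.create(host);
        if (httpHost.getPort() < 0) {
            throw new IllegalArgumentException(
                    "Could not parse host '" + host + "'; expected format 'scheme://host:port'.");
        }
        return httpHost;
    }
}
{code}

The three configuration files would then delegate to this single method instead of each carrying its own copy.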



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33370][connectors/elasticsearch] Simplify validateAndParseHostsString in Elasticsearch connector's configuration [flink-connector-elasticsearch]

2023-11-23 Thread via GitHub


reswqa merged PR #77:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/77


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33502) HybridShuffleITCase caused a fatal error

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33502.
--
  Assignee: Wencong Liu
Resolution: Fixed

master (1.19) via 07cc8c5c7f276261b5d7c22e0b6617b6ba9666be.

> HybridShuffleITCase caused a fatal error
> 
>
> Key: FLINK-33502
> URL: https://issues.apache.org/jira/browse/FLINK-33502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-11-20-14-37-37-321.png
>
>
> [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177]
> {code:java}
> Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check 
> output in log
> 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9170Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9171Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd 
> /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC 
> -Xms256m -XX:+IgnoreUnrecognizedVMOptions 
> --add-opens=java.base/java.util=ALL-UNNAMED 
> --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar 
> /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar
>  /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 
> surefire6242806641230738408tmp surefire_1603959900047297795160tmp
> 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, 
> check output in log
> 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9176Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9177Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> 9178Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> 9179Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> 9180Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33502) HybridShuffleITCase caused a fatal error

2023-11-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33502:
---
Fix Version/s: 1.19.0

> HybridShuffleITCase caused a fatal error
> 
>
> Key: FLINK-33502
> URL: https://issues.apache.org/jira/browse/FLINK-33502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-11-20-14-37-37-321.png
>
>
> [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177]
> {code:java}
> Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check 
> output in log
> 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9170Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9171Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd 
> /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC 
> -Xms256m -XX:+IgnoreUnrecognizedVMOptions 
> --add-opens=java.base/java.util=ALL-UNNAMED 
> --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar 
> /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar
>  /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 
> surefire6242806641230738408tmp surefire_1603959900047297795160tmp
> 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, 
> check output in log
> 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9176Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9177Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> 9178Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> 9179Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> 9180Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33502][network] Ignore RejectedExecutionException in DiskIOScheduler when the batch read executor service shuts down [flink]

2023-11-23 Thread via GitHub


reswqa merged PR #23781:
URL: https://github.com/apache/flink/pull/23781
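
   For context, the general pattern the PR title describes, as a hedged sketch (not the actual DiskIOScheduler code):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.RejectedExecutionException;
   
   final class ShutdownTolerantScheduling {
       // Hedged sketch of the general pattern, not the actual Flink code:
       // tolerate a rejected submission only when the executor is shutting down.
       static void trySchedule(ExecutorService batchReadExecutor, Runnable readTask) {
           try {
               batchReadExecutor.execute(readTask);
           } catch (RejectedExecutionException e) {
               if (!batchReadExecutor.isShutdown()) {
                   throw e; // rejection while still running is a real error
               }
               // otherwise: benign race with shutdown, safe to ignore
           }
       }
   }
   ```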


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of labels to reduce pressure on API server [flink]

2023-11-23 Thread via GitHub


Myasuka merged PR #23764:
URL: https://github.com/apache/flink/pull/23764
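
   For context, the change boils down to watching a single named object instead of a label selector; a hedged sketch in fabric8 style (not the actual Flink code, helper names are made up):
   
   ```java
   import io.fabric8.kubernetes.api.model.ConfigMap;
   import io.fabric8.kubernetes.client.KubernetesClient;
   import io.fabric8.kubernetes.client.Watch;
   import io.fabric8.kubernetes.client.Watcher;
   
   final class HaConfigMapWatch {
       // Hedged sketch; not the actual Flink code. Watching one named ConfigMap
       // avoids evaluating a label selector across the namespace on every event.
       static Watch byName(
               KubernetesClient client, String ns, String name, Watcher<ConfigMap> watcher) {
           return client.configMaps().inNamespace(ns).withName(name).watch(watcher);
       }
   }
   ```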


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33583][table] support state ttl hint for join [flink]

2023-11-23 Thread via GitHub


hackergin commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1403827616


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java:
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.hints.stream;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import scala.Enumeration;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link org.apache.flink.table.planner.hint.StateTtlHint}. */
+class StateTtlHintTest extends TableTestBase {

Review Comment:
   Can we add a multi-level join test case here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][table][test] remove duplicated code [flink]

2023-11-23 Thread via GitHub


flinkbot commented on PR #23786:
URL: https://github.com/apache/flink/pull/23786#issuecomment-1825056837

   
   ## CI report:
   
   * 0c724f5597a1654d83682d5c39c7ee0ef2959cca UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][table][test] remove duplicated code [flink]

2023-11-23 Thread via GitHub


JingGe opened a new pull request, #23786:
URL: https://github.com/apache/flink/pull/23786

   ## What is the purpose of the change
   
   remove duplicated code according to the clean code concept.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33616) multi lookup join error

2023-11-23 Thread yong yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yong yang closed FLINK-33616.
-
Resolution: Done

> multi lookup join error
> ---
>
> Key: FLINK-33616
> URL: https://issues.apache.org/jira/browse/FLINK-33616
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.17.1
>Reporter: yong yang
>Priority: Major
>
> stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 = 
> cast(jdbc2.stringfield2 as int)
> shows the error: Temporal table join requires an equality condition on fields of 
> table [default_catalog.default_database.t22].
>  
> test code:
>  
> {code:java}
> // code placeholder
> package com.yy.flinkSqlJoin
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.Expressions.row
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
> import org.apache.flink.types.Row
> import java.time.ZoneId;
> /**
> +I  insert
> -U  update-before
> +U  update-after
> -D  retract message; sends a null record to Kafka, which maps to a delete in MySQL.
>  * https://www.yuque.com/u430335/qea2i2/kw4qqu
>  * Because inner/left join does not emit a retract stream (append only), the sink 
> only needs to support append semantics.
>  * The join key key1 between the fact table and the dimension table must be 
> declared as the primary key in the dimension table's DDL: primary key (key1)
>  * For testing:
>  *kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic 
> user_order
>  *kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic 
> user_payment
>  *kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
>  * Kafka data:
>  *orders:
>  *  {"order_id":100,"ts":166536720}  -- step2
>  *  {"order_id":101,"ts":166536720}  -- step6
>  *payments (MySQL):
>  *  use db_yy;
> create table user_pay (
> order_id bigint
>   ,paymoney bigint
>   ,primary key (order_id)
>   )ENGINE=InnoDB DEFAULT CHARSET=utf8;
> insert into user_pay values(100,111); -- step1
> update user_pay set paymoney=222 where order_id=100; -- step3
> insert into user_pay values(101,33);  -- step4
> update user_pay set paymoney=44 where order_id=101; -- step5
>  *retract-stream output of the job (inserts only):
>  *8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- after step2.  Note: the 
> lookup join is driven by the fact table and matches the latest dimension data; it 
> emits output even without a match, and if the dimension table is updated later, no 
> retraction is sent to update the result
>  *(true,+I[101, 2022-10-10T02:00:00Z, 44]) -- after step6.
>  *Kafka topic output:
>  *{"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
>  *{"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
>  *
>  *Logic:
>  *lookup join also comes as inner join, left join, and full join.
>  *a lookup join matches the fact table against the latest dimension data; the 
> dimension table's join field must be the primary key in the external connector 
> (Kafka does not qualify).
>  *
>  */
> object LookUpJoinJDBCDemo {
>   def main(args: Array[String]): Unit = {
> //Class.forName("com.mysql.cj.jdbc.Driver")
> // Flink 1.13 stream environment initialization
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> // set the local time zone (China)
> tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
> // order table
> /*
> Kafka options:
>  d_timestamp is taken from the Kafka metadata or from the raw data
> d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
>  option: json.fail-on-missing-field  
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
> when a field is missing during parsing, whether to skip the field/row or fail 
> with an error (default false, i.e. fail)
>  option: json.ignore-parse-errors
> when parsing fails, whether to skip the field/row or fail with an error (default 
> false, i.e. fail). If parse errors are ignored, the field is set to null
>  Note: the WITH options below configure the Kafka source table
>  */
> //val UserOrderTableSql =
> //  """
> //|create table user_order (
> //| order_id bigint,
> //| ts bigint,
> //| d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
> //| proctime AS PROCTIME() -- the fact table needs processing time, the 
> dimension table does not
> //|)WITH(
> //|'connector' = 'kafka',
> //|  'topic' = 'user_order',
> //|  'properties.bootstrap.servers' = 'localhost:9092',
> //|  'properties.group.id' = 'g1',
> //|  'scan.startup.mode' = 'latest-offset',
> //|  'format' = 'json',
> //|  'json.fail-on-missing-field' = 'false', -- skip or fail on missing fields
> //|  'json.ignore-parse-errors' = 'true' -- skip records that fail to parse
> //|)
> //|""".stripMargin
> //tEnv.executeSql(UserOrderTableSql)
> // implicit conversion from Scala Int to Java Integer
> /*
>  case class C1(age:Int,name:String,time:Long)
>  flink stream event time
>  */
> val table = tEnv.fromValues(
>   DataTypes.ROW(
> DataTypes.FIELD("order_id", DataTypes.STRING())
> , DataTypes.FIELD("ts", 

Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]

2023-11-23 Thread via GitHub


snuyanzin commented on PR #23757:
URL: https://github.com/apache/flink/pull/23757#issuecomment-1825039210

   Thanks for driving this :+1: 
   
   After looking at FLINK-28203 I noticed that there are also these files:
   tools/ci/compile.sh
   tools/ci/maven-utils.sh
   tools/ci/verify_bundled_optional.sh
   
   shouldn't they also be part of this PR?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]

2023-11-23 Thread via GitHub


snuyanzin commented on code in PR #23757:
URL: https://github.com/apache/flink/pull/23757#discussion_r1403810179


##
flink-table/flink-sql-gateway/pom.xml:
##
@@ -47,6 +47,7 @@
 org.apache.flink
 flink-sql-gateway-api
 ${project.version}
+   ${flink.markBundledAsOptional}

Review Comment:
   nit: 
   it seems the alignment should be fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]

2023-11-23 Thread via GitHub


snuyanzin commented on code in PR #23757:
URL: https://github.com/apache/flink/pull/23757#discussion_r1403809755


##
flink-dist/pom.xml:
##
@@ -480,6 +508,12 @@ under the License.
${project.version}
provided

+   
+   org.objenesis
+   objenesis
+   2.1

Review Comment:
   nit: do we need a version here if it is already in parent pom.xml?
   
   
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]

2023-11-23 Thread via GitHub


snuyanzin commented on code in PR #23757:
URL: https://github.com/apache/flink/pull/23757#discussion_r1403809450


##
flink-dist/pom.xml:
##
@@ -454,6 +470,18 @@ under the License.

provided

+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   ${flink.markBundledAsOptional}
+   
+   
+   com.google.code.findbugs
+   jsr305
+   1.3.9

Review Comment:
   nit: do we need a version here if it is already in parent `pom.xml`?
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-23 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33485.
-
Fix Version/s: 1.19.0
   Resolution: Fixed

> Optimize the EXISTS sub-query by Metadata RowCount
> --
>
> Key: FLINK-33485
> URL: https://issues.apache.org/jira/browse/FLINK-33485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> If the sub-query is guaranteed to produce at least one row, just return TRUE. 
> If the sub-query is guaranteed to produce no row, just return FALSE.
> Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
> it should be adapted accordingly.
> examples
> {code:sql}
> SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
> {code}
> aggregate functions always return one row even for an empty table, so 
> we could just replace this query with 
> {code:sql}
> SELECT * FROM T2 
> {code}
> another example
> {code:sql}
> SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
> {code}
> {{LIMIT 0}} means no rows, so it could be optimized to
> {code:sql}
> SELECT * FROM MyTable
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-23 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin closed FLINK-33485.
---

> Optimize the EXISTS sub-query by Metadata RowCount
> --
>
> Key: FLINK-33485
> URL: https://issues.apache.org/jira/browse/FLINK-33485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> If the sub-query is guaranteed to produce at least one row, just return TRUE. 
> If the sub-query is guaranteed to produce no row, just return FALSE.
> Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
> it should be adapted accordingly.
> examples
> {code:sql}
> SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
> {code}
> aggregate functions always return one row even for an empty table, so 
> we could just replace this query with 
> {code:sql}
> SELECT * FROM T2 
> {code}
> another example
> {code:sql}
> SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
> {code}
> {{LIMIT 0}} means no rows, so it could be optimized to
> {code:sql}
> SELECT * FROM MyTable
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-23 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789280#comment-17789280
 ] 

Sergey Nuyanzin commented on FLINK-33485:
-

Merged to master as 
[a1d4dc0a4b05164aee41c781e75c9e9aeae5d758|https://github.com/apache/flink/commit/a1d4dc0a4b05164aee41c781e75c9e9aeae5d758]

> Optimize the EXISTS sub-query by Metadata RowCount
> --
>
> Key: FLINK-33485
> URL: https://issues.apache.org/jira/browse/FLINK-33485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> If the sub-query is guaranteed to produce at least one row, just return TRUE. 
> If the sub-query is guaranteed to produce no row, just return FALSE.
> Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
> it should be adapted accordingly.
> examples
> {code:sql}
> SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
> {code}
> aggregate functions always return one row even for an empty table, so 
> we could just replace this query with 
> {code:sql}
> SELECT * FROM T2 
> {code}
> another example
> {code:sql}
> SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
> {code}
> {{LIMIT 0}} means no rows, so it could be optimized to
> {code:sql}
> SELECT * FROM MyTable
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]

2023-11-23 Thread via GitHub


snuyanzin merged PR #23685:
URL: https://github.com/apache/flink/pull/23685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]

2023-11-23 Thread via GitHub


snuyanzin commented on PR #23685:
URL: https://github.com/apache/flink/pull/23685#issuecomment-1825029898

   Thanks for taking a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (FLINK-31958) Table to DataStream allow partial fields

2023-11-23 Thread padavan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31958 ]


padavan deleted comment on FLINK-31958:
-

was (Author: JIRAUSER287909):
up

> Table to DataStream allow partial fields
> 
>
> Key: FLINK-31958
> URL: https://issues.apache.org/jira/browse/FLINK-31958
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Table SQL / API
>Reporter: padavan
>Priority: Major
>
> Hello, I have a model with many fields, for example:
> {code:java}
> public class UserModel { public int userId; public int count; public int zip; 
> public LocalDateTime dt; public LocalDateTime wStart; public LocalDateTime 
> wEnd; }{code}
> I work with the Table API, select fields, and convert the Table to a DataStream by 
> model. The problem is that *I have to select all fields even if I don't need them*, 
> or I get the exception
> {quote}Column types of query result and sink for do not match. Cause: 
> Different number of columns.
> {quote}
> And I just have to substitute fake data as placeholders...
>  
> I want to simply use only the fields which I have selected, like:
> {code:java}
> .select($("userId"), $("count").sum().as("count"));
> DataStream dataStream = te.toDataStream(win, 
> UserModel.class);{code}
>  
> *Expected:* 
> Remove the validation rule "Different number of columns.". If a column is not 
> selected, it is initialized to default(T) / null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields

2023-11-23 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789239#comment-17789239
 ] 

padavan commented on FLINK-31958:
-

up

> Table to DataStream allow partial fields
> 
>
> Key: FLINK-31958
> URL: https://issues.apache.org/jira/browse/FLINK-31958
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Table SQL / API
>Reporter: padavan
>Priority: Major
>
> Hello, I have a model with many fields, for example:
> {code:java}
> public class UserModel { public int userId; public int count; public int zip; 
> public LocalDateTime dt; public LocalDateTime wStart; public LocalDateTime 
> wEnd; }{code}
> I work with the Table API, select fields, and convert the Table to a DataStream by 
> model. The problem is that *I have to select all fields even if I don't need them*, 
> or I get the exception
> {quote}Column types of query result and sink for do not match. Cause: 
> Different number of columns.
> {quote}
> And I just have to substitute fake data as placeholders...
>  
> I want to simply use only the fields which I have selected, like:
> {code:java}
> .select($("userId"), $("count").sum().as("count"));
> DataStream dataStream = te.toDataStream(win, 
> UserModel.class);{code}
>  
> *Expected:* 
> Remove the validation rule "Different number of columns.". If a column is not 
> selected, it is initialized to default(T) / null.
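
A workaround available today, as a hedged sketch ({{tEnv}} and {{win}} are the StreamTableEnvironment and the aggregated Table from the snippet above; this illustrates a manual mapping, not a committed fix for this ticket):

{code:java}
// Hedged workaround sketch: convert to Row and populate only the selected
// fields; the remaining fields keep their Java defaults.
DataStream<Row> rows = tEnv.toDataStream(win);
DataStream<UserModel> users =
        rows.map(r -> {
            UserModel m = new UserModel();
            m.userId = r.<Integer>getFieldAs("userId");
            m.count = r.<Integer>getFieldAs("count");
            return m;
        }).returns(UserModel.class);
{code}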



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send statistics about previous 
rescalings as metrics, but why do we then keep vertex-based scalingHistory?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send statistics about previous 
rescalings as metrics, but why do we then keep the vertex-based scalingHistory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send previous stats as 
metrics, but why do we then keep vertex-based scalingHistory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403656624


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   General question: we are missing a job-based data structure that keeps 
track of past rescaling details. Should we need to add something in the 
future, with the current structure it is as simple as adding data fields to the 
ScalingRecord. I am OK with removing the map, but the question is: are we sure 
we won't require something similar in the future anyway?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]

2023-11-23 Thread via GitHub


eskabetxe commented on PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1824762588

   @wanglijie95, @snuyanzin could you please take a look?
   
   The sink at the moment allows non-XA (at-least-once) and XA (exactly-once) 
connections.
   
   I still need to add documentation, but we could start reviewing the code.





Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403619658


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();

Review Comment:
   `estimatedEMA = estimatedEMA * x + newMeasurement * (1-x)`
   we could start with `x=0.5`, which is pretty aggressive smoothing but should 
be fine given we don't have many scalings
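
   A minimal sketch of the recursive update described above (the `Duration`-based 
representation and all names are illustrative, not from the PR):
   
   ```java
   import java.time.Duration;
   
   class RestartTimeEstimator {
       // Sketch only: x weights the previous estimate, (1 - x) the new measurement.
       private static final double X = 0.5; // aggressive smoothing, as suggested above
   
       static Duration update(Duration estimatedEMA, Duration newMeasurement) {
           long millis = (long) (estimatedEMA.toMillis() * X
                   + newMeasurement.toMillis() * (1 - X));
           return Duration.ofMillis(millis);
       }
   }
   ```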






Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403615188


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();

Review Comment:
   EMA requires knowing how many previous records in the window are taken into 
account, because this determines the weight coefficient of the new record 
(the smoothing factor). The length of the "window" of observation is also 
supposed to be fixed and not span all time from the beginning, so I am not sure 
we are talking about the classic definition of EMA. Maybe you could sketch the 
calculation you have in mind?
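
   For reference, the recursive form quoted above does not need a fixed window: 
with factor x, the k-th most recent measurement implicitly receives weight 
(1 - x) * x^k, so older samples decay geometrically instead of dropping out of 
a window. A small illustration (not from the PR):
   
   ```java
   // Weights implied by: estimate = estimate * x + measurement * (1 - x).
   static double[] effectiveWeights(double x, int n) {
       double[] weights = new double[n];
       for (int k = 0; k < n; k++) {
           weights[k] = (1 - x) * Math.pow(x, k); // k = 0 is the newest measurement
       }
       return weights; // for x = 0.5: 0.5, 0.25, 0.125, ...
   }
   ```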
   



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403612096


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();

Review Comment:
   I saw this, but it doesn't mention anything about history etc. and refers to 
an offline discussion :) 
   Combined with the other comment related to the trimming issue (losing the 
restart info after 24h), I think the exponential moving average is a simpler and 
slightly more robust initial approach.






Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403603955


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess 
[this](https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1815381974)
 got buried in the notifications:
   ```
   @gyfora Max and I briefly discussed offline and came to the conclusion that 
starting with evaluating the maximum restart time capped by the RESTART_TIME 
setting is probably good enough for the first step. It has the benefit of 
giving the most "conservative" evaluation, and we can add the moving average 
after some baseline testing. What do you think?
   ```
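   
   A minimal sketch of that capped-maximum evaluation, assuming observed restart 
times are kept as `Duration` samples (the names are illustrative, not from the 
PR):
   
   ```java
   import java.time.Duration;
   import java.util.Collection;
   
   class RestartTimeCap {
       // Sketch only: use the maximum observed restart time, with the configured
       // job.autoscaler.restart.time acting both as an upper bound and as the
       // fallback when no samples exist yet.
       static Duration estimateRestartTime(Collection<Duration> observed, Duration configuredMax) {
           return observed.stream()
                   .max(Duration::compareTo)
                   .filter(max -> max.compareTo(configuredMax) < 0)
                   .orElse(configuredMax);
       }
   }
   ```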






Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2023-11-23 Thread via GitHub


afedulov commented on PR #23777:
URL: https://github.com/apache/flink/pull/23777#issuecomment-1824713681

   > We could then keep all the existing code and just wrap it.
   
   The execution model is fundamentally different - a while loop that runs 
directly in the function vs an async loop that runs one level "above" and 
synchronizes via completable futures instead of exposing an explicit lock. I do 
not think it is possible to create such an adapter because we do not have 
access to that higher-level loop. 





Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #23777:
URL: https://github.com/apache/flink/pull/23777#discussion_r1403579962


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/SourceReaderWithCheckpointsLatch.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that synchronizes emission of N elements on the 
arrival of the checkpoint
+ * barriers. It 1) emits a list of elements without checkpoints in-between, 2) 
then waits for two
+ * checkpoints to complete, 3) then re-emits the same elements before 4) 
waiting for another two
+ * checkpoints and 5) exiting.
+ *
+ * This lockstep execution is possible because {@code pollNext} and {@code 
snapshotState} are
+ * executed in the same thread, and because {@code pollNext} can emit N 
elements at once. This
+ * reader is meant to be used solely for testing purposes as the substitution 
for the {@code
+ * FiniteTestSource} which implements the deprecated {@code SourceFunction} 
API.
+ */
+@Experimental
+public class SourceReaderWithCheckpointsLatch<

Review Comment:
   My idea was at some point to pass the number of checkpoints from the 
outside, but sure, we can reflect this number in the name until then. Renamed.
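   
   For readers of this thread, a very rough sketch of the lockstep protocol from 
the javadoc above (this is not the PR's reader; the phase enum, the fields, and 
the omission of availability futures are all simplifying assumptions):
   
   ```java
   // Sketch only: emit all elements, await two completed checkpoints, emit the
   // same elements again, await two more checkpoints, then end the input.
   private enum Phase { EMIT_FIRST, WAIT_FIRST, EMIT_SECOND, WAIT_SECOND }
   
   private Phase phase = Phase.EMIT_FIRST;
   private int checkpointsToAwait; // decremented in notifyCheckpointComplete()
   
   public InputStatus pollNext(ReaderOutput<E> output) {
       switch (phase) {
           case EMIT_FIRST:
           case EMIT_SECOND:
               elements.forEach(output::collect); // N elements, no checkpoint in between
               checkpointsToAwait = 2;
               phase = (phase == Phase.EMIT_FIRST) ? Phase.WAIT_FIRST : Phase.WAIT_SECOND;
               return InputStatus.NOTHING_AVAILABLE;
           case WAIT_FIRST:
               if (checkpointsToAwait == 0) {
                   phase = Phase.EMIT_SECOND;
                   return InputStatus.MORE_AVAILABLE;
               }
               return InputStatus.NOTHING_AVAILABLE;
           case WAIT_SECOND:
           default:
               return checkpointsToAwait == 0
                       ? InputStatus.END_OF_INPUT
                       : InputStatus.NOTHING_AVAILABLE;
       }
   }
   ```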






Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2023-11-23 Thread via GitHub


afedulov commented on PR #23777:
URL: https://github.com/apache/flink/pull/23777#issuecomment-1824683968

   There are some things that can probably be done, but not in the scope of 
this PR. You can see how much complexity of the new Source API is already 
hidden behind the `IteratorSourceReaderBase`, the `DataGeneratorSource` and the 
`GeneratorFunction`. We are getting there, but in order to come up with 
meaningful abstractions we need to learn more about common patterns, and we do 
so during migration of the existing SourceFunctions. I do not think there is an 
easy way to come up with something completely generic because of how the new 
Source requires implementing all of the low-level details with very 
specific classes (Splits, SplitEnumerators, serializers). See, for instance, 
what is going on in the `NumberSequenceSource` that we wrap around in the 
`DataGeneratorSource` that this implementation relies on:
   
https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java#L218





[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2023-11-23 Thread Vladislav Keda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789203#comment-17789203
 ] 

Vladislav Keda commented on FLINK-32513:


Hi [~huwh]! I would like to ask whether there is any news on this issue.

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> A Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes a very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 
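
The repeating frames above point at the cause: each transformation recomputes 
its transitive predecessors recursively, and inputs shared below a two-input 
transformation are traversed once per path, so the traversal (and the 
intermediate lists) can grow exponentially with graph depth. An illustrative 
reduction of the pattern, not Flink's actual code:

```java
// Illustrative only: without memoization, every call re-walks the subgraph.
// With k stacked two-input transformations over shared inputs, the lower
// chain is visited on the order of 2^k times.
public List<Transformation<?>> getTransitivePredecessors() {
    List<Transformation<?>> result = new ArrayList<>();
    result.add(this);
    result.addAll(input1.getTransitivePredecessors()); // walks the shared subgraph
    result.addAll(input2.getTransitivePredecessors()); // walks it again
    return result;
}
```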

Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]

2023-11-23 Thread via GitHub


mxm commented on code in PR #23777:
URL: https://github.com/apache/flink/pull/23777#discussion_r1403558590


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/SourceReaderWithCheckpointsLatch.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that synchronizes emission of N elements on the 
arrival of the checkpoint
+ * barriers. It 1) emits a list of elements without checkpoints in-between, 2) 
then waits for two
+ * checkpoints to complete, 3) then re-emits the same elements before 4) 
waiting for another two
+ * checkpoints and 5) exiting.
+ *
+ * This lockstep execution is possible because {@code pollNext} and {@code 
snapshotState} are
+ * executed in the same thread, and because {@code pollNext} can emit N 
elements at once. This
+ * reader is meant to be used solely for testing purposes as the substitution 
for the {@code
+ * FiniteTestSource} which implements the deprecated {@code SourceFunction} 
API.
+ */
+@Experimental
+public class SourceReaderWithCheckpointsLatch<

Review Comment:
   ```suggestion
   public class DoubleEmittingSourceReaderWithCheckpointsInBetween<
   ```
   
   Is this more like what it's doing?






Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403543895


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional<Entry<Instant, ScalingRecord>> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map<JobVertexID, Integer> 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403540366


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional<Entry<Instant, ScalingRecord>> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map<JobVertexID, Integer> 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

[jira] [Commented] (FLINK-33634) Add Conditions to Flink CRD's Status field

2023-11-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789178#comment-17789178
 ] 

Gyula Fora commented on FLINK-33634:


Could you please elaborate a bit on what exactly you suggest we put in the 
conditions field? 

> Add Conditions to Flink CRD's Status field
> --
>
> Key: FLINK-33634
> URL: https://issues.apache.org/jira/browse/FLINK-33634
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Major
>
> From 
> [https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties]
>  it is considered best practice to provide Conditions in the Status of CRDs. 
> Some tooling even expects there to be a Conditions field in the status of a 
> CR. This issue is to propose adding a Conditions field to the CR status.
> e.g.
> status:
>     conditions:
>      - lastTransitionTime: '2023-11-23T12:38:51Z'
>        status: 'True'
>        type: Ready





[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments

2023-11-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789176#comment-17789176
 ] 

Gyula Fora commented on FLINK-33633:


I am not completely sure about this feature. It's generally an anti-pattern for 
operators to perform such actions. Operators should run with minimal 
permissions, and in most prod envs admins do not want the Flink operator to 
have access to creating service accounts, roles, and role bindings.

> Automatic creation of RBAC for instances of Flink Deployments
> -
>
> Key: FLINK-33633
> URL: https://issues.apache.org/jira/browse/FLINK-33633
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Not a Priority
>
> Currently users have to manually create RBAC, e.g. the Flink service account. 
> When the operator is watching all namespaces, creation of a FlinkDeployment in 
> a specific namespace may fail if the kube admin has failed to create the 
> required RBAC. To improve usability, the operator could be coded to 
> automatically create these RBAC resources in the instance namespace if not 
> present.





Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403521447


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap<Instant, ScalingRecord> scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional<Entry<Instant, ScalingRecord>> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);

Review Comment:
   Should we maybe log this at debug level, so we have an overview if we want to 
debug this?



##
docs/layouts/shortcodes/generated/auto_scaler_configuration.html:
##
@@ -80,6 +80,18 @@
 Duration
 Expected restart time to be used until the operator can 
determine it reliably from history.
 
+
+job.autoscaler.restart.time.tracked.enabled
+false
+Boolean
+Whether to use the actually observed rescaling restart times 
instead of the fixed 'job.autoscaler.restart.time' configuration. If set to 
true, the maximum restart duration over a number of samples will be used. The 
value of 'job.autoscaler.restart.time' will act as an upper bound.
+
+
+job.autoscaler.restart.time.tracked.limit

Review Comment:
   Sorry for the late comment; we could consider changing this to 
`job.autoscaler.restart.time-tracking.enabled`. 

Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]

2023-11-23 Thread via GitHub


masteryhx commented on code in PR #23765:
URL: https://github.com/apache/flink/pull/23765#discussion_r1403408187


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##
@@ -80,7 +86,8 @@ public RocksNativeFullSnapshotStrategy(
 @Nonnull LocalRecoveryConfig localRecoveryConfig,
 @Nonnull File instanceBasePath,
 @Nonnull UUID backendUID,
-@Nonnull RocksDBStateUploader rocksDBStateUploader) {
+@Nonnull RocksDBStateUploader rocksDBStateUploader,
+RocksDBStateFileVerifier stateFileVerifier) {

Review Comment:
   nit: add a `@Nullable` annotation.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java:
##
@@ -86,6 +86,18 @@ public class RocksDBOptions {
 .withDescription(
 "The number of threads (per stateful operator) 
used to transfer (download and upload) files in RocksDBStateBackend.");
 
+/**
+ * Whether to verify the Checksum of the incremental sst file during 
Checkpoint in
+ * RocksDBStateBackend.
+ */
+@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+public static final ConfigOption<Boolean> 
CHECKPOINT_VERIFY_CHECKSUM_ENABLE =
+
ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enable")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Whether to verify the Checksum of the incremental 
sst file during Checkpoint in RocksDBStateBackend");

Review Comment:
   Could we also add some important messages here:
   1. It may introduce some overhead for the checkpoint procedure if enabled.
   2. If the checksum verification fails, we will fail the checkpoint.
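   
   For context, a hedged sketch of what the per-file verification can look like 
with the RocksDB JNI API, reusing the `sstFileReaderOptions` name from the test 
below; `org.rocksdb.SstFileReader`, `sstFile`, and the exception handling are 
assumptions, not taken from the PR:
   
   ```java
   // Sketch only: verify the block checksums of one .sst file before upload.
   try (SstFileReader reader = new SstFileReader(sstFileReaderOptions)) {
       reader.open(sstFile.getAbsolutePath());
       reader.verifyChecksum(); // throws RocksDBException on a corrupted block
   } catch (RocksDBException e) {
       throw new IOException("Checksum verification failed for " + sstFile, e);
   }
   ```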



##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.junit.Assert.fail;
+
+/** {@link RocksDBStateFileVerifier} test. */
+public class RocksdbStateFileVerifierTest {
+
+@Rule public TemporaryFolder folder = new TemporaryFolder();
+
+@Test
+public void rocksdbStateFileVerifierTest() throws Exception {
+ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+String rootPath = folder.newFolder().getAbsolutePath();
+File dbPath = new File(rootPath, "db");
+File cpPath = new File(rootPath, "cp");
+
+try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
+ColumnFamilyOptions colOptions = new ColumnFamilyOptions();
+Options sstFileReaderOptions = new Options(dbOptions, 
colOptions);
+WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
+RocksDB db =
+RocksDB.open(
+dbOptions,
+dbPath.toString(),
+Collections.singletonList(
+new ColumnFamilyDescriptor(
+"default".getBytes(), 
colOptions)),
+

[jira] [Created] (FLINK-33634) Add Conditions to Flink CRD's Status field

2023-11-23 Thread Tony Garrard (Jira)
Tony Garrard created FLINK-33634:


 Summary: Add Conditions to Flink CRD's Status field
 Key: FLINK-33634
 URL: https://issues.apache.org/jira/browse/FLINK-33634
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Tony Garrard


From 
[https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties]
 it is considered best practice to provide Conditions in the Status of CRDs. 
Some tooling even expects there to be a Conditions field in the status of a 
CR. This issue is to propose adding a Conditions field to the CR status.

e.g.
status:
    conditions:
     - lastTransitionTime: '2023-11-23T12:38:51Z'
       status: 'True'
       type: Ready





Re: [PR] [hotfix][docs] Remove reference to obsolete Collector and refer to yield instead [flink]

2023-11-23 Thread via GitHub


afedulov commented on PR #23776:
URL: https://github.com/apache/flink/pull/23776#issuecomment-1824550622

   cc @alpinegizmo 





[jira] [Updated] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments

2023-11-23 Thread Tony Garrard (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Garrard updated FLINK-33633:
-
Priority: Not a Priority  (was: Major)

> Automatic creation of RBAC for instances of Flink Deployments
> -
>
> Key: FLINK-33633
> URL: https://issues.apache.org/jira/browse/FLINK-33633
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Not a Priority
>
> Currently users have to manually create RBAC, e.g. the Flink service account. 
> When the operator is watching all namespaces, creation of a FlinkDeployment in 
> a specific namespace may fail if the kube admin has failed to create the 
> required RBAC. To improve usability, the operator could be coded to 
> automatically create these RBAC resources in the instance namespace if not 
> present.





[jira] [Created] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments

2023-11-23 Thread Tony Garrard (Jira)
Tony Garrard created FLINK-33633:


 Summary: Automatic creation of RBAC for instances of Flink 
Deployments
 Key: FLINK-33633
 URL: https://issues.apache.org/jira/browse/FLINK-33633
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Tony Garrard


Currently users have to manually create RBAC, e.g. the Flink service account. 
When the operator is watching all namespaces, creation of a FlinkDeployment in a 
specific namespace may fail if the kube admin has failed to create the required 
RBAC. To improve usability, the operator could be coded to automatically create 
these RBAC resources in the instance namespace if not present.





[jira] [Created] (FLINK-33632) Add custom mutator plugin

2023-11-23 Thread Tony Garrard (Jira)
Tony Garrard created FLINK-33632:


 Summary: Add custom mutator plugin
 Key: FLINK-33632
 URL: https://issues.apache.org/jira/browse/FLINK-33632
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Tony Garrard


Currently users have the ability to provide custom validators to the operator. 
It would be great if we followed the same pattern to provide custom mutators.





[jira] [Commented] (FLINK-33630) CoordinationResponse should be wrapped by SerializedValue in TaskOperatorEventGateway and JobMasterOperatorEventGateway

2023-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789146#comment-17789146
 ] 

Hang Ruan commented on FLINK-33630:
---

Hi, all. 

I would like to help resolve this problem. Please assign this issue to me. 
Thanks.

> CoordinationResponse should be wrapped by SerializedValue in 
> TaskOperatorEventGateway and JobMasterOperatorEventGateway
> ---
>
> Key: FLINK-33630
> URL: https://issues.apache.org/jira/browse/FLINK-33630
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Qingsheng Ren
>Priority: Major
>
> FLINK-26077 introduced a two-way RPC between operator and coordinator, but 
> {{CoordinationResponse}} is not wrapped by {{{}SerializedValue{}}}:
>  
> [https://github.com/apache/flink/blob/c61c09e464073fae430cab2dd56bd608f9d275fd/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/a.java#L254-L255|https://github.com/apache/flink/blob/34620fc5c5698d00e64c6b15f8ce84f807a2e0d7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java#L54]
>  
> This might be a problem if the implementation of {{CoordinationResponse}} is 
> provided in user code and loaded by user code classloader, because Pekko RPC 
> handler always uses app classloader for serializing and deserializing RPC 
> parameters and return values, which will lead to {{ClassNotFoundException}} 
> in this case. Similar to what we do for the request, we need to wrap a 
> {{SerializedValue}} around the response to make sure RPC calls won't cause 
> {{{}ClassNotFoundException{}}}.
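
As a hedged sketch of the proposed change (the method name follows the issue 
text; the exact signature in Flink may differ):

```java
// Sketch only: wrap the response in SerializedValue as well, so the RPC layer
// ships opaque bytes and the caller can deserialize with the user classloader.
CompletableFuture<SerializedValue<CoordinationResponse>> sendRequestToCoordinator(
        OperatorID operatorId, SerializedValue<CoordinationRequest> request);
```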





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-23 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1402925601


##
docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -442,4 +445,23 @@ migrate from the old abstractions. The steps to do this is 
as follows:
 `TypeSerializerConfigSnapshot` implementation as well as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the 
serializer).
 
+## Migrating from deprecated 
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer 
newSerializer)` before Flink 1.18

Review Comment:
   ```suggestion
   ## Migrating from deprecated 
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer 
newSerializer)` before Flink 1.19
   ```
   
   And this part has a series of 1.18 references that should be updated as well.



##
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot 
extends CompositeTypeSerial
 this.componentClass = InstantiationUtil.resolveClassByName(in, 
userCodeClassLoader);
 }
 
-@Override
-protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer 
newSerializer) {
-return (this.componentClass == newSerializer.getComponentClass())
-? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-: OuterSchemaCompatibility.INCOMPATIBLE;
-}
+   @Override
+   protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+   TypeSerializerSnapshot oldSerializerSnapshot) {
+   GenericArraySerializerSnapshot 
oldGenericArraySerializerSnapshot =
+   (GenericArraySerializerSnapshot) 
oldSerializerSnapshot;
+   return (this.componentClass == 
oldGenericArraySerializerSnapshot.componentClass)
+   ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+   : OuterSchemaCompatibility.INCOMPATIBLE;
+   }

Review Comment:
   Same comment: we should use 4 spaces instead of tabs.



##
docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -342,12 +342,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial
 this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
 }
 
-@Override
-protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-return (this.componentClass == newSerializer.getComponentClass())
-? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-: OuterSchemaCompatibility.INCOMPATIBLE;
-}
+	@Override
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+	TypeSerializerSnapshot oldSerializerSnapshot) {
+	GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot =
+	(GenericArraySerializerSnapshot) oldSerializerSnapshot;
+	return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
+	? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+	: OuterSchemaCompatibility.INCOMPATIBLE;
+	}

Review Comment:
   Keep the code style the same: we should use 4 spaces instead of tabs.



##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java:
##
@@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
  * can be used by legacy snapshot classes, which have a newer implementation implemented as a
  * {@link CompositeTypeSerializerSnapshot}.
  *
- * @param newSerializer the new serializer to check for compatibility.
+ * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
  * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
  * checks to. This instance should already contain the outer snapshot information.
  * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
  * snapshot.
  * @return the result compatibility.
  */
 public static  TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot(
-TypeSerializer newSerializer,
-CompositeTypeSerializerSnapshot newCompositeSnapshot,
+TypeSerializerSnapshot legacySerializerSnapshot,

Review Comment:
   It uses `legacy` as the name prefix, while the caller uses `old`.
   
   I see most of your changes use `old` or `previous` as the prefix; should we unify them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this 

[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work

2023-11-23 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789137#comment-17789137
 ] 

Rui Fan commented on FLINK-33565:
-

Before creating this JIRA, I found that the concurrentExceptions doesn't work 
for either multi-region or single-region jobs. So I totally agree with you.

 

> The concurrentExceptions doesn't work
> -
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> First of all, thanks to [~mapohl] for helping double-check in advance that 
> this was indeed a bug.
> Displaying exception history in the WebUI is supported in FLINK-6042.
> h1. What's the concurrentExceptions?
> When an execution fails due to an exception, other executions in the same 
> region will also restart, and the first exception becomes the rootException. 
> If other restarted executions also report exceptions at this time, we want 
> to collect these exceptions and display them to the user as 
> concurrentExceptions.
> h2. What's this bug?
> The concurrentExceptions is always empty in production, even if other 
> executions report exceptions at very close times.
> h1. Why doesn't it work?
> If a job has an all-to-all shuffle, it only has one region, and this region 
> has a lot of executions. If one execution throws an exception:
>  * The JobMaster will mark the state of this execution as FAILED.
>  * The rest of the executions in this region will be marked as CANCELING.
>  ** This call stack can be found at FLIP-364 
> [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover]
>  
> When these executions throw exceptions as well, the JobMaster will move 
> their state from CANCELING to CANCELED instead of FAILED.
> CANCELED executions don't go through the FAILED logic, so their exceptions 
> are ignored.
> Note: all reports are handled inside the JobMaster RPC thread, which is a 
> single thread, so the reports are processed serially. As a result, only one 
> execution is marked FAILED, and the rest of the executions are marked 
> CANCELED later (see the sketch after this description).
> h1. How to fix it?
> After discussing offline with [~mapohl], we first need to discuss with the 
> community whether we should keep the concurrentExceptions at all.
>  * If no, we can remove the related logic directly.
>  * If yes, we will discuss how to fix it later.
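
A minimal, self-contained toy model of the dropped-exception behavior described above (plain Java, not actual JobMaster code; all names are illustrative):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy model: all failure reports arrive serially on one thread; only the
// first report finds its execution RUNNING, so only its exception is kept.
public class ConcurrentExceptionDrop {

    enum State { RUNNING, CANCELING, FAILED, CANCELED }

    static final class Execution {
        State state = State.RUNNING;
    }

    public static void main(String[] args) {
        List<Execution> region = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            region.add(new Execution());
        }
        List<String> exceptionHistory = new ArrayList<>();

        // Execution 0 fails first: it is marked FAILED, its exception is
        // recorded as the root exception, and failover moves the rest of
        // the region to CANCELING.
        region.get(0).state = State.FAILED;
        exceptionHistory.add("root exception from execution 0");
        for (int i = 1; i < region.size(); i++) {
            region.get(i).state = State.CANCELING;
        }

        // The other executions report their own exceptions shortly after,
        // but they are already CANCELING, so they end up CANCELED and the
        // reported exception is dropped instead of being collected as a
        // concurrent exception.
        for (int i = 1; i < region.size(); i++) {
            Execution e = region.get(i);
            if (e.state == State.CANCELING) {
                e.state = State.CANCELED; // exception ignored here
            }
        }

        // Prints only the root exception; concurrentExceptions stays empty.
        System.out.println(exceptionHistory);
    }
}
{code}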



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

