[jira] [Comment Edited] (FLINK-25857) Add committer metrics to track the status of committables
[ https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789356#comment-17789356 ] Weijie Guo edited comment on FLINK-25857 at 11/24/23 7:53 AM: -- Thanks [~pvary] for the quick reply! Yes, in principle, it is indeed not required to bind the connector and Flink versions. My main concern is that the changes seem to have been made without sufficient discussion (like drafting a FLIP; if there is indeed one, I'm sorry I missed it). In addition, {{FLIP-321: Introduce an API deprecation process}} also seems to have requirements for the deprecation process of `PublicEvolving` APIs. was (Author: weijie guo): Thanks [~pvary] for the quick reply! Yes, in principle, it is indeed not required to bind the connector and Flink versions. My main concern is that the changes seem to have been made without sufficient discussion (like drafting a FLIP). In addition, {{FLIP-321: Introduce an API deprecation process}} also seems to have requirements for the deprecation process of `PublicEvolving` APIs. > Add committer metrics to track the status of committables > - > > Key: FLINK-25857 > URL: https://issues.apache.org/jira/browse/FLINK-25857 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Fabian Paul >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png > > > With Sink V2 we can now track the progress of a committable during committing > and show metrics about the committing status (i.e. failed, retried, > succeeded). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables
[ https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789356#comment-17789356 ] Weijie Guo commented on FLINK-25857: Thanks [~pvary] for the quick reply! Yes, in principle, it is indeed not required to bind the connector and Flink versions. My main concern is that the changes seem to have been made without sufficient discussion (like drafting a FLIP). In addition, {{FLIP-321: Introduce an API deprecation process}} also seems to have requirements for the deprecation process of `PublicEvolving` APIs. > Add committer metrics to track the status of committables > - > > Key: FLINK-25857 > URL: https://issues.apache.org/jira/browse/FLINK-25857 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Fabian Paul >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png > > > With Sink V2 we can now track the progress of a committable during committing > and show metrics about the committing status (i.e. failed, retried, > succeeded). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789355#comment-17789355 ] Rui Fan commented on FLINK-27681: - {quote}In our production, the underlying environment may produce some errors, resulting in corrupted files. In addition, Flink stores local files in the form of a single copy. When a problematic file is uploaded to DFS as a checkpoint, this checkpoint will be unavailable. {quote} Hi [~Ming Li] [~mayuehappy], I did a quick review of this PR, but I still don't know why the file gets corrupted; would you mind describing it in detail? I'm worried that the check Flink adds can't completely solve the problem of file corruption. Is it possible that the file corruption occurs after Flink's check but before the file is uploaded to HDFS? > Improve the availability of Flink when the RocksDB file is corrupted. > - > > Key: FLINK-27681 > URL: https://issues.apache.org/jira/browse/FLINK-27681 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Ming Li >Assignee: Yue Ma >Priority: Critical > Labels: pull-request-available > Attachments: image-2023-08-23-15-06-16-717.png > > > We have encountered, several times, a RocksDB checksum mismatch or block > verification failure when a job is restored. The reason is generally that > there are some problems with the machine where the task is located, which > causes the files uploaded to HDFS to be incorrect, and it had been a long time > (a dozen minutes to half an hour) before we found the problem. I'm not sure if > anyone else has had a similar problem. > Since this file is referenced by incremental checkpoints for a long time, > when the maximum number of retained checkpoints is exceeded, we can only use > this file until it is no longer referenced. When the job fails, it cannot be > recovered. > Therefore we consider: > 1.
Can RocksDB periodically check whether all files are correct and find the > problem in time? > 2. Can Flink automatically roll back to the previous checkpoint when there is > a problem with the checkpoint data, because even with manual intervention, it > just tries to recover from the existing checkpoint or discard the entire > state. > 3. Can we increase the maximum number of references to a file based on the > maximum number of checkpoints reserved? When the number of references exceeds > the maximum number of checkpoints -1, the Task side is required to upload a > new file for this reference. Not sure if this way will ensure that the new > file we upload will be correct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
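The per-file verification discussed above can be sketched with the JDK's built-in CRC32: checksum the local sst file, then verify the bytes that were actually uploaded against it. This is only an illustrative sketch with hypothetical names, not Flink's actual verifier; note it also cannot rule out corruption that happens after the check, which is exactly the concern raised in the comment.

```java
import java.util.zip.CRC32;

/** Illustrative checksum check; class and method names are hypothetical, not Flink's API. */
public class SstChecksumCheck {

    /** Compute the CRC32 checksum of the given bytes. */
    public static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    /** Returns true if the uploaded copy matches the checksum taken from the local file. */
    public static boolean matchesLocalChecksum(byte[] localFile, byte[] uploadedCopy) {
        return crc32(localFile) == crc32(uploadedCopy);
    }

    public static void main(String[] args) {
        byte[] local = "sst-file-contents".getBytes();
        byte[] corrupted = "sst-file-c0ntents".getBytes();
        System.out.println(matchesLocalChecksum(local, local));      // true
        System.out.println(matchesLocalChecksum(local, corrupted));  // false
    }
}
```

A real verifier could instead lean on RocksDB's own per-block checksums embedded in the sst format rather than a whole-file CRC; the principle is the same.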
Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]
1996fanrui commented on code in PR #23765: URL: https://github.com/apache/flink/pull/23765#discussion_r1404014532 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java: ## @@ -86,6 +86,18 @@ public class RocksDBOptions { .withDescription( "The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend."); +/** + * Whether to verify the Checksum of the incremental sst file during Checkpoint in + * RocksDBStateBackend. + */ +@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB) +public static final ConfigOption<Boolean> CHECKPOINT_VERIFY_CHECKSUM_ENABLE = + ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enable") Review Comment: ```suggestion ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enabled") ``` ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.FileUtils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; Review Comment: The new test should use `import org.junit.jupiter.api.Test;` instead of `org.junit.Test`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if Python process startup failed [flink]
flinkbot commented on PR #23789: URL: https://github.com/apache/flink/pull/23789#issuecomment-1825239372 ## CI report: * 30acb658f852496d73028b93467c024f3f4d43a0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-33613) Python UDF Runner process leak in Process Mode
[ https://issues.apache.org/jira/browse/FLINK-33613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-33613: --- Assignee: Dian Fu > Python UDF Runner process leak in Process Mode > -- > > Key: FLINK-33613 > URL: https://issues.apache.org/jira/browse/FLINK-33613 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.17.0 >Reporter: Yu Chen >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Attachments: ps-ef.txt, streaming_word_count-1.py > > > While working with PyFlink, we found that in Process Mode, the Python UDF > process may leak after a failover of the job. This leads to a rising number of > processes and threads on the host machine, which eventually results in a > failure to create new threads. > > You can try to reproduce it with the attached test task > `streaming_word_count.py`. > (Note that the job will continue to fail over, and you can watch the process > leak via `ps -ef` on the TaskManager.) > > Our test environment: > * K8S Application Mode > * 4 TaskManagers with 12 slots/TM > * Job's parallelism was set to 48 > The number of UDF processes `pyflink.fn_execution.beam.beam_boot` should be > consistent with the slots of a TM (12), but we found 180 such processes on one > TaskManager after several failovers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33613) Python UDF Runner process leak in Process Mode
[ https://issues.apache.org/jira/browse/FLINK-33613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33613: --- Labels: pull-request-available (was: ) > Python UDF Runner process leak in Process Mode > -- > > Key: FLINK-33613 > URL: https://issues.apache.org/jira/browse/FLINK-33613 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.17.0 >Reporter: Yu Chen >Priority: Major > Labels: pull-request-available > Attachments: ps-ef.txt, streaming_word_count-1.py > > > While working with PyFlink, we found that in Process Mode, the Python UDF > process may leak after a failover of the job. This leads to a rising number of > processes and threads on the host machine, which eventually results in a > failure to create new threads. > > You can try to reproduce it with the attached test task > `streaming_word_count.py`. > (Note that the job will continue to fail over, and you can watch the process > leak via `ps -ef` on the TaskManager.) > > Our test environment: > * K8S Application Mode > * 4 TaskManagers with 12 slots/TM > * Job's parallelism was set to 48 > The number of UDF processes `pyflink.fn_execution.beam.beam_boot` should be > consistent with the slots of a TM (12), but we found 180 such processes on one > TaskManager after several failovers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if Python process startup failed [flink]
dianfu opened a new pull request, #23789: URL: https://github.com/apache/flink/pull/23789 ## What is the purpose of the change *This pull request fixes the issue that the Java gRPC server was not shut down properly during job cancellation, which may cause the Python process not to shut down properly.* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
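The shutdown ordering the PR title describes — stop accepting work, wait for in-flight work, then force termination — is the standard graceful-shutdown pattern; grpc-java's `Server` exposes the analogous `shutdown()`/`awaitTermination()`/`shutdownNow()` methods. A minimal sketch using a JDK `ExecutorService` as a stand-in (names here are illustrative, not the PR's actual code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative graceful shutdown: ask politely, wait, then force. */
public class GracefulShutdown {

    /** Returns true if the service stopped within the timeout, false if it had to be forced. */
    public static boolean shutdownGracefully(ExecutorService service, long timeoutMs)
            throws InterruptedException {
        service.shutdown(); // stop accepting new work
        if (service.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            return true;
        }
        service.shutdownNow(); // interrupt whatever is still running
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService idle = Executors.newFixedThreadPool(2);
        System.out.println(shutdownGracefully(idle, 1000)); // prints "true": nothing was running
    }
}
```

The important detail in the bug being fixed is that this sequence must also run on the failure path (Python process startup failed), not only on clean job shutdown.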
Re: [PR] [DOC]Update watermark column type [flink]
flinkbot commented on PR #23788: URL: https://github.com/apache/flink/pull/23788#issuecomment-1825226910 ## CI report: * cfb92fb619cfdbaa4b610d4c95dae57922afec18 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31385) Introduce extended Assertj Matchers for completable futures
[ https://issues.apache.org/jira/browse/FLINK-31385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789347#comment-17789347 ] Yun Tang commented on FLINK-31385: -- Also picked into release-1.17 as 2a83c910eee711f8b5f9dd4697de60221f21fb9d for the backport of FLINK-33598. > Introduce extended Assertj Matchers for completable futures > --- > > Key: FLINK-31385 > URL: https://issues.apache.org/jira/browse/FLINK-31385 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: David Morávek >Assignee: David Morávek >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0, 1.17.3 > > > Introduce extended Assertj Matchers for completable futures that don't rely > on timeouts. > In general, we want to avoid relying on timeouts in the Flink test suite to > get additional context (thread dump) in case something gets stuck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
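The idea behind the matchers described in the ticket — asserting on a future without a hard-coded timeout, so that a hang surfaces as a stuck thread (and a usable thread dump) rather than a flaky timeout failure — can be sketched with the JDK's `CompletableFuture` alone. The helper below is hypothetical, not Flink's actual AssertJ extension:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper illustrating timeout-free assertions on futures. */
public class FutureAssert {

    /**
     * Block until the future completes and return its value. No timeout: if the
     * future never completes, the test hangs and the CI harness's thread dump
     * shows exactly where, which is more useful than an arbitrary-timeout failure.
     */
    public static <T> T awaitValue(CompletableFuture<T> future) {
        return future.join();
    }
}
```

The returned value can then be checked with ordinary AssertJ assertions, e.g. `assertThat(awaitValue(future))`.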
[PR] [DOC]Update watermark column type [flink]
hehuiyuan opened a new pull request, #23788: URL: https://github.com/apache/flink/pull/23788 ## What is the purpose of the change ![image](https://github.com/apache/flink/assets/18002496/9e1e478b-84ac-4930-9a42-6bb91616a3f2) https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/create/#watermark Add the rowtime column type `TIMESTAMP_LTZ(3)` ## Brief change log Add the rowtime column type `TIMESTAMP_LTZ(3)` ## Verifying this change ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31385) Introduce extended Assertj Matchers for completable futures
[ https://issues.apache.org/jira/browse/FLINK-31385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-31385: - Fix Version/s: 1.17.3 > Introduce extended Assertj Matchers for completable futures > --- > > Key: FLINK-31385 > URL: https://issues.apache.org/jira/browse/FLINK-31385 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: David Morávek >Assignee: David Morávek >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0, 1.17.3 > > > Introduce extended Assertj Matchers for completable futures that don't rely > on timeouts. > In general, we want to avoid relying on timeouts in the Flink test suite to > get additional context (thread dump) in case something gets stuck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33598) Watch HA configmap via name instead of lables to reduce pressure on APIserver
[ https://issues.apache.org/jira/browse/FLINK-33598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-33598. -- Resolution: Fixed Merged master: 608546e090f5d41c6a8b9af2c264467279181027 ... b7e8b792c086c3c445ee8429fbcfe035097a878c release-1.18: 6f30c6e427251dd4b2e4ad03f89bed06a519b05f release-1.17: 18d5a4696eccac3b5e7fe1d579547feef4537c08 > Watch HA configmap via name instead of lables to reduce pressure on APIserver > -- > > Key: FLINK-33598 > URL: https://issues.apache.org/jira/browse/FLINK-33598 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.18.0, 1.17.1 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.19.0, 1.18.1, 1.17.3 > > > As FLINK-24819 described, the k8s API server would receive more pressure when > HA is enabled, due to the configmap watching being achieved via filter with > labels instead of just querying the configmap name. This could be done after > FLINK-24038, which reduced the number of configmaps to only one as > {{-cluster-config-map}}. > This ticket would not touch {{--config-map}}, which stores > the checkpoint information, as that configmap is directly accessed by JM and > not watched by taskmanagers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
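The server-side cost difference the ticket describes can be illustrated in miniature: a label-selector watch makes the API server evaluate every configmap against the selector, while a name-based watch is a direct lookup of a single object. The sketch below is a plain-Java analogy of that difference only; the fabric8 client calls in Flink's Kubernetes HA module are different, and the names here are made up.

```java
import java.util.Map;

/** Analogy for why watching a configmap by name is cheaper than filtering by labels. */
public class ConfigMapLookup {

    /** Label-style watch: the server scans every entry and tests a predicate (O(n)). */
    public static String findByLabel(Map<String, String> configMapLabels, String labelValue) {
        for (Map.Entry<String, String> e : configMapLabels.entrySet()) {
            if (e.getValue().equals(labelValue)) {
                return e.getKey();
            }
        }
        return null;
    }

    /** Name-style watch: a direct key lookup (O(1)), no scan over unrelated objects. */
    public static boolean existsByName(Map<String, String> configMapLabels, String name) {
        return configMapLabels.containsKey(name);
    }
}
```

With only one HA configmap per cluster after FLINK-24038, the name-based watch is sufficient, which is what makes this optimization possible.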
[jira] [Commented] (FLINK-33338) Bump FRocksDB version
[ https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789345#comment-17789345 ] Yue Ma commented on FLINK-33338: [~pnowojski] Update: now all the changes we need are released in *8.9.0 (11/17/2023)*, see [HISTORY.md|https://github.com/facebook/rocksdb/blob/main/HISTORY.md] > Bump FRocksDB version > - > > Key: FLINK-33338 > URL: https://issues.apache.org/jira/browse/FLINK-33338 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Priority: Major > > We need to bump RocksDB in order to be able to use the new IngestDB and ClipDB > commands. > If some of the required changes haven't been merged into facebook/rocksdb, we > should cherry-pick and include them in our FRocksDB fork. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33338) Bump FRocksDB version
[ https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789345#comment-17789345 ] Yue Ma edited comment on FLINK-33338 at 11/24/23 6:49 AM: -- [~pnowojski] Update: now all the changes we need are released in *8.9.0 (11/17/2023)*, see [HISTORY.md|https://github.com/facebook/rocksdb/blob/main/HISTORY.md] was (Author: mayuehappy): [~pnowojski] Update: Now All the changes we need are released in {*}8.9.0(11/17/203) [HISTORY.md|{*}{*}https://github.com/facebook/rocksdb/blob/main/HISTORY.md{*}{*}]{*} > Bump FRocksDB version > - > > Key: FLINK-33338 > URL: https://issues.apache.org/jira/browse/FLINK-33338 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Priority: Major > > We need to bump RocksDB in order to be able to use the new IngestDB and ClipDB > commands. > If some of the required changes haven't been merged into facebook/rocksdb, we > should cherry-pick and include them in our FRocksDB fork. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables
[ https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789335#comment-17789335 ] Peter Vary commented on FLINK-25857: For the Iceberg connector we release separate versions for different versions of Flink. If I understand correctly, this was the case before the separation of the connectors too: every Flink version contained different versions of the connectors, and the jars might not work across versions. Also, the `TwoPhaseCommittingSink` is marked with the `PublicEvolving` annotation, which means it could change between minor versions of Flink. We are pushing these changes forward, so the API could soon be promoted to `Public`, and we can avoid these kinds of disruption in the future. > Add committer metrics to track the status of committables > - > > Key: FLINK-25857 > URL: https://issues.apache.org/jira/browse/FLINK-25857 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Fabian Paul >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png > > > With Sink V2 we can now track the progress of a committable during committing > and show metrics about the committing status (i.e. failed, retried, > succeeded). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement
[ https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789334#comment-17789334 ] Benchao Li commented on FLINK-33541: There is no such usage as "RAND(a, 10)" or "RAND(b, 10)". The supported usages for RAND are "RAND()" and "RAND(seed)". > RAND_INTEGER can't be existed in a IF statement > > > Key: FLINK-33541 > URL: https://issues.apache.org/jira/browse/FLINK-33541 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.18.0 >Reporter: Guojun Li >Priority: Major > Labels: pull-request-available > Attachments: image-2023-11-24-13-31-21-209.png > > > The minimal reproduction steps: > Flink SQL> select if(1=1, rand_integer(100), 0); > [ERROR] Could not execute SQL statement. Reason: > java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL) > > But we do not see the exception reported in 1.14; not sure in which version this > bug was introduced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement
[ https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789330#comment-17789330 ] xuyang commented on FLINK-33541: Hi [~libenchao], I checked the code again and sorry for the mistake. CAST from nullable to NOT NULL is forbidden in `LogicalTypeCasts#defaultMethod`, and I think this is reasonable. !image-2023-11-24-13-31-21-209.png|width=489,height=325! {quote}In this case, the argument is literal which is not null, so the result type is not null is ok. And I don't see many usages to pass in a nullable field as argument. {quote} For example, suppose we have a table with columns "a int not null" and "b int". I think the result types of RAND(a, 10) and RAND(b, 10) should be different: the former is int not null and the latter is int. This difference will, and should, also affect the result type of the output. {quote}Isn't this is the root cause? {quote} The inconsistency between the result types produced during validation and during code generation for RAND and RAND_INTEGER is what causes this bug. Looking forward to your thoughts :) > RAND_INTEGER can't be existed in a IF statement > > > Key: FLINK-33541 > URL: https://issues.apache.org/jira/browse/FLINK-33541 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.18.0 >Reporter: Guojun Li >Priority: Major > Labels: pull-request-available > Attachments: image-2023-11-24-13-31-21-209.png > > > The minimal reproduction steps: > Flink SQL> select if(1=1, rand_integer(100), 0); > [ERROR] Could not execute SQL statement. Reason: > java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL) > > But we do not see the exception reported in 1.14; not sure in which version this > bug was introduced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement
[ https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-33541: --- Attachment: image-2023-11-24-13-31-21-209.png > RAND_INTEGER can't be existed in a IF statement > > > Key: FLINK-33541 > URL: https://issues.apache.org/jira/browse/FLINK-33541 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.18.0 >Reporter: Guojun Li >Priority: Major > Labels: pull-request-available > Attachments: image-2023-11-24-13-31-21-209.png > > > The minimum produce steps: > Flink SQL> select if(1=1, rand_integer(100), 0); > [ERROR] Could not execute SQL statement. Reason: > java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL) > > But we do not see the exception reported in 1.14, not sure which version this > bug was introduced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work
[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789329#comment-17789329 ] Rui Fan commented on FLINK-33565: - Some background here: [FLIP-364|https://cwiki.apache.org/confluence/x/uJqzDw] is improving the restart-strategy. Currently, one pain point of the restart-strategy is that the restart attempt is increased too fast (read more details in part 1.2 of FLIP-364). The core solution is similar and related to concurrentExceptions: we merge some exceptions into one restart attempt. [~zhuzh] and I would like the restart attempt to match the concurrentExceptions: when we merge some exceptions into one restart attempt, we pick the first exception as the root cause and the rest of the exceptions as the concurrent exceptions. [~mapohl] WDYT? > The concurrentExceptions doesn't work > - > > Key: FLINK-33565 > URL: https://issues.apache.org/jira/browse/FLINK-33565 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > First of all, thanks to [~mapohl] for helping double-check in advance that > this was indeed a bug. > Displaying exception history in the WebUI has been supported since FLINK-6042. > h1. What's the concurrentExceptions? > When an execution fails due to an exception, other executions in the same > region will also restart, and the first exception is the rootException. If other > restarted executions also report exceptions at this time, we hope to collect > these exceptions and display them to the user as concurrentExceptions. > h2. What's this bug? > The concurrentExceptions is always empty in production, even if other > executions report exceptions at very close times. > h1. Why doesn't it work? > If one job has an all-to-all shuffle, this job only has one region, and this > region has a lot of executions.
If one execution throws an exception: > * The JobMaster will mark the state of this execution as FAILED. > * The rest of the executions of this region will be marked as CANCELING. > ** This call stack can be found at FLIP-364 > [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover] > > When these executions throw exceptions as well, the JobMaster will mark their > state from CANCELING to CANCELED instead of FAILED. > The CANCELED executions won't go through the FAILED logic, so their exceptions are > ignored. > Note: all reports are executed inside the JobMaster RPC thread, which is a single > thread, so the reports are executed serially. Only one execution is > marked FAILED, and the rest of the executions will be marked CANCELED later. > h1. How to fix it? > After an offline discussion with [~mapohl], we first need to discuss with the community whether we > should keep the concurrentExceptions. > * If no, we can remove the related logic directly. > * If yes, we will discuss how to fix it later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
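The intended pairing described in the comment — one restart attempt holding one root cause plus its concurrent exceptions — can be sketched with plain Java throwables, using addSuppressed as the attachment mechanism. Flink's actual exception-history types differ, so the names here are hypothetical:

```java
import java.util.List;

/** Illustrative merge: first failure is the root cause, the rest become concurrent exceptions. */
public class FailureMerge {

    /** Attach every later failure of the same restart attempt to the first one. */
    public static Throwable merge(List<Throwable> failuresInOneAttempt) {
        Throwable root = failuresInOneAttempt.get(0);
        for (int i = 1; i < failuresInOneAttempt.size(); i++) {
            root.addSuppressed(failuresInOneAttempt.get(i));
        }
        return root;
    }
}
```

The design point is that the grouping key is the restart attempt: failures merged into the same attempt are exactly the ones that should surface as concurrentExceptions of that attempt's root cause.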
[jira] [Commented] (FLINK-33490) Validate the name conflicts when creating view
[ https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789326#comment-17789326 ] Benchao Li commented on FLINK-33490: I mostly agree. I don't have a strong opinion on whether to have a FLIP; I'd like to hear more from others. > Validate the name conflicts when creating view > -- > > Key: FLINK-33490 > URL: https://issues.apache.org/jira/browse/FLINK-33490 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Shengkai Fang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > We should forbid > ``` > CREATE VIEW id_view AS > SELECT id, uid AS id FROM id_table > ``` > As the SQL standard states: > If <view column list> is specified, then: > i) If any two columns in the table specified by the <query expression> have > equivalent <column name>s, or if any column of that table has an > implementation-dependent name, then a <view column list> shall be specified. > ii) Equivalent <column name>s shall not be specified more than once in the > <view column list>. > Many databases also throw an exception when view column names conflict, e.g. mysql, > postgres. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33541) RAND_INTEGER can't be existed in a IF statement
[ https://issues.apache.org/jira/browse/FLINK-33541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789325#comment-17789325 ] Benchao Li commented on FLINK-33541: {quote}I notice currently the CAST rule is correct. It forbids casting from not null to nullable. The root cause is not IF but is RAND_INTEGER {quote} I don't agree with this: casting from NOT NULL to nullable should be valid. The opposite should be invalid. {quote}Actually, the result type of RAND and RAND_INTEGER should depend on the types of arguments. {quote} In this case, the argument is a literal, which is not null, so a NOT NULL result type is OK. And I don't see many usages that pass in a nullable field as the argument. {quote}when codegen operator RAND, the return type of RAND(0) is always "nullable" in `RandCallGen`. {quote} Isn't this the root cause? > RAND_INTEGER can't be existed in a IF statement > > > Key: FLINK-33541 > URL: https://issues.apache.org/jira/browse/FLINK-33541 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.0, 1.18.0 >Reporter: Guojun Li >Priority: Major > Labels: pull-request-available > > Minimal reproduction steps: > Flink SQL> select if(1=1, rand_integer(100), 0); > [ERROR] Could not execute SQL statement. Reason: > java.lang.Exception: Unsupported operand types: IF(boolean, INT, INT NOT NULL) > > But we do not see this exception in 1.14; we are not sure in which version this > bug was introduced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
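The nullability debate above boils down to how the result type of IF(cond, a, b) is derived: the least-restrictive common type is nullable if either branch is nullable. A hedged model of that rule (hypothetical classes, not Flink's actual type inference):

```java
// Hypothetical model of SQL nullability merging for IF(cond, a, b).
class SqlType {
    final String baseName;
    final boolean nullable;

    SqlType(String baseName, boolean nullable) {
        this.baseName = baseName;
        this.nullable = nullable;
    }
}

class TypeInferenceSketch {
    // Assumes both branches share the same base type; only nullability merges:
    // the result is nullable if either branch can produce NULL.
    static SqlType leastRestrictive(SqlType a, SqlType b) {
        return new SqlType(a.baseName, a.nullable || b.nullable);
    }
}
```

Under this rule, IF(boolean, INT, INT NOT NULL) would simply resolve to a nullable INT, so the reported "Unsupported operand types" error suggests the two branch types were never reconciled this way.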
[jira] [Resolved] (FLINK-33624) Bump Guava to 32.1.3-jre in flink-table
[ https://issues.apache.org/jira/browse/FLINK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge resolved FLINK-33624. - Resolution: Fixed > Bump Guava to 32.1.3-jre in flink-table > --- > > Key: FLINK-33624 > URL: https://issues.apache.org/jira/browse/FLINK-33624 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33624) Bump Guava to 32.1.3-jre in flink-table
[ https://issues.apache.org/jira/browse/FLINK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789324#comment-17789324 ] Jing Ge commented on FLINK-33624: - master: 34620fc5c5698d00e64c6b15f8ce84f807a2e0d7 > Bump Guava to 32.1.3-jre in flink-table > --- > > Key: FLINK-33624 > URL: https://issues.apache.org/jira/browse/FLINK-33624 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui closed FLINK-33547. --- Resolution: Duplicate Duplicate of https://issues.apache.org/jira/browse/FLINK-33523 > SQL primitive array type after upgrading to Flink 1.18.0 > > > Key: FLINK-33547 > URL: https://issues.apache.org/jira/browse/FLINK-33547 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Xingcan Cui >Priority: Major > > We have some Flink SQL UDFs that use object array (Object[]) arguments and > take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink > 1.18.0, the data created by ARRAY[] SQL function became primitive arrays > (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's > expected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789319#comment-17789319 ] Xingcan Cui commented on FLINK-33547: - Sure. I'll close this. > SQL primitive array type after upgrading to Flink 1.18.0 > > > Key: FLINK-33547 > URL: https://issues.apache.org/jira/browse/FLINK-33547 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Xingcan Cui >Priority: Major > > We have some Flink SQL UDFs that use object array (Object[]) arguments and > take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink > 1.18.0, the data created by ARRAY[] SQL function became primitive arrays > (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's > expected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789317#comment-17789317 ] Prabhu Joseph commented on FLINK-33547: --- [~xccui] Thanks for the details. This issue seems to be a duplicate of [FLINK-33523|https://issues.apache.org/jira/browse/FLINK-33523]. Shall we track this issue in FLINK-33523? Thanks. > SQL primitive array type after upgrading to Flink 1.18.0 > > > Key: FLINK-33547 > URL: https://issues.apache.org/jira/browse/FLINK-33547 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Xingcan Cui >Priority: Major > > We have some Flink SQL UDFs that use object array (Object[]) arguments and > take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink > 1.18.0, the data created by ARRAY[] SQL function became primitive arrays > (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's > expected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33523) DataType ARRAY fails to cast into Object[]
[ https://issues.apache.org/jira/browse/FLINK-33523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789316#comment-17789316 ] Prabhu Joseph commented on FLINK-33523: --- Thanks [~jeyhun]. Flink 1.17 does not have this enforcement. Flink 1.18 [FLINK-31835|https://issues.apache.org/jira/browse/FLINK-31835] has added the enforcement and has broken multiple places like [FLINK-33547 |https://issues.apache.org/jira/browse/FLINK-33547] and [Iceberg test cases|https://github.com/apache/iceberg/issues/8930]. > DataType ARRAY fails to cast into Object[] > > > Key: FLINK-33523 > URL: https://issues.apache.org/jira/browse/FLINK-33523 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.0 >Reporter: Prabhu Joseph >Priority: Major > > When upgrading Iceberg's Flink version to 1.18, we found the Flink-related > unit test case broken due to this issue. The below code used to work fine in > Flink 1.17 but failed after upgrading to 1.18. DataType ARRAY > fails to cast into Object[]. 
> *Error:* > {code} > Exception in thread "main" java.lang.ClassCastException: [I cannot be cast to > [Ljava.lang.Object; > at FlinkArrayIntNotNullTest.main(FlinkArrayIntNotNullTest.java:18) > {code} > *Repro:* > {code} > import org.apache.flink.table.data.ArrayData; > import org.apache.flink.table.data.GenericArrayData; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.TableEnvironment; > import org.apache.flink.table.api.TableResult; > public class FlinkArrayIntNotNullTest { > public static void main(String[] args) throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > TableEnvironment env = TableEnvironment.create(settings); > env.executeSql("CREATE TABLE filesystemtable2 (id INT, data ARRAY NOT NULL>) WITH ('connector' = 'filesystem', 'path' = > '/tmp/FLINK/filesystemtable2', 'format'='json')"); > env.executeSql("INSERT INTO filesystemtable2 VALUES (4,ARRAY [1,2,3])"); > TableResult tableResult = env.executeSql("SELECT * from > filesystemtable2"); > ArrayData actualArrayData = new GenericArrayData((Object[]) > tableResult.collect().next().getField(1)); > } > } > {code} > *Analysis:* > 1. The code works fine with ARRAY datatype. The issue happens when using > ARRAY. > 2. The code works fine when casting into int[] instead of Object[]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
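The `[I cannot be cast to [Ljava.lang.Object;` error is plain Java array semantics, independent of Flink: a primitive `int[]` is an `Object` but not an `Object[]`, whereas a boxed `Integer[]` is both. A self-contained demonstration:

```java
// Demonstrates why the cast in the repro fails for primitive arrays:
// only arrays of reference types are subtypes of Object[].
class ArrayCastDemo {
    static boolean isObjectArray(Object value) {
        return value instanceof Object[];
    }
}
```

`isObjectArray(new int[]{1, 2, 3})` is false, while `isObjectArray(new Integer[]{1, 2, 3})` is true; casting an `int[]` held in an `Object` variable to `Object[]` throws exactly the `ClassCastException` shown above.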
Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]
PrabhuJoseph commented on PR #23629: URL: https://github.com/apache/flink/pull/23629#issuecomment-1825118817 @1996fanrui Could you help commit this change? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33472) Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem cannot be uploaded
[ https://issues.apache.org/jira/browse/FLINK-33472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789000#comment-17789000 ] zhengzhili edited comment on FLINK-33472 at 11/24/23 3:40 AM: -- [~mapohl] this is my pull request [https://github.com/apache/flink/pull/23782], could you please have a look? was (Author: JIRAUSER302860): https://github.com/apache/flink/pull/23782 > Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem > cannot be uploaded > > > Key: FLINK-33472 > URL: https://issues.apache.org/jira/browse/FLINK-33472 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.1 >Reporter: zhengzhili >Assignee: zhengzhili >Priority: Minor > Labels: pull-request-available > Attachments: image-2023-11-16-09-56-40-806.png > > > Solve the problem that the temporary file of flink-conf.yaml in S3AFileSystem > cannot be uploaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33547) SQL primitive array type after upgrading to Flink 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789311#comment-17789311 ] Xingcan Cui commented on FLINK-33547: - Hi [~jeyhun], thanks for your attention! What you explained makes sense. However, sometimes it's tricky to deal with primitive array parameters. They make it harder for users to write generic UDFs, e.g., one that takes an arbitrary array and returns the first 3 elements. Also, this is a breaking change in 1.18.0. All the old UDFs that used Object[] arguments can no longer work directly with the primitive arrays generated by the ARRAY function. Type inference and dealing with null values are challenging in Flink SQL. Users won't understand why a UDF accepting an ARRAY argument can't work for an ARRAY parameter. It's also impractical for UDF developers to code a separate function for each primitive array type. We need some changes here. > SQL primitive array type after upgrading to Flink 1.18.0 > > > Key: FLINK-33547 > URL: https://issues.apache.org/jira/browse/FLINK-33547 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Xingcan Cui >Priority: Major > > We have some Flink SQL UDFs that use object array (Object[]) arguments and > take boxed arrays (e.g., Float[]) as parameters. After upgrading to Flink > 1.18.0, the data created by ARRAY[] SQL function became primitive arrays > (e.g., float[]) and it caused argument mismatch issues. I'm not sure if it's > expected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
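One way to keep the kind of generic UDF mentioned above ("takes an arbitrary array and returns the first 3 elements") working over both primitive and boxed arrays is `java.lang.reflect.Array`, which boxes primitives on access. A hedged sketch of that technique (hypothetical helper, not a drop-in Flink UDF):

```java
import java.lang.reflect.Array;

// Hypothetical helper: accepts int[], float[], Integer[], Float[], etc.
class GenericArrayOps {
    /** Returns the first n elements of any array, boxed into an Object[]. */
    static Object[] firstN(Object array, int n) {
        int len = Math.min(n, Array.getLength(array));
        Object[] out = new Object[len];
        for (int i = 0; i < len; i++) {
            out[i] = Array.get(array, i); // primitives are boxed automatically
        }
        return out;
    }
}
```

Reflection has per-element overhead, so this is a workaround for genericity rather than a performance-neutral fix for the breaking change.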
Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]
mayuehappy commented on PR #23765: URL: https://github.com/apache/flink/pull/23765#issuecomment-1825104163 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]
mayuehappy commented on PR #23765: URL: https://github.com/apache/flink/pull/23765#issuecomment-1825103757 @masteryhx Thanks for the code review, I have updated the PR, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25857) Add committer metrics to track the status of committables
[ https://issues.apache.org/jira/browse/FLINK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789304#comment-17789304 ] Weijie Guo commented on FLINK-25857: IIUC, this will make it impossible for our external connector sinks to support Flink 1.19 and earlier versions at the same time. > Add committer metrics to track the status of committables > - > > Key: FLINK-25857 > URL: https://issues.apache.org/jira/browse/FLINK-25857 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Fabian Paul >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-20-17-23-09-595.png, screenshot-1.png > > > With Sink V2 we can now track the progress of a committable during committing > and show metrics about the committing status. (i.e. failed, retried, > succeeded). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Priority: Blocker (was: Major) > Some connectors can not compile in 1.19-SNAPSHOT > > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Blocker > > The sink API compatibility was broken in FLINK-25857. > org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was > changed to > org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). > All external connectors sink can not compile as this change. > For example: > es: > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > aws: > https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33636) Implement JdbcAutoScalerEventHandler
Rui Fan created FLINK-33636: --- Summary: Implement JdbcAutoScalerEventHandler Key: FLINK-33636 URL: https://issues.apache.org/jira/browse/FLINK-33636 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789301#comment-17789301 ] Jiabao Sun commented on FLINK-33635: mongodb: https://github.com/apache/flink-connector-mongodb/actions/runs/6917152935/job/18818013536 > Some connectors can not compile in 1.19-SNAPSHOT > > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > The sink API compatibility was broken in FLINK-25857. > org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was > changed to > org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). > All external connectors sink can not compile as this change. > For example: > es: > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > aws: > https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][doc] add repo links of externalized connectors [flink]
flinkbot commented on PR #23787: URL: https://github.com/apache/flink/pull/23787#issuecomment-1825089599 ## CI report: * ebda3f5e2e64929e223823778ca6626435eed26b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789300#comment-17789300 ] Yuxin Tan commented on FLINK-33635: --- The failure is caused by the API changes in [FLINK-25857|https://issues.apache.org/jira/browse/FLINK-25857]. This change may affect all connectors' implementations. I think we should fix this issue with high priority. > Some connectors can not compile in 1.19-SNAPSHOT > > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > The sink API compatibility was broken in FLINK-25857. > org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was > changed to > org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). > All external connectors sink can not compile as this change. > For example: > es: > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > aws: > https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Description: The sink API compatibility was broken in FLINK-25857. org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was changed to org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). All external connectors sink can not compile as this change. For example: es: https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 aws: https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 was:https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > Some connector can not compile in 1.19-SNAPSHOT > --- > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > The sink API compatibility was broken in FLINK-25857. > org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was > changed to > org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). > All external connectors sink can not compile as this change. > For example: > es: > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > aws: > https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33635) Some connectors can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Summary: Some connectors can not compile in 1.19-SNAPSHOT (was: Some connector can not compile in 1.19-SNAPSHOT) > Some connectors can not compile in 1.19-SNAPSHOT > > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > The sink API compatibility was broken in FLINK-25857. > org.apache.flink.api.connector.sink2.Sink#createWriter(InitContext) was > changed to > org.apache.flink.api.connector.sink2.Sink#createWriter(WriterInitContext). > All external connectors sink can not compile as this change. > For example: > es: > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 > aws: > https://github.com/apache/flink-connector-aws/actions/runs/6975253086/job/18982104160 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33581][core] Deprecate configuration getters/setters that return/set complex Java objects. [flink]
JunRuiLee commented on code in PR #23758: URL: https://github.com/apache/flink/pull/23758#discussion_r1403852951 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ## @@ -676,8 +690,16 @@ public StreamExecutionEnvironment setStateBackend(StateBackend backend) { /** * Gets the state backend that defines how to store and checkpoint state. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. It is recommended to find which state backend is used by + * state backend ConfigOption. For more details on using ConfigOption for state backend + * configuration, please refer to the Flink documentation: https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends;>state-backends * @see #setStateBackend(StateBackend) */ +@Deprecated @PublicEvolving public StateBackend getStateBackend() { return defaultStateBackend; Review Comment: done ## flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java: ## @@ -82,19 +83,26 @@ public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); // create the environment to create streams and configure execution -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Configuration configuration = new Configuration(); +final StreamExecutionEnvironment env = Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
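The deprecation text in the review above pushes configuration away from typed getters/setters (like `getStateBackend()`) toward string-keyed, typed options. A minimal sketch of that pattern (hypothetical classes, deliberately not Flink's actual `ConfigOption`/`Configuration` API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified version of the ConfigOption pattern: a typed key
// with a default, looked up in a generic string-keyed configuration map.
class Option<T> {
    final String key;
    final T defaultValue;

    Option(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

class Conf {
    private final Map<String, Object> values = new HashMap<>();

    <T> void set(Option<T> option, T value) {
        values.put(option.key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(Option<T> option) {
        return (T) values.getOrDefault(option.key, option.defaultValue);
    }
}
```

The design win is that a setting such as the state backend becomes a serializable key-value pair (e.g. a hypothetical `state.backend` key) instead of a live Java object held by the environment, which is what makes it discoverable from files and flags.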
Re: [PR] [FLINK-33581][core] Deprecate configuration getters/setters that return/set complex Java objects. [flink]
JunRuiLee commented on PR #23758: URL: https://github.com/apache/flink/pull/23758#issuecomment-1825087740 Hi @zhuzhurk, Thanks for your review. I've updated this PR, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][doc] add repo links of externalized connectors [flink]
JingGe opened a new pull request, #23787: URL: https://github.com/apache/flink/pull/23787 ## What is the purpose of the change Add the externalized connector repos into the doc to ease developer find the related connectors. ## Verifying this change This change is a trivial doc update. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Component/s: Connectors / Common (was: Connectors / ElasticSearch) > Some connector can not compile in 1.19-SNAPSHOT > --- > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Summary: Some connector can not compile in 1.19-SNAPSHOT (was: Connector sink can not compile) > Some connector can not compile in 1.19-SNAPSHOT > --- > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33635) Some connector can not compile in 1.19-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789299#comment-17789299 ] Jiabao Sun commented on FLINK-33635: It seems to be caused by the refactoring of Sink.InitContext. https://github.com/apache/flink/pull/23555 I met the same problem in the MongoDB connector. > Some connector can not compile in 1.19-SNAPSHOT > --- > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33635) Connector sink can not compile
[ https://issues.apache.org/jira/browse/FLINK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33635: --- Summary: Connector sink can not compile (was: ElasticSearch connector main branch can not compile) > Connector sink can not compile > -- > > Key: FLINK-33635 > URL: https://issues.apache.org/jira/browse/FLINK-33635 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > > https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33635) ElasticSearch connector main branch can not compile
Weijie Guo created FLINK-33635: -- Summary: ElasticSearch connector main branch can not compile Key: FLINK-33635 URL: https://issues.apache.org/jira/browse/FLINK-33635 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Reporter: Weijie Guo Assignee: Weijie Guo https://github.com/apache/flink-connector-elasticsearch/actions/runs/6976181890/job/18984287421 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP
[ https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33184. -- Resolution: Fixed > HybridShuffleITCase fails with exception in resource cleanup of task Map on > AZP > --- > > Key: FLINK-33184 > URL: https://issues.apache.org/jira/browse/FLINK-33184 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: test-stability > > This build fails > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710 > {noformat} > Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task >[] - FATAL - exception in resource cleanup of task Map (5/10)#0 > (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0) > . > java.lang.IllegalStateException: Leaking buffers. > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) > [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] > 01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO > org.apache.flink.runtime.taskmanager.Task[] - Task Sink: > Unnamed (3/10)#0 is already in state CANCELING > 01:17:22,375 [Map (5/10)#0] ERROR > org.apache.flink.runtime.taskexecutor.TaskExecutor [] - FATAL - > exception in resource cleanup of task Map (5/10)#0 > (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0) > . > java.lang.IllegalStateException: Leaking buffers. 
> at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88) > ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT] > at > org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
[jira] [Commented] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP
[ https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789293#comment-17789293 ] Weijie Guo commented on FLINK-33184: It seems that this code path (i.e. IllegalStateException: Leaking buffers) has been fixed via FLINK-33185. Feel free to open a new ticket if anything related to this test still occurs. > HybridShuffleITCase fails with exception in resource cleanup of task Map on > AZP > ---
[jira] [Closed] (FLINK-33370) Simplify validateAndParseHostsString in Elasticsearch connector's configuration
[ https://issues.apache.org/jira/browse/FLINK-33370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33370. -- Fix Version/s: elasticsearch-3.1.0 Resolution: Fixed main via b15b6f01d0eb46527cbdf2e41a16db381529f3aa. > Simplify validateAndParseHostsString in Elasticsearch connector's > configuration > --- > > Key: FLINK-33370 > URL: https://issues.apache.org/jira/browse/FLINK-33370 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Minor > Labels: pull-request-available > Fix For: elasticsearch-3.1.0 > > > Currently, the validateAndParseHostsString method exists in each > configuration file (repeated three times), but the method logic is exactly the > same. We can simplify it by introducing a common util. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33370][connectors/elasticsearch] Simplify validateAndParseHostsString in Elasticsearch connector's configuration [flink-connector-elasticsearch]
reswqa merged PR #77: URL: https://github.com/apache/flink-connector-elasticsearch/pull/77 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
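The refactor described above — collapsing three identical copies of the host-parsing logic into one shared utility — can be sketched as follows. This is an illustrative sketch only; the class name, method signature, and validation rules here are assumptions, not the actual connector code.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative shared utility; names and rules are assumptions, not the actual Flink connector code. */
public final class HostsParser {

    private HostsParser() {}

    /**
     * Validates and parses a semicolon-separated hosts option such as
     * "http://host1:9200;http://host2:9200" into individual host strings.
     */
    public static List<String> validateAndParseHostsString(String hosts) {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("'hosts' option must not be empty.");
        }
        List<String> parsed = new ArrayList<>();
        for (String host : hosts.split(";")) {
            String trimmed = host.trim();
            if (!trimmed.contains("://")) {
                // reject entries without a scheme, e.g. "host1:9200"
                throw new IllegalArgumentException(
                        "Invalid host '" + trimmed + "': missing scheme, expected e.g. 'http://host:9200'.");
            }
            parsed.add(trimmed);
        }
        return parsed;
    }
}
```

Each of the three configuration classes can then delegate to this single method instead of carrying its own copy of the logic.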
[jira] [Closed] (FLINK-33502) HybridShuffleITCase caused a fatal error
[ https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33502. -- Assignee: Wencong Liu Resolution: Fixed master(1.19) via 07cc8c5c7f276261b5d7c22e0b6617b6ba9666be. > HybridShuffleITCase caused a fatal error > > > Key: FLINK-33502 > URL: https://issues.apache.org/jira/browse/FLINK-33502 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > Attachments: image-2023-11-20-14-37-37-321.png > > > [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177] > {code:java} > Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check > output in log > 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239 > 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests: > 9170Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.flink.test.runtime.HybridShuffleITCase > 9171Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd > /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC > -Xms256m -XX:+IgnoreUnrecognizedVMOptions > --add-opens=java.base/java.util=ALL-UNNAMED > --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar > /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar > /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 > surefire6242806641230738408tmp surefire_1603959900047297795160tmp > 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, > check output in log > 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239 > 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests: > 9176Error: 21:21:35 21:21:35.379 [ERROR] > org.apache.flink.test.runtime.HybridShuffleITCase > 9177Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532) > 9178Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479) > 9179Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322) > 9180Error: 21:21:35 21:21:35.379 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266) > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33502) HybridShuffleITCase caused a fatal error
[ https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33502: --- Fix Version/s: 1.19.0 > HybridShuffleITCase caused a fatal error >
Re: [PR] [FLINK-33502][network] Ignore RejectedExecutionException in DiskIOScheduler when the batch read executor service shuts down [flink]
reswqa merged PR #23781: URL: https://github.com/apache/flink/pull/23781
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka merged PR #23764: URL: https://github.com/apache/flink/pull/23764
Re: [PR] [FLINK-33583][table] support state ttl hint for join [flink]
hackergin commented on code in PR #23752: URL: https://github.com/apache/flink/pull/23752#discussion_r1403827616 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java: ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.hints.stream; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.utils.PlanKind; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import scala.Enumeration; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link org.apache.flink.table.planner.hint.StateTtlHint}. */ +class StateTtlHintTest extends TableTestBase { Review Comment: Can we add a multi-level join test case here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [hotfix][table][test] remove duplicated code [flink]
flinkbot commented on PR #23786: URL: https://github.com/apache/flink/pull/23786#issuecomment-1825056837 ## CI report: * 0c724f5597a1654d83682d5c39c7ee0ef2959cca UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [hotfix][table][test] remove duplicated code [flink]
JingGe opened a new pull request, #23786: URL: https://github.com/apache/flink/pull/23786 ## What is the purpose of the change Remove duplicated code according to the clean-code concept. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
[jira] [Closed] (FLINK-33616) multi lookup join error
[ https://issues.apache.org/jira/browse/FLINK-33616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yong yang closed FLINK-33616. - Resolution: Done > multi lookup join error > --- > > Key: FLINK-33616 > URL: https://issues.apache.org/jira/browse/FLINK-33616 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.17.1 >Reporter: yong yang >Priority: Major > > stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 = > cast(jdbc2.stringfield2 as int) > shows the error: Temporal table join requires an equality condition on fields of > table [default_catalog.default_database.t22]. > > test code: > > {code:java} > // code placeholder > package com.yy.flinkSqlJoin > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment > import org.apache.flink.table.api.Expressions.row > import org.apache.flink.table.api.{DataTypes, Table} > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment > import org.apache.flink.types.Row > import java.time.ZoneId; > /** > +I insert > -U before update > +U after update > -D retraction message; sends a null record to Kafka, corresponding to deleting a row in MySQL. > * https://www.yuque.com/u430335/qea2i2/kw4qqu > * Because inner/left joins do not emit a retract stream (output is append-only), the sink only needs to support append semantics.
> * The join key key1 between the fact table and the dimension table must be declared as the primary key in the dimension table's DDL: primary key (key1) > * For testing: > *kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic > user_order > *kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic > user_payment > *kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out > * Kafka data: > *Orders: > * {"order_id":100,"ts":166536720} -- step2 > * {"order_id":101,"ts":166536720} -- step6 > *Payments (MySQL): > * use db_yy; > create table user_pay ( > order_id bigint > ,paymoney bigint > ,primary key (order_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8; > insert into user_pay values(100,111); -- step1 > update user_pay set paymoney=222 where order_id=100; -- step3 > insert into user_pay values(101,33); -- step4 > update user_pay set paymoney=44 where order_id=101; -- step5 > *Retract-stream output of the code (inserts only): > *8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- after step2. Note: > a lookup join is driven by the fact table and matches the latest dimension-table data; a row is emitted even without a match, and if the dimension table is updated later, no retraction is emitted to update the result > *(true,+I[101, 2022-10-10T02:00:00Z, 44]) -- after step6. > *Kafka topic output: > *{"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111} > *{"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44} > * > *Logic: > *Lookup joins also come in inner join, left join, and full join flavors. > *A lookup join matches the fact table against the dimension table's latest data at lookup time. The dimension table's join field must be the primary key of the external connector (Kafka does not qualify).
> * > */ > object LookUpJoinJDBCDemo { > def main(args: Array[String]): Unit = { > //Class.forName("com.mysql.cj.jdbc.Driver") > // Flink 1.13: initialize the stream processing environment > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env) > // set the local (China) time zone > tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) > // order table > /* > Kafka parameters: > d_timestamp is taken from the Kafka metadata or from the raw record > d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' > parameter: json.fail-on-missing-field > https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8 > when a parsed field is missing, whether to skip the field/row or fail with an error (default > false, i.e. fail) > parameter: json.ignore-parse-errors > on a parse error, whether to skip the field/row or fail with an error (default > false, i.e. fail). If a field's parse errors are ignored, the field value is set to null > Note: the WITH options below configure the Kafka source table > */ > //val UserOrderTableSql = > // """ > //|create table user_order ( > //| order_id bigint, > //| ts bigint, > //| d_timestamp as TO_TIMESTAMP_LTZ(ts,3), > //| proctime AS PROCTIME() -- the fact table needs a processing-time attribute; the dimension table does not > //|)WITH( > //|'connector' = 'kafka', > //| 'topic' = 'user_order', > //| 'properties.bootstrap.servers' = 'localhost:9092', > //| 'properties.group.id' = 'g1', > //| 'scan.startup.mode' = 'latest-offset', > //| 'format' = 'json', > //| 'json.fail-on-missing-field' = 'false', -- skip or fail when a parsed field is missing > //| 'json.ignore-parse-errors' = 'true' -- skip records that fail to parse > //|) > //|""".stripMargin > //tEnv.executeSql(UserOrderTableSql) > // implicit conversion from Scala Int to Java Integer > /* > case class C1(age:Int,name:String,time:Long) > Flink stream event time > */ > val table = tEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("order_id", DataTypes.STRING()) > , DataTypes.FIELD("ts",
Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]
snuyanzin commented on PR #23757: URL: https://github.com/apache/flink/pull/23757#issuecomment-1825039210 Thanks for driving this :+1: After looking at FLINK-28203 I noticed that there are also the files tools/ci/compile.sh, tools/ci/maven-utils.sh, and tools/ci/verify_bundled_optional.sh; shouldn't they also be part of this PR?
Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]
snuyanzin commented on code in PR #23757: URL: https://github.com/apache/flink/pull/23757#discussion_r1403810179 ## flink-table/flink-sql-gateway/pom.xml: ## @@ -47,6 +47,7 @@ org.apache.flink flink-sql-gateway-api ${project.version} + ${flink.markBundledAsOptional} Review Comment: nit: it seems the alignment should be fixed
Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]
snuyanzin commented on code in PR #23757: URL: https://github.com/apache/flink/pull/23757#discussion_r1403809755 ## flink-dist/pom.xml: ## @@ -480,6 +508,12 @@ under the License. ${project.version} provided + + org.objenesis + objenesis + 2.1 Review Comment: nit: do we need a version here if it is already in parent pom.xml? ```suggestion ```
Re: [PR] [BP-1.17][FLINK-18356] Update CI image [flink]
snuyanzin commented on code in PR #23757: URL: https://github.com/apache/flink/pull/23757#discussion_r1403809450 ## flink-dist/pom.xml: ## @@ -454,6 +470,18 @@ under the License. provided + + org.slf4j + slf4j-api + ${slf4j.version} + ${flink.markBundledAsOptional} + + + com.google.code.findbugs + jsr305 + 1.3.9 Review Comment: nit: do we need a version here if it is already in parent `pom.xml`? ```suggestion ```
[jira] [Resolved] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
[ https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-33485. - Fix Version/s: 1.19.0 Resolution: Fixed > Optimize the EXISTS sub-query by Metadata RowCount > -- > > Key: FLINK-33485 > URL: https://issues.apache.org/jira/browse/FLINK-33485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > If the sub-query is guaranteed to produce at least one row, just return TRUE. > If the sub-query is guaranteed to produce no row, just return FALSE. > Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, > it should be adapted accordingly. > Examples: > {code:sql} > SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2) > {code} > Global aggregation functions always return one row, even on an empty table, > so we could just replace this query with > {code:sql} > SELECT * FROM T2 > {code} > Another example: > {code:sql} > SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0) > {code} > {{LIMIT 0}} means no rows, so it could be optimized to > {code:sql} > SELECT * FROM MyTable > {code}
[jira] [Closed] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
[ https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin closed FLINK-33485. --- > Optimize the EXISTS sub-query by Metadata RowCount > --
[jira] [Commented] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
[ https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789280#comment-17789280 ] Sergey Nuyanzin commented on FLINK-33485: - Merged to master as [a1d4dc0a4b05164aee41c781e75c9e9aeae5d758|https://github.com/apache/flink/commit/a1d4dc0a4b05164aee41c781e75c9e9aeae5d758] > Optimize the EXISTS sub-query by Metadata RowCount > --
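The core decision of the optimization discussed in FLINK-33485 — reduce EXISTS to TRUE when the sub-query is guaranteed at least one row, and to FALSE when it is guaranteed none — can be sketched independently of the planner rule machinery. The method below is a hypothetical distillation, not the actual FlinkSubQueryRemoveRule code; the two Double inputs stand in for what Calcite's RelMetadataQuery#getMinRowCount and #getMaxRowCount would supply for the sub-query.

```java
/** Hypothetical distillation of the row-count-based EXISTS simplification. */
public final class ExistsSimplifier {

    /** Possible outcomes for an EXISTS predicate over a sub-query. */
    public enum Result { ALWAYS_TRUE, ALWAYS_FALSE, UNKNOWN }

    private ExistsSimplifier() {}

    /**
     * minRowCount/maxRowCount mirror what the metadata query would return for the
     * sub-query; null means the bound could not be derived.
     */
    public static Result simplifyExists(Double minRowCount, Double maxRowCount) {
        if (minRowCount != null && minRowCount >= 1.0) {
            return Result.ALWAYS_TRUE;   // e.g. a global aggregate without GROUP BY: always one row
        }
        if (maxRowCount != null && maxRowCount <= 0.0) {
            return Result.ALWAYS_FALSE;  // e.g. LIMIT 0: never any rows
        }
        return Result.UNKNOWN;           // keep the sub-query as-is
    }
}
```

NOT EXISTS follows by flipping ALWAYS_TRUE and ALWAYS_FALSE, which covers the `LIMIT 0` example in the issue description.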
Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]
snuyanzin merged PR #23685: URL: https://github.com/apache/flink/pull/23685
Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]
snuyanzin commented on PR #23685: URL: https://github.com/apache/flink/pull/23685#issuecomment-1825029898 Thanks for taking a look
[jira] (FLINK-31958) Table to DataStream allow partial fields
[ https://issues.apache.org/jira/browse/FLINK-31958 ] padavan deleted comment on FLINK-31958: - was (Author: JIRAUSER287909): up > Table to DataStream allow partial fields > > > Key: FLINK-31958 > URL: https://issues.apache.org/jira/browse/FLINK-31958 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Table SQL / API >Reporter: padavan >Priority: Major > > Hello, I have a model with many fields, for example: > {code:java} > public class UserModel { public int userId; public int count; public int zip; > public LocalDateTime dt; public LocalDateTime wStart; public LocalDateTime > wEnd; }{code} > I work with the Table API, select fields, and convert the Table to a DataStream of the > model. The problem is that *I must select all fields even if I don't need them*, > or I get the exception > {quote}Column types of query result and sink for do not match. Cause: > Different number of columns. > {quote} > So I end up filling the unused fields with placeholder data... > > I want to simply use only the fields I have selected, like: > {code:java} > .select($("userId"), $("count").sum().as("count")); > DataStream dataStream = te.toDataStream(win, > UserModel.class);{code} > > *Expected:* > Relax the validation rule "Different number of columns.": if a column is not > selected, initialize it to the type's default / null.
[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields
[ https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789239#comment-17789239 ] padavan commented on FLINK-31958: - up
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. 
*/ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: I guess it could be argued that we can always send statistics about previous rescalings as metrics, but why do we then keep the vertex-based scalingHistory?
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: I guess it could be argued that we can always send previous stats as metrics, but why do we then keep vertex-based scalingHistory?
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403656624 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: General question - we are missing the job-based data structure that keeps track of the past rescaling details. Should we need to add something in the future, with the current structure it is as simple as adding data fields to the ScalingRecord. I am OK with removing the map, but the question is - are we sure we won't require something similar in the future anyway?
Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]
eskabetxe commented on PR #2: URL: https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1824762588 @wanglijie95 , @snuyanzin could you please take a look? The sink at the moment allows non-XA (at-least-once) and XA (exactly-once) connections. I still need to add documentation, but we could start reviewing the code.
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
gyfora commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403619658 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: `estimatedEMA = estimatedEMA * x + newMeasurement * (1 - x)` - we could start with `x = 0.5`, which is pretty aggressive smoothing but should be fine given we don't have many scalings
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403615188 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: EMA requires knowing how many previous records in the window are taken into account, because this determines the weight coefficient of the new record (the smoothing factor). The length of the observation "window" is also supposed to be fixed, not spanning all time from the beginning, so I am not sure we are talking about the classic definition of EMA. Maybe you could sketch the calculation you have in mind?
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403615188 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: EMA requires knowing how many previous records in the window are taken into account, because this determines the weight coefficient of the new record (the smoothing factor). The length of the observation "window" is also supposed to be fixed, not spanning all time from the beginning, so I am not sure we are talking about the classic definition of EMA. Maybe you could sketch a calculation you have in mind?
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
gyfora commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403612096 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: I saw this, but it doesn't mention anything about history etc. and refers to an offline discussion :) Combined with the other comment related to the trimming issue (losing the restart info after 24h), I think the exponential moving average is a simpler and slightly more robust initial approach
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
afedulov commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403603955 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. */ +private final TreeMap scalingRecords = new TreeMap<>(); Review Comment: I guess [this](https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1815381974) got buried in the notifications: ``` @gyfora me and Max briefly discussed offline and came to the conclusion that starting with evaluating the maximum restart time capped by the RESTART_TIME setting is probably good enough for the first step. It has the benefit of giving the most "conservative" evaluation and we can add the moving average after some baseline testing. What do you think? ```
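The "most conservative" strategy quoted above - take the maximum observed restart time, capped by the RESTART_TIME setting - can be sketched as follows; the names here are illustrative, not the operator's actual API:

```java
import java.time.Duration;
import java.util.Collection;

/** Sketch of the conservative strategy from the quoted discussion: use the
 *  largest observed restart time, but never exceed the configured cap, and
 *  fall back to the cap when there are no observations yet. */
final class RestartTimeCap {
    static Duration estimate(Collection<Duration> observed, Duration cap) {
        return observed.stream()
                .max(Duration::compareTo)
                .filter(max -> max.compareTo(cap) < 0)
                .orElse(cap); // empty history or max >= cap -> use the cap
    }
}
```

The cap gives a safe default before any rescalings have been tracked, while real observations can only lower the estimate, never raise it above the configured bound.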
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
afedulov commented on PR #23777: URL: https://github.com/apache/flink/pull/23777#issuecomment-1824713681 > We could then keep all the existing code and just wrap it. The execution model is fundamentally different - a while loop that runs directly in the function vs an async loop that runs one level "above" and synchronizes via completable futures instead of exposing an explicit lock. I do not think it is possible to create such an adapter because we do not have access to that higher-level loop.
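The difference between the two execution models described above can be sketched roughly as follows; these interfaces are heavily simplified stand-ins, not Flink's actual SourceFunction/SourceReader signatures:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Stand-in for the legacy model: the source owns a blocking while-loop
 *  inside run(), and checkpoint coordination happens under an explicit
 *  lock held by the runtime. */
interface LegacyStyleSource<T> {
    void run(Consumer<T> out) throws Exception;
}

/** Stand-in for the new model: the runtime drives the loop one level
 *  "above", calling pollNext() repeatedly and awaiting isAvailable()
 *  when nothing can be emitted, instead of blocking in the source. */
interface PollStyleReader<T> {
    boolean pollNext(Consumer<T> out); // false once the reader is exhausted

    CompletableFuture<Void> isAvailable();
}

/** Tiny poll-style reader emitting the integers in [start, end). */
final class RangeReader implements PollStyleReader<Integer> {
    private int next;
    private final int end;

    RangeReader(int start, int end) {
        this.next = start;
        this.end = end;
    }

    @Override
    public boolean pollNext(Consumer<Integer> out) {
        if (next < end) {
            out.accept(next++);
        }
        return next < end; // false once everything has been emitted
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null); // always ready here
    }
}
```

Because the loop lives in the runtime rather than in the source, there is no place for a legacy-style adapter to "take over" the loop, which is the point made in the comment above.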
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
afedulov commented on code in PR #23777: URL: https://github.com/apache/flink/pull/23777#discussion_r1403579962 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/SourceReaderWithCheckpointsLatch.java: ## +package org.apache.flink.connector.datagen.source; + +/** + * A {@link SourceReader} that synchronizes emission of N elements on the arrival of the checkpoint + * barriers. It 1) emits a list of elements without checkpoints in-between, 2) then waits for two + * checkpoints to complete, 3) then re-emits the same elements before 4) waiting for another two + * checkpoints and 5) exiting. + * + * This lockstep execution is possible because {@code pollNext} and {@code snapshotState} are + * executed in the same thread and the fact that {@code pollNext} can emit N elements at once. This + * reader is meant to be used solely for testing purposes as a substitute for the {@code + * FiniteTestSource} which implements the deprecated {@code SourceFunction} API. + */ +@Experimental +public class SourceReaderWithCheckpointsLatch< Review Comment: My idea was at some point to pass the number of checkpoints from the outside, but sure, we can reflect this number in the name until then. Renamed.
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
afedulov commented on PR #23777: URL: https://github.com/apache/flink/pull/23777#issuecomment-1824683968 There are some things that can probably be done, but not in the scope of this PR. You can see how much complexity of the new Source API is already hidden behind the `IteratorSourceReaderBase`, the `DataGeneratorSource` and the `GeneratorFunction`. We are getting there, but in order to come up with meaningful abstractions we need to learn more about common patterns, and we do so during the migration of the existing SourceFunctions. I do not think there is an easy way to come up with something completely generic because of how the new Source imposes implementation of all of the low-level details with very specific classes (Splits, SplitEnumerators, serializers). See, for instance, what is going on in the `NumberSequenceSource` that we wrap around in the `DataGeneratorSource` that this implementation relies on: https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java#L218
[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()
[ https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789203#comment-17789203 ] Vladislav Keda commented on FLINK-32513: Hi [~huwh]! I would like to ask if there are any updates on this issue? > Job in BATCH mode with a significant number of transformations freezes on > method StreamGraphGenerator.existsUnboundedSource() > - > > Key: FLINK-32513 > URL: https://issues.apache.org/jira/browse/FLINK-32513 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.3, 1.16.1, 1.17.1 > Environment: All modes (local, k8s session, k8s application, ...) > Flink 1.15.3 > Flink 1.16.1 > Flink 1.17.1 >Reporter: Vladislav Keda >Priority: Critical > Attachments: image-2023-07-10-17-26-46-544.png > > > A Flink job executed in BATCH mode with a significant number of transformations > (more than 30 in my case) takes a very long time to start due to the method > StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of > the method, a lot of memory is consumed, which causes the GC to fire > frequently. 
> Thread Dump: > {code:java} > "main@1" prio=5 tid=0x1 nid=NA runnable > java.lang.Thread.State: RUNNABLE > at java.util.ArrayList.addAll(ArrayList.java:702) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > 
org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at >
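The repeated frames in the thread dump above stem from each `getTransitivePredecessors()` call concatenating the full predecessor lists of its inputs, so shared upstream transformations are collected once per path through the graph. The following is a self-contained model of that blowup (plain Java, not Flink classes; all names are illustrative), contrasting the naive `addAll`-based collection with a deduplicated traversal:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Minimal model (not Flink code) of why transitive-predecessor collection can
 * explode: a chain of two-input "diamond" joins over one shared subgraph makes
 * the naive list-concatenation approach exponential in the chain depth.
 */
public class TransitivePredecessorsDemo {

    public static final class Node {
        final List<Node> inputs = new ArrayList<>();

        Node(Node... in) {
            for (Node n : in) {
                inputs.add(n);
            }
        }

        /** Naive collection, mirroring the addAll pattern in the stack trace. */
        public List<Node> naivePredecessors() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            for (Node input : inputs) {
                // Shared ancestors are re-collected once per path.
                result.addAll(input.naivePredecessors());
            }
            return result;
        }

        /** Deduplicated collection: every node is visited at most once. */
        public void collect(Set<Node> visited) {
            if (visited.add(this)) {
                for (Node input : inputs) {
                    input.collect(visited);
                }
            }
        }
    }

    /** Builds a chain of {@code depth} two-input joins whose inputs share one subgraph. */
    public static Node diamondChain(int depth) {
        Node current = new Node();
        for (int i = 0; i < depth; i++) {
            current = new Node(current, current); // both inputs are the same subgraph
        }
        return current;
    }

    public static void main(String[] args) {
        Node top = diamondChain(16);
        Set<Node> visited = new HashSet<>();
        top.collect(visited);
        System.out.println("distinct nodes: " + visited.size());                  // 17
        System.out.println("naive list size: " + top.naivePredecessors().size()); // 131071 = 2^17 - 1
    }
}
```

With only 17 distinct transformations, the naive traversal materializes a list of 131071 entries, which matches the observed symptoms of long startup and heavy GC pressure.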
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
mxm commented on code in PR #23777: URL: https://github.com/apache/flink/pull/23777#discussion_r1403558590 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/SourceReaderWithCheckpointsLatch.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SourceReader} that synchronizes emission of N elements on the arrival of the checkpoint + * barriers. It 1) emits a list of elements without checkpoints in-between, 2) then waits for two + * checkpoints to complete, 3) then re-emits the same elements before 4) waiting for another two + * checkpoints and 5) exiting. + * + * This lockstep execution is possible because {@code pollNext} and {@code snapshotState} are + * executed in the same thread and because {@code pollNext} can emit N elements at once. This + * reader is meant to be used solely for testing purposes as a substitution for the {@code + * FiniteTestSource} which implements the deprecated {@code SourceFunction} API. + */ +@Experimental +public class SourceReaderWithCheckpointsLatch< Review Comment: ```suggestion public class DoubleEmittingSourceReaderWithCheckpointsInBetween< ``` Is this more like what it's doing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
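The lockstep protocol described in the reader's Javadoc above can be sketched as a small state machine. The following plain-Java model (no Flink dependencies; the class and method names are illustrative, not the real reader's API) captures the sequence: emit the batch, wait for two completed checkpoints, emit the batch again, wait for two more, then signal end of input:

```java
import java.util.List;

/**
 * Sketch of the double-emitting protocol: emit all elements, wait for two
 * checkpoints, re-emit the same elements, wait for two more, then finish.
 * Return values mimic Flink's InputStatus names as plain strings.
 */
public class DoubleEmitProtocol<E> {
    private enum Phase { FIRST_EMIT, FIRST_WAIT, SECOND_EMIT, SECOND_WAIT, DONE }

    private final List<E> elements;
    private Phase phase = Phase.FIRST_EMIT;
    private int checkpointsInPhase = 0;

    public DoubleEmitProtocol(List<E> elements) {
        this.elements = elements;
    }

    /** Models pollNext(): emits the whole batch at once, or reports the current status. */
    public String pollNext(List<E> output) {
        switch (phase) {
            case FIRST_EMIT:
                output.addAll(elements);
                phase = Phase.FIRST_WAIT;
                return "NOTHING_AVAILABLE"; // now wait for checkpoints
            case SECOND_EMIT:
                output.addAll(elements);
                phase = Phase.SECOND_WAIT;
                return "NOTHING_AVAILABLE";
            case DONE:
                return "END_OF_INPUT";
            default:
                return "NOTHING_AVAILABLE"; // still waiting for checkpoints
        }
    }

    /** Models notifyCheckpointComplete(): two completed checkpoints advance the phase. */
    public void notifyCheckpointComplete() {
        if (phase == Phase.FIRST_WAIT || phase == Phase.SECOND_WAIT) {
            if (++checkpointsInPhase == 2) {
                checkpointsInPhase = 0;
                phase = (phase == Phase.FIRST_WAIT) ? Phase.SECOND_EMIT : Phase.DONE;
            }
        }
    }
}
```

The lockstep works only because, as the Javadoc notes, the poll and checkpoint callbacks run in the same thread, so no extra synchronization is modeled here.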
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
gyfora commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403543895 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. 
*/ +private final TreeMap<Instant, ScalingRecord> scalingRecords = new TreeMap<>(); + +public void addScalingRecord(Instant startTimestamp, ScalingRecord scalingRecord) { +scalingRecords.put(startTimestamp, scalingRecord); +} + +@JsonIgnore +public Optional<Entry<Instant, ScalingRecord>> getLatestScalingRecordEntry() { +if (!scalingRecords.isEmpty()) { +return Optional.of(scalingRecords.lastEntry()); +} else { +return Optional.empty(); +} +} + +/** + * Sets the end time for the latest scaling record if its parallelism matches the current job + * parallelism. + * + * @param now The current instant to be set as the end time of the scaling record. + * @param jobTopology The current job topology containing details of the job's parallelism. + * @param scalingHistory The scaling history. + * @return true if the end time is successfully set, false if the end time is already set, the + * latest scaling record cannot be found, or the target parallelism does not match the + * actual parallelism. + */ +public boolean setEndTimeIfTrackedAndParallelismMatches( +Instant now, +JobTopology jobTopology, +Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { +return getLatestScalingRecordEntry() +.map( +entry -> { +var value = entry.getValue(); +var scalingTimestamp = entry.getKey(); +if (value.getEndTime() == null) { +var targetParallelism = +getTargetParallelismOfScaledVertices( +scalingTimestamp, scalingHistory); +var actualParallelism = jobTopology.getParallelisms(); + +if (targetParallelismMatchesActual( +targetParallelism, actualParallelism)) { +value.setEndTime(now); +return true; +} +} +return false; +}) +.orElse(false); +} + +private static Map<JobVertexID, Integer> getTargetParallelismOfScaledVertices( +Instant scalingTimestamp, +Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { +return scalingHistory.entrySet().stream() +.filter(entry -> entry.getValue().containsKey(scalingTimestamp)) +.collect( +Collectors.toMap( +Map.Entry::getKey, +entry -> +entry.getValue() +.get(scalingTimestamp) +.getNewParallelism())); +} + +private static boolean targetParallelismMatchesActual( +Map
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
gyfora commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403540366 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. 
*/ +private final TreeMap<Instant, ScalingRecord> scalingRecords = new TreeMap<>(); + +public void addScalingRecord(Instant startTimestamp, ScalingRecord scalingRecord) { +scalingRecords.put(startTimestamp, scalingRecord); +} + +@JsonIgnore +public Optional<Entry<Instant, ScalingRecord>> getLatestScalingRecordEntry() { +if (!scalingRecords.isEmpty()) { +return Optional.of(scalingRecords.lastEntry()); +} else { +return Optional.empty(); +} +} + +/** + * Sets the end time for the latest scaling record if its parallelism matches the current job + * parallelism. + * + * @param now The current instant to be set as the end time of the scaling record. + * @param jobTopology The current job topology containing details of the job's parallelism. + * @param scalingHistory The scaling history. + * @return true if the end time is successfully set, false if the end time is already set, the + * latest scaling record cannot be found, or the target parallelism does not match the + * actual parallelism. + */ +public boolean setEndTimeIfTrackedAndParallelismMatches( +Instant now, +JobTopology jobTopology, +Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { +return getLatestScalingRecordEntry() +.map( +entry -> { +var value = entry.getValue(); +var scalingTimestamp = entry.getKey(); +if (value.getEndTime() == null) { +var targetParallelism = +getTargetParallelismOfScaledVertices( +scalingTimestamp, scalingHistory); +var actualParallelism = jobTopology.getParallelisms(); + +if (targetParallelismMatchesActual( +targetParallelism, actualParallelism)) { +value.setEndTime(now); +return true; +} +} +return false; +}) +.orElse(false); +} + +private static Map<JobVertexID, Integer> getTargetParallelismOfScaledVertices( +Instant scalingTimestamp, +Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { +return scalingHistory.entrySet().stream() +.filter(entry -> entry.getValue().containsKey(scalingTimestamp)) +.collect( +Collectors.toMap( +Map.Entry::getKey, +entry -> +entry.getValue() +.get(scalingTimestamp) +.getNewParallelism())); +} + +private static boolean targetParallelismMatchesActual( +Map
[jira] [Commented] (FLINK-33634) Add Conditions to Flink CRD's Status field
[ https://issues.apache.org/jira/browse/FLINK-33634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789178#comment-17789178 ] Gyula Fora commented on FLINK-33634: Could you please elaborate a bit on what exactly you suggest we put in the conditions field? > Add Conditions to Flink CRD's Status field > -- > > Key: FLINK-33634 > URL: https://issues.apache.org/jira/browse/FLINK-33634 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Major > > From > [https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties] > it is considered best practice to provide Conditions in the Status of CRD's. > Some tooling even expects there to be a Conditions field in the status of a > CR. This issue is to propose adding a Conditions field to the CR status > e.g. > status: > conditions: > - lastTransitionTime: '2023-11-23T12:38:51Z' > status: 'True' > type: Ready -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments
[ https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789176#comment-17789176 ] Gyula Fora commented on FLINK-33633: I am not completely sure about this feature. It's generally an anti-pattern for operators to perform such actions. Operators should run with minimal permissions, and in most prod envs admins do not want the Flink operator to have access to creating service accounts, roles, and role bindings. > Automatic creation of RBAC for instances of Flink Deployments > - > > Key: FLINK-33633 > URL: https://issues.apache.org/jira/browse/FLINK-33633 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Not a Priority > > Currently users have to manually create RBAC, e.g. the flink service account. > When the operator is watching all namespaces, creation of a FlinkDeployment in a > specific namespace may fail if the kube admin has failed to create the > required RBAC. To improve usability, the operator could be coded to > automatically create these RBAC resources in the instance namespace if not > present
Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler [flink-kubernetes-operator]
gyfora commented on code in PR #711: URL: https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403521447 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** Stores rescaling related information for the job. */ +@Experimental +@Data +@NoArgsConstructor +@Builder +public class ScalingTracking { + +/** Details related to recent rescaling operations. 
*/ +private final TreeMap<Instant, ScalingRecord> scalingRecords = new TreeMap<>(); + +public void addScalingRecord(Instant startTimestamp, ScalingRecord scalingRecord) { +scalingRecords.put(startTimestamp, scalingRecord); +} + +@JsonIgnore +public Optional<Entry<Instant, ScalingRecord>> getLatestScalingRecordEntry() { +if (!scalingRecords.isEmpty()) { +return Optional.of(scalingRecords.lastEntry()); +} else { +return Optional.empty(); +} +} + +/** + * Sets the end time for the latest scaling record if its parallelism matches the current job + * parallelism. + * + * @param now The current instant to be set as the end time of the scaling record. + * @param jobTopology The current job topology containing details of the job's parallelism. + * @param scalingHistory The scaling history. + * @return true if the end time is successfully set, false if the end time is already set, the + * latest scaling record cannot be found, or the target parallelism does not match the + * actual parallelism. + */ +public boolean setEndTimeIfTrackedAndParallelismMatches( +Instant now, +JobTopology jobTopology, +Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { +return getLatestScalingRecordEntry() +.map( +entry -> { +var value = entry.getValue(); +var scalingTimestamp = entry.getKey(); +if (value.getEndTime() == null) { +var targetParallelism = +getTargetParallelismOfScaledVertices( +scalingTimestamp, scalingHistory); +var actualParallelism = jobTopology.getParallelisms(); + +if (targetParallelismMatchesActual( +targetParallelism, actualParallelism)) { +value.setEndTime(now); Review Comment: Should we maybe log this on debug, so we have an overview if we want to debug this? ## docs/layouts/shortcodes/generated/auto_scaler_configuration.html: ## @@ -80,6 +80,18 @@ Duration Expected restart time to be used until the operator can determine it reliably from history. + +job.autoscaler.restart.time.tracked.enabled +false +Boolean +Whether to use the actually observed rescaling restart times instead of the fixed 'job.autoscaler.restart.time' configuration. 
If set to true, the maximum restart duration over a number of samples will be used. The value of 'job.autoscaler.restart.time' will act as an upper bound. + + +job.autoscaler.restart.time.tracked.limit Review Comment: Sorry for the late comment, we could consider changing this to `job.autoscaler.restart.time-tracking.enabled`
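Based on the option descriptions quoted above, the tracked restart time is the maximum observed restart duration over a number of recent samples, with the fixed `job.autoscaler.restart.time` value acting as an upper bound. A hedged sketch of that calculation follows (assumed semantics distilled from the docs text, not the operator's actual code; the class and method names are made up):

```java
import java.time.Duration;
import java.util.List;

/**
 * Sketch: derive a restart-time estimate from observed samples, capped by the
 * configured 'job.autoscaler.restart.time' upper bound. With no samples yet,
 * the configured value is used as-is.
 */
public class RestartTimeEstimator {

    public static Duration estimate(List<Duration> observedRestartTimes, Duration configuredUpperBound) {
        // Maximum over the tracked samples; fall back to the configured value.
        Duration max = observedRestartTimes.stream()
                .max(Duration::compareTo)
                .orElse(configuredUpperBound);
        // The fixed setting acts as an upper bound on the tracked value.
        return max.compareTo(configuredUpperBound) > 0 ? configuredUpperBound : max;
    }
}
```

Taking the maximum (rather than an average) is the conservative choice for scaling decisions: underestimating restart time makes the autoscaler overestimate how quickly a rescale pays off.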
Re: [PR] [FLINK-27681] RocksdbStatebackend verify incremental sst file checksum during checkpoint [flink]
masteryhx commented on code in PR #23765: URL: https://github.com/apache/flink/pull/23765#discussion_r1403408187 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java: ## @@ -80,7 +86,8 @@ public RocksNativeFullSnapshotStrategy( @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, -@Nonnull RocksDBStateUploader rocksDBStateUploader) { +@Nonnull RocksDBStateUploader rocksDBStateUploader, +RocksDBStateFileVerifier stateFileVerifier) { Review Comment: nit: Nullable Annotation ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java: ## @@ -86,6 +86,18 @@ public class RocksDBOptions { .withDescription( "The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend."); +/** + * Whether to verify the Checksum of the incremental sst file during Checkpoint in + * RocksDBStateBackend. + */ +@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB) +public static final ConfigOption<Boolean> CHECKPOINT_VERIFY_CHECKSUM_ENABLE = + ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enable") +.booleanType() +.defaultValue(false) +.withDescription( +"Whether to verify the Checksum of the incremental sst file during Checkpoint in RocksDBStateBackend"); Review Comment: Could we also add some important messages here: 1. It may introduce some overhead to the checkpoint procedure if enabled. 2. If the checksum check fails, we will fail the checkpoint. ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.FileUtils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.Checkpoint; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX; +import static org.junit.Assert.fail; + +/** {@link RocksDBStateFileVerifier} test. 
*/ +public class RocksdbStateFileVerifierTest { + +@Rule public TemporaryFolder folder = new TemporaryFolder(); + +@Test +public void rocksdbStateFileVerifierTest() throws Exception { +ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); +String rootPath = folder.newFolder().getAbsolutePath(); +File dbPath = new File(rootPath, "db"); +File cpPath = new File(rootPath, "cp"); + +try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true); +ColumnFamilyOptions colOptions = new ColumnFamilyOptions(); +Options sstFileReaderOptions = new Options(dbOptions, colOptions); +WriteOptions writeOptions = new WriteOptions().setDisableWAL(true); +RocksDB db = +RocksDB.open( +dbOptions, +dbPath.toString(), +Collections.singletonList( +new ColumnFamilyDescriptor( +"default".getBytes(), colOptions)), +
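For intuition, verifying a checkpoint file's checksum amounts to recomputing a digest over the file bytes and comparing it against the value recorded at write time. The sketch below uses a plain CRC32 over a whole file purely as an illustration; the actual PR verifies sst files through RocksDB's own checksum machinery (via the `RocksDBStateFileVerifier` under review), not a helper like this:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/**
 * Illustration only: detect file corruption by recomputing a CRC32 checksum
 * and comparing it with the expected value. Real sst verification is done by
 * RocksDB itself, which stores per-block checksums inside the sst file.
 */
public class ChecksumCheck {

    /** Computes a CRC32 checksum over the full file contents. */
    public static long crc32Of(Path file) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(Files.readAllBytes(file));
        return crc.getValue();
    }

    /** Returns true if the file still matches the checksum recorded earlier. */
    public static boolean matches(Path file, long expected) throws IOException {
        return crc32Of(file) == expected;
    }
}
```

This also illustrates the trade-off raised in the review: verification requires re-reading every uploaded file, which is exactly the overhead the reviewer asks to document.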
[jira] [Created] (FLINK-33634) Add Conditions to Flink CRD's Status field
Tony Garrard created FLINK-33634: Summary: Add Conditions to Flink CRD's Status field Key: FLINK-33634 URL: https://issues.apache.org/jira/browse/FLINK-33634 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Tony Garrard >From >[https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties] > it is considered best practice to provide Conditions in the Status of CRD's. >Some tooling even expects there to be a Conditions field in the status of a >CR. This issue is to propose adding a Conditions field to the CR status e.g. status: conditions: - lastTransitionTime: '2023-11-23T12:38:51Z' status: 'True' type: Ready
Re: [PR] [hotfix][docs] Remove reference to obsolete Collector and refer to yield instead [flink]
afedulov commented on PR #23776: URL: https://github.com/apache/flink/pull/23776#issuecomment-1824550622 cc @alpinegizmo
[jira] [Updated] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments
[ https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tony Garrard updated FLINK-33633: - Priority: Not a Priority (was: Major) > Automatic creation of RBAC for instances of Flink Deployments > - > > Key: FLINK-33633 > URL: https://issues.apache.org/jira/browse/FLINK-33633 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Not a Priority > > Currently users have to manually create RBAC e.g. the flink service account. > When operator is watching all namespaces; creation of a FlinkDeployment in a > specific namespace may fail if the kube admin has failed to create the > required RBAC. To improve usability the operator could be coded to > automatically create these rbac resources in the instance namespace if not > present
[jira] [Created] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments
Tony Garrard created FLINK-33633: Summary: Automatic creation of RBAC for instances of Flink Deployments Key: FLINK-33633 URL: https://issues.apache.org/jira/browse/FLINK-33633 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Tony Garrard Currently users have to manually create RBAC, e.g. the flink service account. When the operator is watching all namespaces, creation of a FlinkDeployment in a specific namespace may fail if the kube admin has failed to create the required RBAC. To improve usability, the operator could be coded to automatically create these RBAC resources in the instance namespace if not present.
[jira] [Created] (FLINK-33632) Add custom mutator plugin
Tony Garrard created FLINK-33632: Summary: Add custom mutator plugin Key: FLINK-33632 URL: https://issues.apache.org/jira/browse/FLINK-33632 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Tony Garrard Currently users have the ability to provide custom validators to the operator. It would be great if we followed the same pattern to provide custom mutators.
[jira] [Commented] (FLINK-33630) CoordinationResponse should be wrapped by SerializedValue in TaskOperatorEventGateway and JobMasterOperatorEventGateway
[ https://issues.apache.org/jira/browse/FLINK-33630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789146#comment-17789146 ] Hang Ruan commented on FLINK-33630: --- Hi, all. I would like to help resolve this problem. Please assign this issue to me. Thanks. > CoordinationResponse should be wrapped by SerializedValue in > TaskOperatorEventGateway and JobMasterOperatorEventGateway > --- > > Key: FLINK-33630 > URL: https://issues.apache.org/jira/browse/FLINK-33630 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Qingsheng Ren >Priority: Major > > FLINK-26077 introduced a two-way RPC between operator and coordinator, but > {{CoordinationResponse}} is not wrapped by {{SerializedValue}}: > > [https://github.com/apache/flink/blob/c61c09e464073fae430cab2dd56bd608f9d275fd/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/a.java#L254-L255|https://github.com/apache/flink/blob/34620fc5c5698d00e64c6b15f8ce84f807a2e0d7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java#L54] > > This might be a problem if the implementation of {{CoordinationResponse}} is > provided in user code and loaded by the user code classloader, because the Pekko RPC > handler always uses the app classloader for serializing and deserializing RPC > parameters and return values, which will lead to {{ClassNotFoundException}} > in this case. Similar to what we do for the request, we need to wrap a > {{SerializedValue}} around the response to make sure RPC calls won't cause > {{ClassNotFoundException}}.
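For context, the `SerializedValue` pattern the issue refers to defers deserialization until a caller can supply the correct (user-code) classloader, so the RPC layer only ever moves opaque bytes. A minimal sketch of the idea (not Flink's actual class):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/**
 * Sketch of the SerializedValue pattern: hold the payload as bytes and let the
 * caller choose the classloader at deserialization time, avoiding
 * ClassNotFoundException when the payload class lives in user code.
 */
public class SerializedValueSketch<T> implements Serializable {
    private final byte[] bytes;

    public SerializedValueSketch(T value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        this.bytes = bos.toByteArray();
    }

    /** Deserializes with the given classloader, so user-code classes resolve correctly. */
    @SuppressWarnings("unchecked")
    public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                // Resolve against the caller-supplied loader, not the app classloader.
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return (T) ois.readObject();
        }
    }
}
```

Wrapping the response this way mirrors what is already done for the request side, which is exactly the fix the issue proposes.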
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1402925601 ## docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ## @@ -442,4 +445,23 @@ migrate from the old abstractions. The steps to do this are as follows: `TypeSerializerConfigSnapshot` implementation as well as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.18 Review Comment: ```suggestion ## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19 ``` And this part has a series of 1.18 ## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ## @@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } -@Override -protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) { -return (this.componentClass == newSerializer.getComponentClass()) -? OuterSchemaCompatibility.COMPATIBLE_AS_IS -: OuterSchemaCompatibility.INCOMPATIBLE; -} + @Override + protected OuterSchemaCompatibility resolveOuterSchemaCompatibility( + TypeSerializerSnapshot oldSerializerSnapshot) { + GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot = + (GenericArraySerializerSnapshot) oldSerializerSnapshot; + return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass) + ? OuterSchemaCompatibility.COMPATIBLE_AS_IS + : OuterSchemaCompatibility.INCOMPATIBLE; + } Review Comment: Same comment: we should use 4 spaces instead of tab. 
## docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ## @@ -342,12 +342,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } -@Override -protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) { -return (this.componentClass == newSerializer.getComponentClass()) -? OuterSchemaCompatibility.COMPATIBLE_AS_IS -: OuterSchemaCompatibility.INCOMPATIBLE; -} + @Override + protected OuterSchemaCompatibility resolveOuterSchemaCompatibility( + TypeSerializerSnapshot oldSerializerSnapshot) { + GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot = + (GenericArraySerializerSnapshot) oldSerializerSnapshot; + return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass) + ? OuterSchemaCompatibility.COMPATIBLE_AS_IS + : OuterSchemaCompatibility.INCOMPATIBLE; + } Review Comment: Keep the code style the same: we should use 4 spaces instead of tabs. ## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java: ## @@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil { * can be used by legacy snapshot classes, which have a newer implementation implemented as a * {@link CompositeTypeSerializerSnapshot}. * - * @param newSerializer the new serializer to check for compatibility. + * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility. * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility * checks to. This instance should already contain the outer snapshot information. * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite * snapshot. * @return the result compatibility. 
*/ public static TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot( -TypeSerializer newSerializer, -CompositeTypeSerializerSnapshot newCompositeSnapshot, +TypeSerializerSnapshot legacySerializerSnapshot, Review Comment: It uses `legacy` as the name prefix, while the caller uses `old`. I see most of your changes use `old` or `previous` as the prefix; should we unify them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this
[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work
[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789137#comment-17789137 ] Rui Fan commented on FLINK-33565: - Before creating this JIRA, I found that the concurrentExceptions doesn't work in either the multi-region or the single-region case. So I totally agree with you. > The concurrentExceptions doesn't work > - > > Key: FLINK-33565 > URL: https://issues.apache.org/jira/browse/FLINK-33565 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > First of all, thanks to [~mapohl] for helping double-check in advance that > this was indeed a bug. > Displaying the exception history in the WebUI has been supported since FLINK-6042. > h1. What's the concurrentExceptions? > When an execution fails due to an exception, other executions in the same > region will also restart, and the first exception is the rootException. If other > restarted executions also report exceptions at this time, we want to collect > these exceptions and display them to the user as concurrentExceptions. > h2. What's this bug? > The concurrentExceptions is always empty in production, even if other > executions report exceptions at very close times. > h1. Why doesn't it work? > If a job has an all-to-all shuffle, the job only has one region, and this > region has a lot of executions. If one execution throws an exception: > * The JobMaster will mark the state as FAILED for this execution. > * The rest of the executions in this region will be marked as CANCELING. > ** This call stack can be found at FLIP-364 > [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover] > > When these executions throw exceptions as well, the JobMaster will mark their > state from CANCELING to CANCELED instead of FAILED. 
> The CANCELED executions won't go through the FAILED logic, so their exceptions are > ignored. > Note: all reports are executed inside the JobMaster RPC thread, which is a single > thread, so these reports are executed serially. Therefore only one execution is > marked as FAILED, and the rest of the executions will be marked as CANCELED later. > h1. How to fix it? > After discussing offline with [~mapohl], we first need to discuss with the community whether we > should keep the concurrentExceptions. > * If no, we can remove the related logic directly. > * If yes, we will discuss how to fix it later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
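The failure sequence described in the issue can be illustrated with a simplified state-machine sketch. This is not Flink's actual scheduler code — the class and method names are invented for illustration — but it shows the core of the bug: once the first failure puts the rest of the region into CANCELING, later exception reports land on the CANCELING -> CANCELED transition, which never records the exception:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration (not Flink's actual code) of why concurrentExceptions
// stays empty: only a RUNNING -> FAILED transition records an exception, while
// a CANCELING -> CANCELED transition silently drops it.
final class FailoverSketch {
    enum State { RUNNING, FAILED, CANCELING, CANCELED }

    static final class Execution {
        State state = State.RUNNING;

        void reportException(Throwable t, List<Throwable> recorded) {
            if (state == State.RUNNING) {
                state = State.FAILED;   // first failure: exception is recorded
                recorded.add(t);
            } else if (state == State.CANCELING) {
                state = State.CANCELED; // concurrent failure: exception is ignored
            }
        }
    }

    // Returns the exceptions that actually reach the exception history.
    static List<Throwable> simulate(int executionsInRegion) {
        List<Execution> region = new ArrayList<>();
        for (int i = 0; i < executionsInRegion; i++) {
            region.add(new Execution());
        }
        List<Throwable> recorded = new ArrayList<>();

        // First execution fails; the scheduler cancels the rest of the region.
        region.get(0).reportException(new RuntimeException("root cause"), recorded);
        for (int i = 1; i < executionsInRegion; i++) {
            region.get(i).state = State.CANCELING;
        }
        // The remaining executions also hit exceptions, but since they are
        // already CANCELING, their exceptions are dropped.
        for (int i = 1; i < executionsInRegion; i++) {
            region.get(i).reportException(new RuntimeException("concurrent #" + i), recorded);
        }
        return recorded;
    }
}
```

In this model `simulate(n)` always records exactly one exception regardless of how many executions fail, mirroring the observation that concurrentExceptions is always empty in production. The single-threaded RPC processing mentioned above is what guarantees the serial ordering the sketch assumes.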