[PR] [FLINK-34961] Use dedicated CI name for Kubernetes Operator to differentiate it in infra-reports [flink-kubernetes-operator]
snuyanzin opened a new pull request, #813: URL: https://github.com/apache/flink-kubernetes-operator/pull/813 ## What is the purpose of the change The problem with the current GHA CI is that the workflow is named `CI`, which is the same across multiple Flink projects, so Apache INFRA doesn't differentiate it in its GHA usage report https://infra-reports.apache.org/#ghactions . The idea is to use a dedicated name to cope with this. ## Brief change log Changed the name of the GHA CI workflow. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: (no) - Core observer or reconciler logic that is regularly executed: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]
masteryhx commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1562071643 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +/** + * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the + * {@link State} being the root of the public API state hierarchy. + * + * The public API state hierarchy is intended to be programmed against by Flink applications. The + * internal state hierarchy holds all the auxiliary methods that communicate with the {@link + * AsyncExecutionController} and is not intended to be used by user applications. + * + * @param <K> The type of key the state is associated to. + * @param <V> The type of values kept internally in state. + */ +@Internal +public abstract class InternalKvState<K, V> implements State { + +private final AsyncExecutionController<K> asyncExecutionController; + +private final StateDescriptor<V> stateDescriptor; + +/** + * Creates a new InternalKvState with the given asyncExecutionController and stateDescriptor. + */ +public InternalKvState( +AsyncExecutionController<K> asyncExecutionController, +StateDescriptor<V> stateDescriptor) { +this.asyncExecutionController = asyncExecutionController; +this.stateDescriptor = stateDescriptor; +} + +/** + * Submit a state request to the AEC. + * + * @param stateRequestType the type of this request. + * @param payload the payload input for this request. + * @return the state future. + */ +protected <IN, OUT> StateFuture<OUT> handleRequest( +StateRequestType stateRequestType, IN payload) { +return asyncExecutionController.handleRequest(this, stateRequestType, payload); +} + +/** Return the specific {@code StateDescriptor}. */ +public StateDescriptor<V> getStateDescriptor() { +return stateDescriptor; +} +} Review Comment: I have added a value serializer method and a related test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
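For context on how this root class is meant to be used, here is a minimal sketch of a concrete state built on top of `InternalKvState`. It assumes the v2 `ValueState` interface exposes `asyncValue`/`asyncUpdate` as described in FLIP-424 and that `StateRequestType` has `VALUE_GET`/`VALUE_UPDATE` constants; it is an illustration, not the actual PR code.

```java
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

// Sketch only: shows how a concrete state can lean on the protected
// handleRequest helper instead of talking to the AEC directly.
public class InternalValueState<K, V> extends InternalKvState<K, V> implements ValueState<V> {

    public InternalValueState(
            AsyncExecutionController<K> asyncExecutionController,
            StateDescriptor<V> stateDescriptor) {
        super(asyncExecutionController, stateDescriptor);
    }

    @Override
    public StateFuture<V> asyncValue() {
        // Hands the read over to the AEC; the future completes asynchronously.
        return handleRequest(StateRequestType.VALUE_GET, null);
    }

    @Override
    public StateFuture<Void> asyncUpdate(V value) {
        return handleRequest(StateRequestType.VALUE_UPDATE, value);
    }
}
```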
Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]
Jiabao-Sun commented on code in PR #3221: URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560833766 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java: ## @@ -108,7 +110,19 @@ public void execute(Context context) throws Exception { this.taskRunning = true; try { while (taskRunning) { -Optional<BsonDocument> next = Optional.ofNullable(changeStreamCursor.tryNext()); +Optional<BsonDocument> next; +try { +next = Optional.ofNullable(changeStreamCursor.tryNext()); +} catch (MongoCommandException e) { +if (MongoUtils.checkIfResumeTokenExpires(e)) { +resumeTokenExpired = true; Review Comment: Do we need to reset its value? ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java: ## @@ -236,7 +250,7 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor( BsonDocument resumeToken = offset.getResumeToken(); BsonTimestamp timestamp = offset.getTimestamp(); -if (resumeToken != null) { +if (resumeToken != null && !resumeTokenExpired) { Review Comment: It should be noted that `startAtOperationTime` is supported starting from MongoDB version 4.0. The prerequisite for executing the following code block is that the `resumeToken` does not exist, which occurs in two scenarios: 1. a specified timestamp is used as the starting offset. 2. prior to version 4.0.7, when a collection has not been updated for a long time and the `postResumeToken` cannot be obtained, the current timestamp is used as the starting offset. In the `else` branch of the code below, if we cannot start the Change Stream using `startAtOperationTime`, we fall back to starting the Change Stream from the latest offset. Before MongoDB 4.0, we may lose changes made during the snapshot phase, but the likelihood of losing data during the snapshot is relatively low: if a collection is highly likely to change, we can obtain the `resumeToken` at its starting position; if a collection has not changed for a long time and we cannot obtain the `resumeToken` and `postResumeToken`, the likelihood of data loss during the snapshot is very low. However, when we encounter an invalid `resumeToken` during runtime or when recovering from a checkpoint, there is a high possibility of data loss. I have reconsidered this issue: if we cannot recover the change stream from the `resumeToken`, it is highly likely that we cannot recover it through `startAtOperationTime` either, as they correspond to the same position in the `oplog`. In `mongo-kafka`, there is an explicit configuration called "tolerant-errors" provided to handle interruptions in the change stream. But in some scenarios, consistency requirements may outweigh availability, so we should throw an exception to let the user re-run the task from the beginning. ```java else { if (supportsStartAtOperationTime) { LOG.info("Open the change stream at the timestamp: {}", timestamp); changeStreamIterable.startAtOperationTime(timestamp); } else { LOG.warn("Open the change stream of the latest offset"); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
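To make the control flow under discussion concrete, here is a minimal sketch of the expiry fallback. `MongoUtils.checkIfResumeTokenExpires` is the helper shown in the diff (from the connector's own utilities); the surrounding class and error handling are simplified illustrations, not the connector's actual code.

```java
import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoChangeStreamCursor;

import org.bson.BsonDocument;

import java.util.Optional;

// Simplified sketch: poll the change stream; when the stored resume token has
// aged out of the oplog, remember that so the next cursor is opened from the
// timestamp (or the latest offset) instead of resumeAfter with a stale token.
class ResumeTokenFallbackSketch {
    private boolean resumeTokenExpired = false;

    Optional<BsonDocument> pollNext(MongoChangeStreamCursor<BsonDocument> cursor) {
        try {
            return Optional.ofNullable(cursor.tryNext());
        } catch (MongoCommandException e) {
            // MongoUtils.checkIfResumeTokenExpires comes from the connector's utils.
            if (MongoUtils.checkIfResumeTokenExpires(e)) {
                resumeTokenExpired = true; // consulted when reopening the cursor
                return Optional.empty();   // caller reopens via startAtOperationTime
            }
            throw e;
        }
    }
}
```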
Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]
masteryhx commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1562060617 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; Review Comment: I think it's enough currently. Just modified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395 ] yuanfenghu edited FLINK-35035 at 4/12/24 6:08 AM: - [~echauchot] Thank you for your reply. I think you are looking at this scenario from the perspective of Reactive Mode, because Reactive Mode only uses the cluster's resources as the criterion for task parallelism. I don't know if I understand it correctly. But my scenario above is in non-Reactive Mode. I just use the adaptive scheduler, which means that I increase the parallelism of the running task from 10 to 12. However, because min-parallelism-increase=5, even when the cluster slots can satisfy a parallelism of 12, the scale-up of the task cannot be triggered immediately; it needs to wait for scaling-interval.max before the scale-up can be triggered. My purpose is to trigger the scale-up as soon as a parallelism of 12 can be satisfied, instead of having to wait for scaling-interval.max or min-parallelism-increase. was (Author: JIRAUSER296932): [~echauchot] Thank you for your reply. You seem to be looking at this issue from the perspective of Reactive Mode, because Reactive Mode only uses the cluster's resources as the criterion for task parallelism. I don't know if I understand it correctly. But my scenario above is in non-Reactive Mode. I use the adaptive scheduler, which means that I increase the parallelism of the running task from 10 to 12. However, because min-parallelism-increase=5, even when the cluster slots can satisfy a parallelism of 12, the scale-up of the task cannot be triggered immediately; it needs to wait for scaling-interval.max before the scale-up can be triggered. My purpose is to trigger the scale-up as soon as a parallelism of 12 can be satisfied, instead of having to wait for scaling-interval.max > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive' , job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots the task will trigger jobgraph changes,by > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable, > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot), because no matter > what environment we add, we cannot guarantee that the new slots will be added > at once, so this will cause onNewResourcesAvailable triggers repeatedly > ,If each new slot action has a certain interval, then the jobgraph will > continue to change during this period. What I hope is that there will be a > stable time to configure the cluster resources and then go to it after the > number of cluster slots has been stable for a certain period of time. Trigger > jobgraph changes to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin opened a new pull request, #24656: URL: https://github.com/apache/flink/pull/24656 1.19 backport for parent PR #24397 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
flinkbot commented on PR #24656: URL: https://github.com/apache/flink/pull/24656#issuecomment-2051047110 ## CI report: * cf2c203cc40e921dd455cb3ec4bab0048400f029 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
fredia commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1562003338 ## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ## @@ -44,6 +44,42 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled.Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.enabled Review Comment: The `position` can only determine the order within `xxx_section.html`. `checkpointing_configuration.html` is generated by `ConfigOptionsDocGenerator#generateTablesForClass`, which uses the `DocumentedKey` as the [comparing key](https://github.com/apache/flink/blob/master/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java#L547). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
Zakelly commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1561983143 ## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ## @@ -44,6 +44,42 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled.Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.enabled Review Comment: I found the annotation `@Documentation.Section` can specify `position`, may be helpful? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
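For readers unfamiliar with the annotation, a hedged illustration of what is being discussed here. The section name and option below are made up for the example; as fredia notes above, `position` only orders entries within a section page, while the per-class tables such as checkpointing_configuration.html stay sorted alphabetically by key.

```java
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Illustrative only: `position` influences ordering within the named section,
// not the alphabetically sorted per-class configuration table.
public class FileMergingOptionsExample {

    @Documentation.Section(value = "state_backends_example", position = 1)
    public static final ConfigOption<Boolean> FILE_MERGING_ENABLED =
            ConfigOptions.key("state.checkpoints.file-merging.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable merging multiple checkpoint files into one.");
}
```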
Re: [PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]
Sxnan commented on PR #24655: URL: https://github.com/apache/flink/pull/24655#issuecomment-2050918953 @xintongsong Can you help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]
flinkbot commented on PR #24655: URL: https://github.com/apache/flink/pull/24655#issuecomment-205094 ## CI report: * b8758babe99b2fc7f080026b3615e2b5ff7da164 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
[ https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35089: --- Labels: pull-request-available (was: ) > Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes > -- > > Key: FLINK-35089 > URL: https://issues.apache.org/jira/browse/FLINK-35089 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the > `AbstractStreamOperator` are transient. The two fields will be null when it > is deserialized in TaskManager, which may cause an NPE. > To fix it, we propose to make the RecordAttributes serializable and these > fields non-transient. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]
Sxnan opened a new pull request, #24655: URL: https://github.com/apache/flink/pull/24655 ## What is the purpose of the change Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE. To fix it, we make the RecordAttributes serializable and these fields non-transient. ## Brief change log - Make the RecordAttributes serializable - Make `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` non-transient ## Verifying this change This change added tests and can be verified as follows: - Integration test is added `org.apache.flink.test.streaming.runtime.RecordAttributesPropagationITCase` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializer: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
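The essence of the fix can be sketched as follows. The class and field names mirror the description above, but the bodies are reduced placeholders rather than the real flink-runtime code.

```java
import java.io.Serializable;

// Sketch: the attribute type becomes Serializable so operator instances can be
// shipped to TaskManagers with the fields populated.
class RecordAttributesSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final boolean backlog; // RecordAttributes carries a backlog flag

    RecordAttributesSketch(boolean backlog) {
        this.backlog = backlog;
    }

    boolean isBacklog() {
        return backlog;
    }
}

// ...and the operator fields drop `transient`, so they no longer deserialize
// to null on the TaskManager, which was the source of the NPE.
abstract class TwoInputOperatorSketch {
    private RecordAttributesSketch lastRecordAttributes1 = new RecordAttributesSketch(false);
    private RecordAttributesSketch lastRecordAttributes2 = new RecordAttributesSketch(false);
}
```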
[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
[ https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-35089: --- Description: Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE. To fix it, we propose to make the RecordAttributes serializable and these fields non-transient. was: Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE. To fix it, we propose to make the RecordAttributes serialization and these fields non-transient. > Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes > -- > > Key: FLINK-35089 > URL: https://issues.apache.org/jira/browse/FLINK-35089 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: Xuannan Su >Priority: Major > > Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the > `AbstractStreamOperator` are transient. The two fields will be null when it > is deserialized in TaskManager, which may cause an NPE. > To fix it, we propose to make the RecordAttributes serializable and these > fields non-transient. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS startup mode from an existing replication slot.
[ https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836403#comment-17836403 ] Hongshun Wang commented on FLINK-35093: --- I'd like to do it. > Postgres source connector support SPECIFIC_OFFSETS startup mode from an > existing replication slot. > -- > > Key: FLINK-35093 > URL: https://issues.apache.org/jira/browse/FLINK-35093 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Hongshun Wang >Priority: Major > > Currently, the Postgres source connector only supports the INITIAL and LATEST modes. > However, users sometimes want to restart from an existing replication slot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS startup mode from an existing replication slot.
Hongshun Wang created FLINK-35093: - Summary: Postgres source connector support SPECIFIC_OFFSETS startup mode from an existing replication slot. Key: FLINK-35093 URL: https://issues.apache.org/jira/browse/FLINK-35093 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Hongshun Wang Currently, the Postgres source connector only supports the INITIAL and LATEST modes. However, users sometimes want to restart from an existing replication slot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
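Since the mode does not exist yet, any code is necessarily hypothetical; a sketch of how the proposed option could be modeled (every name here is invented for illustration and is not part of the Postgres CDC connector):

```java
// Invented sketch of a StartupOptions-style factory carrying the new mode.
public final class PostgresStartupOptionsSketch {

    public enum Mode {
        INITIAL,            // snapshot first, then stream
        LATEST,             // stream from the current WAL position
        FROM_EXISTING_SLOT  // proposed: resume from the slot's confirmed LSN
    }

    public final Mode mode;
    public final String slotName;

    private PostgresStartupOptionsSketch(Mode mode, String slotName) {
        this.mode = mode;
        this.slotName = slotName;
    }

    /** Resume streaming from the confirmed position of an existing slot. */
    public static PostgresStartupOptionsSketch fromExistingSlot(String slotName) {
        return new PostgresStartupOptionsSketch(Mode.FROM_EXISTING_SLOT, slotName);
    }
}
```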
[jira] [Updated] (FLINK-35090) Doris sink fails to create table when database does not exist
[ https://issues.apache.org/jira/browse/FLINK-35090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35090: --- Labels: pull-request-available (was: ) > Doris sink fails to create table when database does not exist > - > > Key: FLINK-35090 > URL: https://issues.apache.org/jira/browse/FLINK-35090 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Xiqian YU >Priority: Minor > Labels: pull-request-available > > Currently, Doris sink connector doesn't support creating database > automatically. When user specifies a sink namespace with non-existing > database in YAML config, Doris connector will crash. > Expected behaviour: Doris sink connector should create both database and > table automatically. -- This message was sent by Atlassian Jira (v8.20.10#820010)
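A minimal sketch of the expected behaviour, assuming plain JDBC access to the Doris FE (the real connector goes through its own catalog and stream-load plumbing; this only illustrates the create-database-before-table idea):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// The Doris FE speaks the MySQL protocol, so CREATE DATABASE IF NOT EXISTS
// works over a standard JDBC connection.
class DorisAutoCreateSketch {

    static void ensureDatabase(String jdbcUrl, String user, String password, String database)
            throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
                Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS `" + database + "`");
            // Table creation can then proceed against the now-existing database.
        }
    }
}
```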
Re: [PR] [FLINK-35090][cdc][doris] Add database auto-creating support for Doris sink pipeline connector [flink-cdc]
yuxiqian commented on PR #3222: URL: https://github.com/apache/flink-cdc/pull/3222#issuecomment-2050900067 @lvyanquan PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34903][MySQL][Feature] Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables [flink-cdc]
shiyiky commented on code in PR #3186: URL: https://github.com/apache/flink-cdc/pull/3186#discussion_r1561958443 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java: ## @@ -79,6 +80,49 @@ public void testNoMatchedTable() { .hasMessageContaining("Cannot find any table by the option 'tables' = " + tables); } +@Test +public void testExcludeTable() { +inventoryDatabase.createAndInitialize(); +Map<String, String> options = new HashMap<>(); +options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); +options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); +options.put(USERNAME.key(), TEST_USER); +options.put(PASSWORD.key(), TEST_PASSWORD); +// db has three tables, table.list = (products, orders, shipments) +options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); +String tableExcludeList = inventoryDatabase.getDatabaseName() + ".prod\\.orders"; Review Comment: You are right, I will adapt it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
fredia commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1561957606 ## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ## @@ -44,6 +44,42 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled.Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.enabled +false +Boolean +Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. + + +state.checkpoints.file-merging.max-file-size +32 mb +MemorySize +Max size of a physical file for merged checkpoints. + + + state.checkpoints.file-merging.max-space-amplification +0.75 Review Comment: 👍 I took it as the proportion of invalid data; the old description was "A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of invalid data in a physical file exceeds the threshold, a new physical file will be created and uploaded." Changed it to 2 and modified the description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1561954297 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java: ## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.utils; + +import org.apache.flink.annotation.PublicEvolving; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * MemorySize is a representation of a number of bytes, viewable in different units. + * + * Parsing + * + * The size can be parsed from a text expression. If the expression is a pure number, the value + * will be interpreted as bytes. + */ +@PublicEvolving +public class MemorySize implements java.io.Serializable, Comparable<MemorySize> { Review Comment: I think for `MemorySize` we should use the one from Flink instead of creating our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC doesn't use this one in our production code. IIUC it is just used for parsing memory size expressions in configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
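For reference, Flink's own class already covers the parsing use case mentioned above; a small usage example of the real `org.apache.flink.configuration.MemorySize` API:

```java
import org.apache.flink.configuration.MemorySize;

public class MemorySizeParseExample {
    public static void main(String[] args) {
        // A pure number is interpreted as bytes; units like "kb"/"mb"/"gb" work too.
        MemorySize size = MemorySize.parse("32 mb");
        System.out.println(size.getBytes());     // 33554432
        System.out.println(size.getMebiBytes()); // 32
    }
}
```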
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
fredia commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1561952908 ## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ## @@ -44,6 +44,42 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + + state.checkpoints.file-merging.across-checkpoint-boundary +false +Boolean +Only relevant if state.checkpoints.file-merging.enabled is enabled.Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. + + +state.checkpoints.file-merging.enabled Review Comment: I'm afraid not, it's in alphabetical order. For example, the configuration options related to `speculative ` are also like this. ![image](https://github.com/apache/flink/assets/18653940/117a8695-3733-4454-8f58-70d3ea0c9f9e) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395 ] yuanfenghu commented on FLINK-35035: [~echauchot] Thank you for your reply. You seem to be looking at this issue from the perspective of Reactive Mode, because Reactive Mode only uses the cluster's resources as the criterion for task parallelism. I don't know if I understand it correctly. But my scenario above is in non-Reactive Mode. I use the adaptive scheduler, which means that I increase the parallelism of the running task from 10 to 12. However, because min-parallelism-increase=5, even when the cluster slots can satisfy a parallelism of 12, the scale-up of the task cannot be triggered immediately; it needs to wait for scaling-interval.max before the scale-up can be triggered. My purpose is to trigger the scale-up as soon as a parallelism of 12 can be satisfied, instead of having to wait for scaling-interval.max > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive' , job graph changes triggered by > cluster expansion will cause long-term task stagnation. We should reduce this > impact. > As an example: > I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5] > When I add slots the task will trigger jobgraph changes,by > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable, > However, the five new slots I added were not discovered at the same time (for > convenience, I assume that a taskmanager has one slot), because no matter > what environment we add, we cannot guarantee that the new slots will be added > at once, so this will cause onNewResourcesAvailable triggers repeatedly > ,If each new slot action has a certain interval, then the jobgraph will > continue to change during this period. What I hope is that there will be a > stable time to configure the cluster resources and then go to it after the > number of cluster slots has been stable for a certain period of time. Trigger > jobgraph changes to avoid this situation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
morazow commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1561946899 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator<RowData>, Iterable<RowData> { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return new Generator(numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +} + +Generator(int numKeys, long durationMs, long stepMs, long offsetMs) { +this.numKeys = numKeys; +this.durationMs = durationMs; +this.stepMs = stepMs; +this.offsetMs = offsetMs; +} + +@Override +public boolean hasNext() { +return ms < durationMs; +} + +@Override +public RowData next() { +if (!hasNext()) { +throw new NoSuchElementException(); +} +RowData row = +new GeneratedRow( +keyIndex, +LocalDateTime.ofInstant( +Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC), +"Some payload..."); Review Comment: I saw this was in the original change, but should we randomize the payload? ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr jobClient.getJobExecutionResult().get(); final String expected = -"1980,1970-01-01 00:00:00.0\n" -+ "1980,1970-01-01 00:00:20.0\n" -+ "1980,1970-01-01 00:00:40.0\n"; +"1980,1970-01-01 00:00:00\n" Review Comment: Why is this change required? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
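On the randomization question, one possible approach (a suggestion sketch, not part of the PR): derive the payload from a seeded `Random`, so rows differ while the test stays deterministic.

```java
import java.util.Random;

class RandomPayloadSketch {
    private final Random random = new Random(42); // fixed seed keeps runs reproducible

    String nextPayload() {
        byte[] bytes = new byte[16];
        random.nextBytes(bytes);
        StringBuilder sb = new StringBuilder("payload-");
        for (byte b : bytes) {
            sb.append(String.format("%02x", b)); // hex-encode for readability
        }
        return sb.toString();
    }
}
```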
Re: [PR] [FLINK-35091][Metrics][Minor] Fix incorrect warning msg in JM log when using metric reporter [flink]
flinkbot commented on PR #24654: URL: https://github.com/apache/flink/pull/24654#issuecomment-2050853597 ## CI report: * 682a59db0c1587c46c9fe72fd0dd8e1283e51e8a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35092) Add integrated test for Doris sink pipeline connector
Xiqian YU created FLINK-35092: - Summary: Add integrated test for Doris sink pipeline connector Key: FLINK-35092 URL: https://issues.apache.org/jira/browse/FLINK-35092 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Xiqian YU Currently, no integrated tests are applied to the Doris pipeline connector (there is only one DorisRowConverterTest case for now). Adding IT cases would improve the Doris connector's code quality and reliability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
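A rough shape such an IT case could take, assuming a Testcontainers-managed Doris instance; the image tag, ports, and steps below are placeholders, not the project's actual test infrastructure:

```java
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

class DorisPipelineITCaseSketch {

    @Test
    void testSinkInsertEvents() {
        // Placeholder image/ports: 8030 is the FE HTTP port, 9030 the MySQL port.
        try (GenericContainer<?> doris =
                new GenericContainer<>("apache/doris:2.0.3").withExposedPorts(8030, 9030)) {
            doris.start();
            // 1. submit a pipeline writing a known set of change events to Doris
            // 2. query the FE over JDBC on port 9030 and assert the rows arrived
        }
    }
}
```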
Re: [PR] [FLINK-34853] Draft: Submit CDC Job To Flink K8S Native Application Mode [flink-cdc]
PatrickRen commented on PR #3093: URL: https://github.com/apache/flink-cdc/pull/3093#issuecomment-2050850214 @czy006 Could you rebase the PR to the latest master? Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35091][Metrics][Minor] Fix incorrect warning msg in JM log when using metric reporter [flink]
leosanqing opened a new pull request, #24654: URL: https://github.com/apache/flink/pull/24654 ## Desc I encountered an issue while upgrading Flink from version 1.14 to 1.18. After the upgrade, I noticed that some monitoring metrics were not being reported to InfluxDB. Upon checking the Job Manager (JM) logs, I found an error indicating that the previously used classes are no longer supported. However, there seems to be an oddly phrased error message that looks like it might have been written incorrectly. The error message reads: "The reporter configuration of '{}' configures the reporter class, which is no a no longer supported approach to configure reporters." + " Please configure a factory class instead:" I believe the correct phrasing should be: "The reporter configuration of '{}' configures the reporter class, which is a no longer supported approach to configure reporters." + " Please configure a factory class instead:" It appears that the words "no a" were accidentally added, making the sentence grammatically incorrect and potentially confusing for users. ## What is the purpose of the change Fix the incorrect warning message in the JM log when using a metric reporter. ## Brief change log Only the warning log message is changed. The error message reads: "The reporter configuration of '{}' configures the reporter class, which is no a no longer supported approach to configure reporters." + " Please configure a factory class instead:" I believe the correct phrasing should be: "The reporter configuration of '{}' configures the reporter class, which is a no longer supported approach to configure reporters." + " Please configure a factory class instead:" ![image](https://github.com/apache/flink/assets/20400582/14b6c38d-933e-4517-a2f8-386967c8e553) ## Verifying this change No tests needed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35091) Incorrect warning msg in JM when using metric reporter
[ https://issues.apache.org/jira/browse/FLINK-35091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35091: --- Labels: pull-request-available (was: ) > Incorrect warning msg in JM when using metric reporter > > > Key: FLINK-35091 > URL: https://issues.apache.org/jira/browse/FLINK-35091 > Project: Flink > Issue Type: Improvement >Reporter: sanqingleo >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.3 > > Attachments: image-2024-04-12-10-02-20-142.png > > > Hello, > I encountered an issue while upgrading Flink from version 1.14 to 1.18. After > the upgrade, I noticed that some monitoring metrics were not being reported > to InfluxDB. > Upon checking the Job Manager (JM) logs, I found an error indicating that the > previously used classes are no longer supported. However, there seems to be > an oddly phrased error message that looks like it might have been written > incorrectly. > The error message reads: "The reporter configuration of '{}' configures the > reporter class, which is no a no longer supported approach to configure > reporters." + " Please configure a factory class instead:" > I believe the correct phrasing should be: "The reporter configuration of '{}' > configures the reporter class, which is a no longer supported approach to > configure reporters." + " Please configure a factory class instead:" > It appears that the words "no a" were accidentally added, making the sentence > grammatically incorrect and potentially confusing for users. > > !image-2024-04-12-10-02-20-142.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]
fredia commented on PR #24633: URL: https://github.com/apache/flink/pull/24633#issuecomment-2050847516 @Zakelly Thanks for the review, updated and squashed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
[ https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836393#comment-17836393 ] Qingsheng Ren commented on FLINK-34634: --- flink-cdc master: 48ca8623bb8fa405adb56dbe505dbad10902db89 > Restarting the job will not read the changelog anymore if it stops before the > synchronization of meta information is complete and some table is removed > --- > > Key: FLINK-34634 > URL: https://issues.apache.org/jira/browse/FLINK-34634 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Hongshun Wang >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.1.0 > > Attachments: image-2024-03-09-15-25-26-187.png, > image-2024-03-09-15-27-46-073.png > > > h3. What's the problem > Once, I removed a table from the options and then restarted the job from the > savepoint, but the job couldn't read the binlog anymore. When I checked the > logs, I found an Error level log stating: > 'The enumerator received invalid request meta group id 6, the valid meta > group id range is [0, 4].' > It appears that the Reader is requesting more splits than the Enumerator is > aware of. > However, the code should indeed remove redundant split information from the > Reader, as seen in > [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does > this issue occur? > > h3. Why it occurs > !image-2024-03-09-15-25-26-187.png|width=751,height=329! > Upon examining the code, I discovered the cause. If the job stops before > completing all the split meta information and then restarts, this issue > occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader > is 6, and no meta information has been synchronized, leaving the > finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After > restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 > - (0 - 0)), which is still 6, but in the Enumerator it is only 4 (the removed > table has two splits). This could lead to an out-of-range request. > !image-2024-03-09-15-27-46-073.png|width=755,height=305! > h3. How to reproduce > * Add Thread.sleep(1000L) in > com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents > to postpone split meta info synchronization. > {code:java} > public void handleSourceEvents(SourceEvent sourceEvent) { > else if (sourceEvent instanceof BinlogSplitMetaEvent) { > LOG.debug( > "Source reader {} receives binlog meta with group id {}.", > subtaskId, > ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId()); > try { > Thread.sleep(1000L); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent); > } {code} > * Add Thread.sleep(500L) in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne > to trigger a savepoint before meta info synchronization finishes. > > {code:java} > // step 2: execute insert and trigger savepoint with all tables added > { > // ... ignore > waitForSinkSize("sink", fetchedDataList.size()); > Thread.sleep(500L); > assertEqualsInAnyOrder(fetchedDataList, > TestValuesTableFactory.getRawResults("sink")); > finishedSavePointPath = triggerSavepointWithRetry(jobClient, > savepointDirectory); > jobClient.cancel().get(); > } > // test removing table one by one, note that there should be at least one > table remaining > for (int round = 0; round < captureAddressTables.length - 1; round++) { > ... > } > {code} > > * Add chunk-meta.group.size = 2 in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement > Then, run the > test (com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), and > the error log will occur. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
[ https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren resolved FLINK-34634. --- Assignee: Hongshun Wang Resolution: Fixed > Restarting the job will not read the changelog anymore if it stops before the > synchronization of meta information is complete and some table is removed > --- > > Key: FLINK-34634 > URL: https://issues.apache.org/jira/browse/FLINK-34634 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.1.0 > > Attachments: image-2024-03-09-15-25-26-187.png, > image-2024-03-09-15-27-46-073.png > > > h3. What's the problem > Once, I removed a table from the options and then restarted the job from the > savepoint, but the job couldn't read the binlog anymore. When I checked the > logs, I found an Error level log stating: > 'The enumerator received invalid request meta group id 6, the valid meta > group id range is [0, 4].' > It appears that the Reader is requesting more splits than the Enumerator is > aware of. > However, the code should indeed remove redundant split information from the > Reader, as seen in > [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does > this issue occur? > > h3. Why it occurs > !image-2024-03-09-15-25-26-187.png|width=751,height=329! > Upon examining the code, I discovered the cause. If the job stops before > completing all the split meta information and then restarts, this issue > occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader > is 6, and no meta information has been synchronized, leaving the > finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After > restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 > - (0 - 0)), which is still 6, but in the Enumerator it is only 4 (the removed > table has two splits). This could lead to an out-of-range request. > !image-2024-03-09-15-27-46-073.png|width=755,height=305! > h3. How to reproduce > * Add Thread.sleep(1000L) in > com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents > to postpone split meta info synchronization. > {code:java} > public void handleSourceEvents(SourceEvent sourceEvent) { > else if (sourceEvent instanceof BinlogSplitMetaEvent) { > LOG.debug( > "Source reader {} receives binlog meta with group id {}.", > subtaskId, > ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId()); > try { > Thread.sleep(1000L); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent); > } {code} > * Add Thread.sleep(500L) in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne > to trigger a savepoint before meta info synchronization finishes. > > {code:java} > // step 2: execute insert and trigger savepoint with all tables added > { > // ... ignore > waitForSinkSize("sink", fetchedDataList.size()); > Thread.sleep(500L); > assertEqualsInAnyOrder(fetchedDataList, > TestValuesTableFactory.getRawResults("sink")); > finishedSavePointPath = triggerSavepointWithRetry(jobClient, > savepointDirectory); > jobClient.cancel().get(); > } > // test removing table one by one, note that there should be at least one > table remaining > for (int round = 0; round < captureAddressTables.length - 1; round++) { > ... > } > {code} > > * Add chunk-meta.group.size = 2 in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement > Then, run the > test (com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), and > the error log will occur. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]
PatrickRen merged PR #3134: URL: https://github.com/apache/flink-cdc/pull/3134 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34689) check binlog_row_value_options
[ https://issues.apache.org/jira/browse/FLINK-34689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren resolved FLINK-34689. --- Fix Version/s: cdc-3.1.0 Resolution: Fixed > check binlog_row_value_options > -- > > Key: FLINK-34689 > URL: https://issues.apache.org/jira/browse/FLINK-34689 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Lee SeungMin >Assignee: Lee SeungMin >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.1.0 > > Attachments: image-2024-03-15-12-56-49-344.png > > > When {{binlog_row_value_options}} is set to {{{}PARTIAL_JSON{}}}, > the update operation remains as {{{}Update_rows_partial{}}}. > Flink CDC does not parse this event because the {{Update_row_partial}} binlog > event is mapped to {{PARTIAL_UPDATE_ROWS_EVENT}} and Flink CDC does not handle > that event type. > > Example of Update_row_partial (when {{binlog_row_value_options}} = > {{PARTIAL_JSON}}) > !image-2024-03-15-12-56-49-344.png|width=1015,height=30! > So, we have to check {{binlog_row_value_options}} before starting. > > > Created PR: [[MySQL][Feature] check binlog_row_value_options by SML0127 · Pull > Request #3148 · apache/flink-cdc > (github.com)|https://github.com/apache/flink-cdc/pull/3148] -- This message was sent by Atlassian Jira (v8.20.10#820010)
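The check described above boils down to reading the server variable before starting the stream. A sketch under assumed JDBC plumbing; the merged change implements this inside the connector's own validation, with its own wording:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

class BinlogRowValueOptionsCheckSketch {

    static void validate(Connection connection) throws Exception {
        try (Statement stmt = connection.createStatement();
                ResultSet rs =
                        stmt.executeQuery(
                                "SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'")) {
            // Result columns are Variable_name / Value. PARTIAL_JSON makes updates
            // arrive as Update_rows_partial events, which the connector cannot parse.
            if (rs.next() && "PARTIAL_JSON".equalsIgnoreCase(rs.getString("Value"))) {
                throw new IllegalStateException(
                        "binlog_row_value_options=PARTIAL_JSON is not supported; "
                                + "please unset it before starting the connector.");
            }
        }
    }
}
```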
Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]
yuxiqian commented on PR #3215: URL: https://github.com/apache/flink-cdc/pull/3215#issuecomment-2050843987 I'm not familiar with Doris, could @lvyanquan @JNSimba take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34689) check binlog_row_value_optoins
[ https://issues.apache.org/jira/browse/FLINK-34689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836391#comment-17836391 ] Qingsheng Ren commented on FLINK-34689: --- flink-cdc master: af7665d33834b4141f875862df59ec1f56dddcbb > check binlog_row_value_optoins > -- > > Key: FLINK-34689 > URL: https://issues.apache.org/jira/browse/FLINK-34689 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Lee SeungMin >Assignee: Lee SeungMin >Priority: Major > Labels: pull-request-available > Attachments: image-2024-03-15-12-56-49-344.png > > > When {{binlog_row_value_optoins}} is set to {{{}PARTIAL_JSON{}}}, > the update operator remains as {{{}Update_rows_partial{}}}. > Flink CDC does not parse this event because {{Update_row_partial}} binlog > event is mapped to {{PARTIAL_UPDATE_ROWS_EVENT}} and Flink CDC do not handle > that event type > > Example of Update_row_partial (when {{binlog_row_value_optoins}} = > {{PARTIAL_JSON)}} > !image-2024-03-15-12-56-49-344.png|width=1015,height=30! > So, we have to check {{binlog_row_value_optoins}} before starting. > > > Cretae PR: [[MySQL][Feature] check binlog_row_value_optoins by SML0127 · Pull > Request #3148 · apache/flink-cdc > (github.com)|https://github.com/apache/flink-cdc/pull/3148] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34689][MySQL][Feature] check binlog_row_value_options [flink-cdc]
PatrickRen merged PR #3148: URL: https://github.com/apache/flink-cdc/pull/3148 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35091) Incorrect warning msg in JM when use metric reporter
[ https://issues.apache.org/jira/browse/FLINK-35091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanqingleo updated FLINK-35091: --- Summary: Incorrect warning msg in JM when use metric reporter (was: Incorrect Warning msg in JM when use influxdb) > Incorrect warning msg in JM when use metric reporter > > > Key: FLINK-35091 > URL: https://issues.apache.org/jira/browse/FLINK-35091 > Project: Flink > Issue Type: Improvement >Reporter: sanqingleo >Priority: Minor > Fix For: 1.16.3 > > Attachments: image-2024-04-12-10-02-20-142.png > > > Hello, > I encountered an issue while upgrading Flink from version 1.14 to 1.18. After > the upgrade, I noticed that some monitoring metrics were not being reported > to InfluxDB. > Upon checking the Job Manager (JM) logs, I found an error indicating that the > previously used classes are no longer supported. However, there seems to be > an oddly phrased error message that looks like it might have been written > incorrectly. > The error message reads: "The reporter configuration of '{}' configures the > reporter class, which is no a no longer supported approach to configure > reporters." + " Please configure a factory class instead:" > I believe the correct phrasing should be: "The reporter configuration of '{}' > configures the reporter class, which is a no longer supported approach to > configure reporters." + " Please configure a factory class instead:" > It appears that the words "no a" were accidentally added, making the sentence > grammatically incorrect and potentially confusing for users. > > !image-2024-04-12-10-02-20-142.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35091) Incorrect Warning msg in JM when use influxdb
sanqingleo created FLINK-35091: -- Summary: Incorrect Warning msg in JM when use influxdb Key: FLINK-35091 URL: https://issues.apache.org/jira/browse/FLINK-35091 Project: Flink Issue Type: Improvement Reporter: sanqingleo Fix For: 1.16.3 Attachments: image-2024-04-12-10-02-20-142.png Hello, I encountered an issue while upgrading Flink from version 1.14 to 1.18. After the upgrade, I noticed that some monitoring metrics were not being reported to InfluxDB. Upon checking the Job Manager (JM) logs, I found an error indicating that the previously used classes are no longer supported. However, there seems to be an oddly phrased error message that looks like it might have been written incorrectly. The error message reads: "The reporter configuration of '{}' configures the reporter class, which is no a no longer supported approach to configure reporters." + " Please configure a factory class instead:" I believe the correct phrasing should be: "The reporter configuration of '{}' configures the reporter class, which is a no longer supported approach to configure reporters." + " Please configure a factory class instead:" It appears that the words "no a" were accidentally added, making the sentence grammatically incorrect and potentially confusing for users. !image-2024-04-12-10-02-20-142.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
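For clarity, a sketch of the corrected statement with the stray "no a" dropped, as the reporter proposes (the class, logger, and parameter names below are assumptions; only the message text matters):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical surrounding code; the point is the fixed warning text.
class ReporterSetupMessageSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ReporterSetupMessageSketch.class);

    static void warnAboutReporterClass(String reporterName) {
        LOG.warn(
                "The reporter configuration of '{}' configures the reporter class, which is a "
                        + "no longer supported approach to configure reporters. "
                        + "Please configure a factory class instead.",
                reporterName);
    }
}
{code}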
[jira] [Commented] (FLINK-35090) Doris sink fails to create table when database does not exist
[ https://issues.apache.org/jira/browse/FLINK-35090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836386#comment-17836386 ] Xiqian YU commented on FLINK-35090: --- [~renqs] I'm willing to take this ticket. > Doris sink fails to create table when database does not exist > - > > Key: FLINK-35090 > URL: https://issues.apache.org/jira/browse/FLINK-35090 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Xiqian YU >Priority: Minor > > Currently, the Doris sink connector doesn't support creating databases > automatically. When a user specifies a sink namespace with a non-existing > database in the YAML config, the Doris connector will crash. > Expected behaviour: the Doris sink connector should create both the database > and the table automatically. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on PR #20990: URL: https://github.com/apache/flink/pull/20990#issuecomment-2050824391 Thank you @XComp @1996fanrui @Samrat002 very much for the review, and sorry for the late response. I made some changes based on your comments. Would you mind taking another look when you have time? Thank you :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35090) Doris sink fails to create table when database does not exist
Xiqian YU created FLINK-35090: - Summary: Doris sink fails to create table when database does not exist Key: FLINK-35090 URL: https://issues.apache.org/jira/browse/FLINK-35090 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Xiqian YU Currently, the Doris sink connector doesn't support creating databases automatically. When a user specifies a sink namespace with a non-existing database in the YAML config, the Doris connector will crash. Expected behaviour: the Doris sink connector should create both the database and the table automatically. -- This message was sent by Atlassian Jira (v8.20.10#820010)
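A minimal sketch of the expected behaviour (the method and its signature are assumptions, not the actual Doris sink code): create the database first when it is missing, then create the table. Doris accepts `IF NOT EXISTS`, so the extra statement is harmless when the database already exists.

{code:java}
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical fix outline: ensure the database exists before CREATE TABLE.
class DorisCreateTableSketch {
    static void applyCreateTable(
            Connection connection, String database, String createTableDdl) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            // Safe to run unconditionally thanks to IF NOT EXISTS.
            stmt.execute("CREATE DATABASE IF NOT EXISTS `" + database + "`");
            stmt.execute(createTableDdl);
        }
    }
}
{code}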
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1561915638 ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + +private JobExecutionResult latestExecutionResult; + +/** + * The number of times a test should be repeated. + * + * This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ +private int numberOfTestRepetitions = 1; + +private boolean isCollectionExecution; + +public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { +this.numberOfTestRepetitions = numberOfTestRepetitions; +} + +public int getParallelism() { +return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); +} + +public JobExecutionResult getLatestExecutionResult() { +return this.latestExecutionResult; +} + +public boolean isCollectionExecution() { +return isCollectionExecution; +} + +// +// Methods to create the test program and for pre- and post- test work +// + +protected abstract void testProgram() throws Exception; + +protected void preSubmit() throws Exception {} + +protected void postSubmit() throws Exception {} + +protected boolean skipCollectionExecution() { +return false; +} Review Comment: thanks a lot for the comments. It sounds good to me. If we complete firstly the migration based on the principle of minimal changes. What do you think about considering the extension mode after completing all Junit5 migrations? Because - there are many subclasses of these base classes, it is difficult for us to design extensions that are suitable and applicable to all subclasses before complete migration. - At the same time, it can make PR and commitments clearer, which is beneficial for reviewers to advance the review. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
Xuannan Su created FLINK-35089: -- Summary: Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes Key: FLINK-35089 URL: https://issues.apache.org/jira/browse/FLINK-35089 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.19.0 Reporter: Xuannan Su Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in the `AbstractStreamOperator` are transient. The two fields will be null when the operator is deserialized on the TaskManager, which may cause an NPE. To fix it, we propose to make RecordAttributes serializable and these fields non-transient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
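A self-contained demonstration of the failure mode (plain Java serialization stands in for how Flink ships operators to TaskManagers; the class and field names are illustrative): transient fields are skipped during serialization and come back as null.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo implements Serializable {
    // Stand-in for lastRecordAttributes1/2: initialized locally, but transient.
    private transient String lastRecordAttributes = "initial";

    public static void main(String[] args) throws Exception {
        TransientFieldDemo operator = new TransientFieldDemo();

        // "Ship" the operator, as happens when a job is deployed to a TaskManager.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
        }
        TransientFieldDemo restored;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            restored = (TransientFieldDemo) in.readObject();
        }

        System.out.println(restored.lastRecordAttributes); // prints "null"
        restored.lastRecordAttributes.length();            // throws NullPointerException
    }
}
{code}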
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1561911292 ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + +private JobExecutionResult latestExecutionResult; + +/** + * The number of times a test should be repeated. + * + * This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ +private int numberOfTestRepetitions = 1; + +private boolean isCollectionExecution; + +public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { +this.numberOfTestRepetitions = numberOfTestRepetitions; +} + +public int getParallelism() { +return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); +} + +public JobExecutionResult getLatestExecutionResult() { +return this.latestExecutionResult; +} + +public boolean isCollectionExecution() { +return isCollectionExecution; +} + +// +// Methods to create the test program and for pre- and post- test work +// + +protected abstract void testProgram() throws Exception; + +protected void preSubmit() throws Exception {} + +protected void postSubmit() throws Exception {} + +protected boolean skipCollectionExecution() { +return false; +} + +// +// Test entry point +// + +@Test +public void testJobWithObjectReuse() { +isCollectionExecution = false; + +// pre-submit +try { +preSubmit(); +} catch (Exception e) { +System.err.println(e.getMessage()); +e.printStackTrace(); +fail("Pre-submit work caused an error: " + e.getMessage()); Review Comment: deleted. ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExe
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1561910994 ## flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java: ## @@ -35,40 +38,54 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.junit.Assume; -import org.junit.Before; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.File; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory; +import static org.assertj.core.api.Assumptions.assumeThat; /** Integration tests for Hadoop IO formats. */ -@RunWith(Parameterized.class) -public class HadoopIOFormatsITCase extends JavaProgramTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class HadoopIOFormatsITCase extends JavaProgramTestBaseJUnit5 { private static final int NUM_PROGRAMS = 2; -private final int curProgId; +@Parameter private int curProgId; private String[] resultPath; private String[] expectedResult; private String sequenceFileInPath; private String sequenceFileInPathNull; -public HadoopIOFormatsITCase(int curProgId) { -this.curProgId = curProgId; +@BeforeEach +void checkOperatingSystem() { +// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems +assumeThat(OperatingSystem.isWindows()) +.as("This test can't run successfully on Windows.") +.isFalse(); } -@Before -public void checkOperatingSystem() { -// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems -Assume.assumeTrue( -"This test can't run successfully on Windows.", !OperatingSystem.isWindows()); +@Override +@TestTemplate +public void testJobWithObjectReuse() { +super.testJobWithObjectReuse(); +} + +@Override +@TestTemplate +public void testJobWithoutObjectReuse() { +super.testJobWithoutObjectReuse(); +} + +@Override +@TestTemplate +public void testJobCollectionExecution() { +super.testJobCollectionExecution(); } Review Comment: because it needs the `@TestTemplate` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1561910064 ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + +private JobExecutionResult latestExecutionResult; + +/** + * The number of times a test should be repeated. + * + * This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ +private int numberOfTestRepetitions = 1; + +private boolean isCollectionExecution; + +public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { +this.numberOfTestRepetitions = numberOfTestRepetitions; +} + +public int getParallelism() { +return isCollectionExecution ? 
1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); +} + +public JobExecutionResult getLatestExecutionResult() { +return this.latestExecutionResult; +} + +public boolean isCollectionExecution() { +return isCollectionExecution; +} + +// +// Methods to create the test program and for pre- and post- test work +// + +protected abstract void testProgram() throws Exception; + +protected void preSubmit() throws Exception {} + +protected void postSubmit() throws Exception {} + +protected boolean skipCollectionExecution() { +return false; +} + +// +// Test entry point +// + +@Test +public void testJobWithObjectReuse() { +isCollectionExecution = false; + +// pre-submit +try { +preSubmit(); +} catch (Exception e) { +System.err.println(e.getMessage()); +e.printStackTrace(); +fail("Pre-submit work caused an error: " + e.getMessage()); +} + +// This only works because the underlying ExecutionEnvironment is a TestEnvironment +// We should fix that we are able to get access to the latest execution result from a +// different +// execution environment and how the object reuse mode is enabled +TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); +env.getConfig().enableObjectReuse(); + +// Possibly run the test multiple times +executeProgramMultipleTimes(env); +} + +private void executeProgramMultipleTimes(ExecutionEnvironment env) { +for (int i = 0; i < numberOfTestRepetitions; i++) { +// call the test program Review Comment: updated. ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyrig
[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
[ https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836382#comment-17836382 ] Mason Chen commented on FLINK-34127: Sure. Can I assign myself now with committer permissions? :D > Kafka connector repo runs a duplicate of `IntegrationTests` framework tests > --- > > Key: FLINK-34127 > URL: https://issues.apache.org/jira/browse/FLINK-34127 > Project: Flink > Issue Type: Improvement > Components: Build System / CI, Connectors / Kafka >Affects Versions: kafka-3.0.2 >Reporter: Mason Chen >Priority: Major > > I found out this behavior when troubleshooting CI flakiness. These > integration tests make heavy use of the CI since they require Kafka, > Zookeeper, and Docker containers. We can further stablize CI by not > redundantly running these set of tests. > `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` > returns: > ``` > 2024-01-17T00:51:05.2943150Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2], > Semantic: [EXACTLY_ONCE]] is running. > 2024-01-17T00:51:07.6922535Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2], > Semantic: [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:27.1326332Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a], > Semantic: [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:28.4000830Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a], > Semantic: [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:58.7830792Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:59.0544092Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:59.3910987Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:59.6025298Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:57:37.8378640Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. 
> 2024-01-17T00:57:38.0144732Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:57:38.2004796Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:57:38.4072815Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T01:06:11.2933375Z Test > org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment: > [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T01:06:12.1790031Z Test > org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment: > [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T01:06:12.5703927Z Test > org.apache.flink.tests.util.ka
Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]
RocMarshal commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1296008112 ## flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java: ## @@ -137,27 +132,26 @@ public void testFetchNextRecordReaderHasNewValue() throws Exception { setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); hadoopInputFormat.fetchNext(); -assertThat(hadoopInputFormat.fetched, is(true)); -assertThat(hadoopInputFormat.hasNext, is(true)); +assertThat(hadoopInputFormat.fetched).isTrue(); +assertThat(hadoopInputFormat.hasNext).isTrue(); } @Test -public void testFetchNextRecordReaderThrowsException() throws Exception { +void testFetchNextRecordReaderThrowsException() throws Exception { DummyRecordReader recordReader = mock(DummyRecordReader.class); when(recordReader.nextKeyValue()).thenThrow(new InterruptedException()); HadoopInputFormat hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); -exception.expect(IOException.class); -hadoopInputFormat.fetchNext(); + assertThatThrownBy(hadoopInputFormat::fetchNext).isInstanceOf(IOException.class); -assertThat(hadoopInputFormat.hasNext, is(true)); +assertThat(hadoopInputFormat.hasNext).isFalse(); Review Comment: Hi @1996fanrui, nice catch. Sorry for not explaining this change before the review. When debugging the line, I found it was caused by the difference in the expected-exception mechanism between JUnit 4 and JUnit 5. In short, in JUnit 4 the target line `assertThat(hadoopInputFormat.hasNext, is(true));` was never executed, hence the change here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
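To make the mechanism difference above concrete, a small self-contained example (names are illustrative, not the actual test class): with JUnit 4's ExpectedException rule, nothing after the throwing call runs, whereas AssertJ's assertThatThrownBy catches the exception so the follow-up assertion actually executes, which is why the expected value of hasNext had to flip during the migration.

{code:java}
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import org.junit.jupiter.api.Test;

class ExpectedExceptionMigrationDemo {

    private boolean hasNext = true;

    private void fetchNext() throws IOException {
        hasNext = false; // state mutates before the throw
        throw new IOException("reader interrupted");
    }

    @Test
    void junit5StyleAssertionRunsAfterTheThrow() {
        // The lambda swallows the exception, so execution continues here and
        // observes the mutated state -- unlike JUnit 4, where any statement
        // after fetchNext() was dead code once the exception propagated.
        assertThatThrownBy(this::fetchNext).isInstanceOf(IOException.class);
        assertThat(hasNext).isFalse();
    }
}
{code}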
[jira] [Closed] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint
[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30238. -- Resolution: Invalid > Unified Sink committer does not clean up state on final savepoint > - > > Key: FLINK-30238 > URL: https://issues.apache.org/jira/browse/FLINK-30238 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Fabian Paul >Priority: Critical > Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png > > > During stop-with-savepoint the committer only commits the pending > committables on notifyCheckpointComplete. > This has several downsides. > * The last committableSummary has checkpoint id LONG.MAX and is never cleared > from the state, so stop-with-savepoint does not work when the pipeline > recovers from a savepoint. > * While the committables are committed during stop-with-savepoint, they are > not forwarded to the post-commit topology, potentially losing data and > preventing open transactions from being closed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
[ https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836364#comment-17836364 ] Martijn Visser commented on FLINK-35088: [~elon] Please verify this with later version of Flink, since there have been many bugfixes since. > watermark alignment maxAllowedWatermarkDrift and updateInterval param need > check > > > Key: FLINK-35088 > URL: https://issues.apache.org/jira/browse/FLINK-35088 > Project: Flink > Issue Type: Improvement > Components: API / Core, Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-11-20-12-29-951.png > > > When I use watermark alignment, > 1.I found that setting maxAllowedWatermarkDrift to a negative number > initially led me to believe it could support delaying the consumption of the > source, so I tried it. Then, the upstream data flow would hang indefinitely. > Root cause: > {code:java} > long maxAllowedWatermark = globalCombinedWatermark.getTimestamp() > + watermarkAlignmentParams.getMaxAllowedWatermarkDrift(); {code} > If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark > < lastEmittedWatermark, then the SourceReader will be blocked indefinitely > and cannot recover. > I'm not sure if this is a supported feature of watermark alignment. If it's > not, I think an additional parameter validation should be implemented to > throw an exception on the client side if the value is negative. > 2.The updateInterval parameter also lacks validation. If I set it to 0, the > task will throw an exception when starting the job manager. The JDK class > java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and > throws the exception. > {code:java} > java.lang.IllegalArgumentException: null > at > java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) > ~[?:1.8.0_351] > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) > ~[
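A sketch of the client-side validation the ticket proposes (the helper class is hypothetical; the real change would live wherever the watermark alignment parameters are constructed): reject a negative drift and a non-positive update interval before the job is submitted, instead of blocking the SourceReader or failing inside ScheduledThreadPoolExecutor.

{code:java}
import java.time.Duration;

// Hypothetical validation helper for watermark alignment parameters.
final class WatermarkAlignmentParamsValidator {
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must be >= 0, but was "
                            + maxAllowedWatermarkDrift
                            + "; a negative drift blocks the SourceReader forever.");
        }
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "updateInterval must be > 0, but was " + updateInterval
                            + "; ScheduledThreadPoolExecutor rejects a non-positive period.");
        }
    }
}
{code}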
[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836325#comment-17836325 ] Kenneth William Krugler commented on FLINK-35076: - Hi [~elon] - please post these questions about the impact of idleness and how to rebalance on Stack Overflow, or the Flink user list. That way the Q&A can benefit the entire community, thanks! > Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement scenario for multi-stream join > operations, we are making modifications based on Flink watermark alignment, > then I found that the final join output would experience serious shake. > and I analyzed the reasons: an upstream topic has more than 300 partitions. > The number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am.However, the overall topic > writing is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience serious shake, as shown in the > following diagram: > !image-2024-04-10-20-15-05-731.png! > Root cause: > # The {{SourceOperator#emitLatestWatermark}} reports the > lastEmittedWatermark to the SourceCoordinator. > # If the partition write is zero during a certain period, the > lastEmittedWatermark sent by the subtask corresponding to that partition > remains unchanged. > # The SourceCoordinator aggregates the watermarks of all subtasks according > to the watermark group and takes the smallest watermark. This means that the > maxAllowedWatermark may remain unchanged for some time, even though the > overall upstream data flow is moving forward. until that minimum value is > updated, only then will everything change, which will manifest as serious > shake in the output data stream. > I think choosing the global minimum might not be a good option. Using min/max > could more likely encounter some edge cases. Perhaps choosing a median value > would be more appropriate? Or a more complex selection strategy? > If replaced with a median value, it can ensure that the overall data flow is > very smooth: > !image-2024-04-10-20-23-13-872.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238 ] elon_X edited comment on FLINK-35076 at 4/11/24 3:23 PM: - [~kkrugler] Thank you for your reply. Setting the idle time is not very controllable in terms of the specific timing. For example, setting it to 10 seconds, the minimum watermark will still not change within these 10 seconds unless the idle time is set as small as possible. I'm not sure if this could solve the problem and further testing is needed; For the solution of shuffling the stream, I didn't quite understand. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) Only DataStream supports rebalance, Source can't rebalance. I'm not quite sure how to shuffle the data source before {{{}fromSource{}}}. was (Author: JIRAUSER303028): [~kkrugler] Thank you for your reply. Setting the idle time is not very controllable in terms of the specific timing. For example, setting it to 10 seconds, the minimum watermark will still not change within these 10 seconds unless the idle time is set as small as possible. I'm not sure if this could solve the problem and further testing is needed; For the solution of shuffling the stream, I didn't quite understand. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) Only DataStream supports rebalance, Source can't rebalance. > Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement scenario for multi-stream join > operations, we are making modifications based on Flink watermark alignment, > then I found that the final join output would experience serious shake. > and I analyzed the reasons: an upstream topic has more than 300 partitions. > The number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am.However, the overall topic > writing is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience serious shake, as shown in the > following diagram: > !image-2024-04-10-20-15-05-731.png! > Root cause: > # The {{SourceOperator#emitLatestWatermark}} reports the > lastEmittedWatermark to the SourceCoordinator. > # If the partition write is zero during a certain period, the > lastEmittedWatermark sent by the subtask corresponding to that partition > remains unchanged. > # The SourceCoordinator aggregates the watermarks of all subtasks according > to the watermark group and takes the smallest watermark. This means that the > maxAllowedWatermark may remain unchanged for some time, even though the > overall upstream data flow is moving forward. until that minimum value is > updated, only then will everything change, which will manifest as serious > shake in the output data stream. > I think choosing the global minimum might not be a good option. Using min/max > could more likely encounter some edge cases. 
Perhaps choosing a median value > would be more appropriate? Or a more complex selection strategy? > If replaced with a median value, it can ensure that the overall data flow is > very smooth: > !image-2024-04-10-20-23-13-872.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238 ] elon_X commented on FLINK-35076: [~kkrugler] Thank you for your reply. Setting the idle time is not very controllable in terms of the specific timing. For example, setting it to 10 seconds, the minimum watermark will still not change within these 10 seconds unless the idle time is set as small as possible. I'm not sure if this could solve the problem and further testing is needed; For the solution of shuffling the stream, I didn't quite understand. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) Only DataStream supports rebalance, Source can't rebalance. > Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement scenario for multi-stream join > operations, we are making modifications based on Flink watermark alignment, > then I found that the final join output would experience serious shake. > and I analyzed the reasons: an upstream topic has more than 300 partitions. > The number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am.However, the overall topic > writing is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience serious shake, as shown in the > following diagram: > !image-2024-04-10-20-15-05-731.png! > Root cause: > # The {{SourceOperator#emitLatestWatermark}} reports the > lastEmittedWatermark to the SourceCoordinator. > # If the partition write is zero during a certain period, the > lastEmittedWatermark sent by the subtask corresponding to that partition > remains unchanged. > # The SourceCoordinator aggregates the watermarks of all subtasks according > to the watermark group and takes the smallest watermark. This means that the > maxAllowedWatermark may remain unchanged for some time, even though the > overall upstream data flow is moving forward. until that minimum value is > updated, only then will everything change, which will manifest as serious > shake in the output data stream. > I think choosing the global minimum might not be a good option. Using min/max > could more likely encounter some edge cases. Perhaps choosing a median value > would be more appropriate? Or a more complex selection strategy? > If replaced with a median value, it can ensure that the overall data flow is > very smooth: > !image-2024-04-10-20-23-13-872.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
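As a rough sketch of the alternative aggregation the reporter suggests (illustrative only, not the SourceCoordinator implementation): combine the per-subtask watermarks with a median instead of the global minimum, so a handful of intermittently idle partitions no longer pins maxAllowedWatermark.

{code:java}
import java.util.Arrays;

// Illustrative only: aggregate reported watermarks by median rather than
// minimum. Assumes at least one subtask has reported a watermark.
final class WatermarkAggregationSketch {
    static long combineByMinimum(long[] reported) {
        return Arrays.stream(reported).min().orElse(Long.MIN_VALUE);
    }

    static long combineByMedian(long[] reported) {
        long[] sorted = reported.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2]; // instead of sorted[0], the minimum
    }
}
{code}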
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2049852124 @XComp Glad for your vacation! Finally, I also addressed the deprecation warnings and went through the implementation of a custom connector via `DynamicTableSource`. It turned out to be quite tough, as it is probably not that common, and these new APIs are not well documented yet. I wanted to use `TableEnvironment.fromValues`; however, I could not, as the test was hanging... I want to understand why and, if warranted, file an issue for that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34224) ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest timed out
[ https://issues.apache.org/jira/browse/FLINK-34224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836218#comment-17836218 ] Ryan Skraba commented on FLINK-34224: - 1.20 Java 8: Test (module: core) https://github.com/apache/flink/actions/runs/8643097154/job/23696501028#step:10:11318 > ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest > timed out > --- > > Key: FLINK-34224 > URL: https://issues.apache.org/jira/browse/FLINK-34224 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The timeout appeared in the GitHub Actions workflow (currently in test phase; > [FLIP-396|https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure]): > https://github.com/XComp/flink/actions/runs/7632434859/job/20793613726#step:10:11040 > {code} > Jan 24 01:38:36 "ForkJoinPool-1-worker-1" #16 daemon prio=5 os_prio=0 > tid=0x7f3b200ae800 nid=0x406e3 waiting on condition [0x7f3b1ba0e000] > Jan 24 01:38:36java.lang.Thread.State: WAITING (parking) > Jan 24 01:38:36 at sun.misc.Unsafe.park(Native Method) > Jan 24 01:38:36 - parking to wait for <0xdfbbb358> (a > java.util.concurrent.CompletableFuture$Signaller) > Jan 24 01:38:36 at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > Jan 24 01:38:36 at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > Jan 24 01:38:36 at > org.apache.flink.changelog.fs.ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest.java:251) > Jan 24 01:38:36 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34996][Connectors/Kafka] Use UserCodeCL to instantiate Deserializer [flink-connector-kafka]
hugogu commented on PR #89: URL: https://github.com/apache/flink-connector-kafka/pull/89#issuecomment-2049754379 @morazow Thanks for your approval. I have just rebased this PR to include the build fix made in the main branch. Hopefully the build will succeed this time. I also noticed the v3.1 branch; may I know if this fix can be included there? Shall I raise another PR for that? I'm not quite sure about the release procedure yet. cc @MartijnVisser -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
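For readers following along, a hedged sketch of the idea behind the fix (`TemporaryClassLoaderContext` and `InstantiationUtil` exist in Flink's flink-core utilities, but the surrounding code here is an assumption, not the actual connector change): resolve and instantiate the configured deserializer against the user-code classloader instead of the framework classloader.

{code:java}
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TemporaryClassLoaderContext;

// Sketch under assumptions: 'deserializerClassName' and 'userCodeClassLoader'
// stand in for whatever the Kafka connector actually passes around.
class DeserializerInstantiationSketch {
    static <T> T instantiate(String deserializerClassName, ClassLoader userCodeClassLoader)
            throws ClassNotFoundException {
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(userCodeClassLoader)) {
            // Load the class from the user jar, not the connector's own classloader.
            @SuppressWarnings("unchecked")
            Class<T> clazz =
                    (Class<T>) Class.forName(deserializerClassName, true, userCodeClassLoader);
            return InstantiationUtil.instantiate(clazz);
        }
    }
}
{code}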
Re: [PR] [FLINK-34738][cdc][docs-zh] "Deployment - YARN" Page for Flink CDC Chinese Documentation [flink-cdc]
Vincent-Woo commented on PR #3205: URL: https://github.com/apache/flink-cdc/pull/3205#issuecomment-2049708639 @leonardBang @PatrickRen @lvyanquan @loserwang1024 Excuse me, do you have time to take a look at this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]
flinkbot commented on PR #24653: URL: https://github.com/apache/flink/pull/24653#issuecomment-2049623349 ## CI report: * ecce541d1b05cd350ca635fc8e0f6611743d583d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32084) Migrate current file merging of channel state into the file merging framework
[ https://issues.apache.org/jira/browse/FLINK-32084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32084: --- Labels: pull-request-available (was: ) > Migrate current file merging of channel state into the file merging framework > - > > Key: FLINK-32084 > URL: https://issues.apache.org/jira/browse/FLINK-32084 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]
fredia opened a new pull request, #24653: URL: https://github.com/apache/flink/pull/24653 ## What is the purpose of the change Migrate the current file merging of channel state snapshots into the unified file merging framework. This PR only focuses on the snapshot/writing part. ## Brief change log - Introduce `UnifyMergingChannelStateWriteRequestDispatcher`, which constructs a `ChannelStateCheckpointWriter` that uses the unified file merging mechanism to write channel state. - Update `ChannelStateWriteRequestExecutorFactory#getOrCreateExecutor`; different ways to merge small files can be chosen here. ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
[ https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836149#comment-17836149 ] Martijn Visser commented on FLINK-34127: [~mason6345] Is this something that you can check? I think I also see some flakyness here, like with https://github.com/apache/flink-connector-kafka/actions/runs/8646160813/job/23704885473 > Kafka connector repo runs a duplicate of `IntegrationTests` framework tests > --- > > Key: FLINK-34127 > URL: https://issues.apache.org/jira/browse/FLINK-34127 > Project: Flink > Issue Type: Improvement > Components: Build System / CI, Connectors / Kafka >Affects Versions: kafka-3.0.2 >Reporter: Mason Chen >Priority: Major > > I found out this behavior when troubleshooting CI flakiness. These > integration tests make heavy use of the CI since they require Kafka, > Zookeeper, and Docker containers. We can further stablize CI by not > redundantly running these set of tests. > `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` > returns: > ``` > 2024-01-17T00:51:05.2943150Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2], > Semantic: [EXACTLY_ONCE]] is running. > 2024-01-17T00:51:07.6922535Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2], > Semantic: [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:27.1326332Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a], > Semantic: [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:28.4000830Z Test > org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: > [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a], > Semantic: [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:58.7830792Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:59.0544092Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:56:59.3910987Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:56:59.6025298Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] successfully run. 
> 2024-01-17T00:57:37.8378640Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:57:38.0144732Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T00:57:38.2004796Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T00:57:38.4072815Z Test > org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment: > [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: > [EXACTLY_ONCE]] successfully run. > 2024-01-17T01:06:11.2933375Z Test > org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment: > [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: > [EXACTLY_ONCE]] is running. > 2024-01-17T01:06:12.1790031Z Test > org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment: > [FlinkContainers], ExternalContext: [Ka
[jira] [Created] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
elon_X created FLINK-35088: -- Summary: watermark alignment maxAllowedWatermarkDrift and updateInterval param need check Key: FLINK-35088 URL: https://issues.apache.org/jira/browse/FLINK-35088 Project: Flink Issue Type: Improvement Components: API / Core, Runtime / Coordination Affects Versions: 1.16.1 Reporter: elon_X Attachments: image-2024-04-11-20-12-29-951.png When I use watermark alignment: 1. I found that setting maxAllowedWatermarkDrift to a negative number initially led me to believe it could support delaying consumption of the source, so I tried it. The upstream data flow then hung indefinitely. Root cause: {code:java} long maxAllowedWatermark = globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift(); {code} If maxAllowedWatermarkDrift is negative, then in the SourceOperator maxAllowedWatermark < lastEmittedWatermark, so the SourceReader is blocked indefinitely and cannot recover. I'm not sure whether this is a supported feature of watermark alignment. If it's not, I think additional parameter validation should be implemented to throw an exception on the client side if the value is negative. 2. The updateInterval parameter also lacks validation. If I set it to 0, the job throws an exception when the JobManager starts it. The JDK class java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and throws the exception. {code:java} java.lang.IllegalArgumentException: null at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351] at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.
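For what it's worth, the validation the reporter asks for could be as small as the following sketch (hypothetical placement; the real checks would live where the alignment parameters are configured, e.g. when building `WatermarkStrategy#withWatermarkAlignment`). Only the two conditions come from the report; everything else is illustrative.

```java
import java.time.Duration;

// Hypothetical parameter checks for watermark alignment; placement and
// message wording are assumptions, the two conditions are from the report.
final class WatermarkAlignmentParamChecks {
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must be >= 0, but was " + maxAllowedWatermarkDrift);
        }
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "updateInterval must be > 0, but was " + updateInterval);
        }
    }
}
```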
Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]
Zakelly commented on code in PR #24633: URL: https://github.com/apache/flink/pull/24633#discussion_r1560916376 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A buffer that holds state requests so they can be executed in batch; it can only be manipulated + * within the task thread. + * + * @param <R> the type of the record + * @param <K> the type of the key + */ +@NotThreadSafe +public class StateRequestBuffer<R, K> { +/** + * The state requests in this buffer can be executed when the buffer is full or the configured + * batch size is reached. All operations on this buffer must be invoked in the task thread. + */ +final LinkedList<StateRequest<K, ?>> activeQueue; + +/** + * The requests that must wait until all preceding records with the same key finish their + * execution. After that, the queued requests move into the active buffer. All operations + * on this buffer must be invoked in the task thread. + */ +final Map<K, Deque<StateRequest<K, ?>>> blockingQueue; + +/** The number of state requests in the blocking queue. */ +int blockingQueueSize; + +public StateRequestBuffer() { +this.activeQueue = new LinkedList<>(); +this.blockingQueue = new HashMap<>(); +this.blockingQueueSize = 0; +} + +void enqueueToActive(StateRequest<K, ?> request) { +activeQueue.add(request); +} + +void enqueueToBlocking(StateRequest<K, ?> request) { +blockingQueue +.computeIfAbsent(request.getRecordContext().getKey(), k -> new LinkedList<>()) +.add(request); +blockingQueueSize++; +} + +/** + * Try to pull one state request with a specific key from the blocking queue to the active queue. + * + * @param key The key to release; the other records with this key are no longer blocked. + * @return The first record context with the same key in the blocking queue, or null if there is no such record. + */ +@Nullable +RecordContext<R, K> tryActivateOneByKey(K key) { +if (!blockingQueue.containsKey(key)) { +return null; +} + +StateRequest<K, ?> stateRequest = blockingQueue.get(key).getFirst(); +activeQueue.add(stateRequest); +blockingQueue.get(key).removeFirst(); +if (blockingQueue.get(key).isEmpty()) { +blockingQueue.remove(key); +} +blockingQueueSize--; +return (RecordContext<R, K>) stateRequest.getRecordContext(); +} + +/** + * Get the number of state requests in the blocking queue in constant time. + * + * @return the number of state requests in the blocking queue. + */ +@VisibleForTesting +int blockingQueueSize() { +return blockingQueueSize; +} + +/** + * Get the number of state requests in the active queue in constant time. + * + * @return the number of state requests in the active queue. + */ +@VisibleForTesting +int activeQueueSize() { +return activeQueue.size(); +} + +/** + * Try to pop N state requests from the active queue; if the size of the active queue is less than N, + * return all the requests in the active queue. + * + * @param N the number of state requests to pop. + * @return A list of state requests. + */ +List<StateRequest<K, ?>> popActive(int N) { Review Comment: ```suggestion List<StateRequest<K, ?>> popActive(int n) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
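To make the active/blocking split concrete outside of Flink's internals, here is a self-contained sketch of the same two-queue idea (not the PR's code; `Request` is an invented stand-in for `StateRequest`): requests for a key that is already in flight wait in a per-key blocking queue and are promoted one at a time when that key is released.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Invented stand-in for StateRequest, for illustration only.
record Request<K, R>(K key, R payload) {}

// Self-contained illustration of the two-queue idea (not Flink code).
final class TwoQueueBuffer<K, R> {
    private final Deque<Request<K, R>> active = new ArrayDeque<>();
    private final Map<K, Deque<Request<K, R>>> blocking = new HashMap<>();

    void enqueueToActive(Request<K, R> r) {
        active.add(r);
    }

    void enqueueToBlocking(Request<K, R> r) {
        blocking.computeIfAbsent(r.key(), k -> new ArrayDeque<>()).add(r);
    }

    // Promote the oldest blocked request for this key, mirroring tryActivateOneByKey.
    Request<K, R> tryActivateOneByKey(K key) {
        Deque<Request<K, R>> queue = blocking.get(key);
        if (queue == null) {
            return null;
        }
        Request<K, R> r = queue.pollFirst();
        if (queue.isEmpty()) {
            blocking.remove(key);
        }
        active.add(r);
        return r;
    }
}
```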
Re: [PR] [FLINK-34661][runtime] TaskExecutor supports retain partitions after JM crashed. [flink]
zhuzhurk commented on code in PR #24582: URL: https://github.com/apache/flink/pull/24582#discussion_r1560848973 ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java: ## @@ -1886,6 +1911,33 @@ private void disconnectJobManagerConnection( } } +if (cleanupPartitionLater) { +// this branch is for job recovery +final Duration maxRegistrationDuration = +taskManagerConfiguration.getMaxRegistrationDuration(); + +if (maxRegistrationDuration != null) { +log.info( +"Waiting for {} mills for job {} to recover. If there is no reconnection, " Review Comment: there is no reconnection -> the job manager is not reconnected ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java: ## @@ -409,6 +410,12 @@ public TaskExecutor( resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log); } +private boolean isJobRecoveryEnabled() { Review Comment: Better to rename it to `shouldRetainPartitionsOnJobManagerConnectionLost`. And it should return `true` only if netty shuffle is used. ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java: ## @@ -2474,16 +2523,21 @@ public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMas "JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId); runAsync( -() -> -jobTable.getConnection(jobId) -.ifPresent( -jobManagerConnection -> - disconnectJobManagerConnection( - jobManagerConnection, -new Exception( -"Job leader for job id " -+ jobId -+ " lost leadership."; +() -> { +Optional<JobTable.Connection> connection = jobTable.getConnection(jobId); + +if (connection.isPresent()) { +Exception cause = +new Exception( +"Job leader for job id " + jobId + " lost leadership."); +if (isJobRecoveryEnabled()) { + disconnectJobManagerConnectionAndCleanupPartitionLater( Review Comment: Would you confirm if it's possible to reconnect to the old JM? If it can happen, the retained partitions will be leaked. ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java: ## @@ -1373,6 +1380,12 @@ private void disconnectAndTryReconnectToJobManager( } } +private void disconnectAndTryReconnectToJobManagerAndCleanupPartitionLater( +JobTable.Connection jobManagerConnection, Exception cause) { + disconnectJobManagerConnectionAndCleanupPartitionLater(jobManagerConnection, cause); Review Comment: I prefer to name it `disconnectAndTryReconnectToJobManager` and add comments to explain that it does not clean up partitions right away. Only if the reconnection cannot be done will the partitions get cleared. This is natural in the case of supporting JM reconnection, so we do not need to add all the details to the method name. Besides, `CleanupPartitionLater` may not happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
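The "clean up later" behaviour under review boils down to a delayed cleanup guarded by a reconnection check; sketched standalone below (hypothetical names, heavily simplified from the TaskExecutor logic):

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of "clean up partitions later": after losing the
// JM connection, wait up to maxRegistrationDuration for a reconnect before
// releasing the retained partitions.
final class DelayedPartitionCleanup {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void scheduleCleanup(
            Duration maxRegistrationDuration,
            BooleanSupplier jobManagerReconnected,
            Runnable releasePartitions) {
        scheduler.schedule(
                () -> {
                    // Only release if the job manager did not come back in time.
                    if (!jobManagerReconnected.getAsBoolean()) {
                        releasePartitions.run();
                    }
                },
                maxRegistrationDuration.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```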
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836142#comment-17836142 ] Zakelly Lan commented on FLINK-34704: - Thanks for your detailed explanation! Now I see your point. The problem with my approach is that the current record of the AWOP would be lost. +1 for temporarily increasing the size of the buffer by 1. IIUC, even if the AWOP has an upstream, the checkpoint can proceed as long as the current record can finish its processing normally thanks to the buffer increment. > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
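The buffer-overdraft idea discussed above can be sketched in isolation (illustration only; the real AsyncWaitOperator queue is considerably more involved): a bounded in-flight queue that admits exactly one extra element when a checkpoint needs the currently blocked record to be part of the snapshot.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: a bounded queue whose capacity can be overdrafted by
// one element so a checkpoint can proceed while the queue is full.
final class OverdraftQueue<E> {
    private final int capacity;
    private final Deque<E> queue = new ArrayDeque<>();
    private boolean overdraftUsed;

    OverdraftQueue(int capacity) {
        this.capacity = capacity;
    }

    // Normal path: reject when full, forcing the caller to yield.
    boolean offer(E e) {
        if (queue.size() < capacity) {
            queue.add(e);
            return true;
        }
        return false;
    }

    // Checkpoint path: admit the blocked element into the snapshot by
    // allowing exactly one extra slot.
    boolean offerWithOverdraft(E e) {
        if (offer(e)) {
            return true;
        }
        if (!overdraftUsed) {
            queue.add(e);
            overdraftUsed = true;
            return true;
        }
        return false;
    }

    E poll() {
        E e = queue.poll();
        if (e != null && queue.size() < capacity) {
            overdraftUsed = false; // back under the normal capacity
        }
        return e;
    }
}
```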
[jira] [Closed] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2
[ https://issues.apache.org/jira/browse/FLINK-35038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-35038. -- Fix Version/s: kafka-4.0.0 kafka-3.1.1 (was: 3.1.0) Resolution: Fixed Fixed in apache/flink-connector-kafka: main: 369e7be46a70fd50d68746498aed82105741e7d6 v3.1: ad798fc5387ba3582f92516697d60d0f523e86cb > Bump test dependency org.yaml:snakeyaml to 2.2 > --- > > Key: FLINK-35038 > URL: https://issues.apache.org/jira/browse/FLINK-35038 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Affects Versions: 3.1.0 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > Labels: pull-request-available > Fix For: kafka-4.0.0, kafka-3.1.1 > > > Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test > scope dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193. > This outdated version of SnakeYAML triggers security warnings. These should > not be an actual issue given the test scope, but we should consider bumping > the version for security hygiene purposes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-35038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-35038: --- Summary: Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka connector (was: Bump test dependency org.yaml:snakeyaml to 2.2 ) > Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka connector > > > Key: FLINK-35038 > URL: https://issues.apache.org/jira/browse/FLINK-35038 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Affects Versions: 3.1.0 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > Labels: pull-request-available > Fix For: kafka-4.0.0, kafka-3.1.1 > > > Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test > scope dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193. > This outdated version of SnakeYAML triggers security warnings. These should > not be an actual issue given the test scope, but we should consider bumping > the version for security hygiene purposes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-35008. -- Fix Version/s: kafka-4.0.0 kafka-3.1.1 Resolution: Fixed Fixed in apache/flink-connector-kafka v3.1: 4168d0f22f2fb6b696b5e09d7b8d1f99a6714b78 main: 1c39e3b7495640c9b3784ec672097741c072cebb > Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink > Kafka connector > > > Key: FLINK-35008 > URL: https://issues.apache.org/jira/browse/FLINK-35008 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: kafka-4.0.0, kafka-3.1.1 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35038] Bump `org.yaml:snakeyaml` to `2.2` [flink-connector-kafka]
boring-cyborg[bot] commented on PR #93: URL: https://github.com/apache/flink-connector-kafka/pull/93#issuecomment-2049520227 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35038] Bump `org.yaml:snakeyaml` to `2.2` [flink-connector-kafka]
MartijnVisser merged PR #93: URL: https://github.com/apache/flink-connector-kafka/pull/93 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35008: --- Labels: pull-request-available (was: ) > Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink > Kafka connector > > > Key: FLINK-35008 > URL: https://issues.apache.org/jira/browse/FLINK-35008 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35008] Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 [flink-connector-kafka]
MartijnVisser merged PR #87: URL: https://github.com/apache/flink-connector-kafka/pull/87 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35007) Update Flink Kafka connector to support 1.19
[ https://issues.apache.org/jira/browse/FLINK-35007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-35007. -- Fix Version/s: kafka-4.0.0 kafka-3.1.1 Resolution: Fixed Fixed in apache/flink-connector-kafka v3.1: 809cb0786565b3515d1a17319b0f98f59b1ef6c2 main: 897001d5682a0708042d59be81a10485ffd0dde7 > Update Flink Kafka connector to support 1.19 > > > Key: FLINK-35007 > URL: https://issues.apache.org/jira/browse/FLINK-35007 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: kafka-4.0.0, kafka-3.1.1 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35007][BP v3.1] Add support for Flink 1.19 (#90) [flink-connector-kafka]
MartijnVisser merged PR #94: URL: https://github.com/apache/flink-connector-kafka/pull/94 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-20217) More fine-grained timer processing
[ https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-20217: -- Assignee: Piotr Nowojski > More fine-grained timer processing > -- > > Key: FLINK-20217 > URL: https://issues.apache.org/jira/browse/FLINK-20217 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Runtime / Task >Affects Versions: 1.10.2, 1.11.2, 1.12.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > Timers are currently processed in one big block under the checkpoint lock > (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic > in a number of scenarios while doing checkpointing, leading to checkpoints > timing out (and even unaligned checkpoints would not help). > If you have a huge number of timers to process when advancing the watermark > and the task is also back-pressured, the situation may actually be worse, > since you would block on the checkpoint lock and also wait for > buffers/credits from the receiver. > I propose to make this loop more fine-grained so that it is interruptible by > checkpoints, but maybe there is also some other way to improve here. > This issue has been for example observed here: > https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry -- This message was sent by Atlassian Jira (v8.20.10#820010)
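One possible shape of the proposed fix, sketched standalone (an assumption, not the actual change): fire timers in bounded chunks and re-enqueue a continuation into the mailbox between chunks, so a checkpoint barrier can interleave with timer processing.

```java
import java.util.PriorityQueue;

// Sketch: instead of draining all eligible timers in one block, fire at most
// chunkSize timers, then submit a continuation so checkpoints can interleave.
final class ChunkedTimerService {
    interface Mailbox {
        void submit(Runnable continuation);
    }

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final Mailbox mailbox;
    private final int chunkSize;

    ChunkedTimerService(Mailbox mailbox, int chunkSize) {
        this.mailbox = mailbox;
        this.chunkSize = chunkSize;
    }

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    void advanceWatermark(long watermark) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark && fired < chunkSize) {
            long timer = timers.poll();
            fired++;
            // user onTimer(timer) callback would run here
        }
        if (!timers.isEmpty() && timers.peek() <= watermark) {
            // Yield: the rest of the eligible timers run in a later mail,
            // after any pending checkpoint barrier has been handled.
            mailbox.submit(() -> advanceWatermark(watermark));
        }
    }
}
```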
[jira] [Updated] (FLINK-34961) GitHub Actions runner statistics can be monitored per workflow name
[ https://issues.apache.org/jira/browse/FLINK-34961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-34961: Fix Version/s: kafka-4.0.0 > GitHub Actions runner statistics can be monitored per workflow name > -- > > Key: FLINK-34961 > URL: https://issues.apache.org/jira/browse/FLINK-34961 > Project: Flink > Issue Type: Improvement > Components: Build System / CI >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, starter > Fix For: kafka-4.0.0 > > > Apache Infra allows the monitoring of runner usage per workflow (see [report > for > Flink|https://infra-reports.apache.org/#ghactions&project=flink&hours=168&limit=10]; > only accessible with Apache committer rights). They accumulate the data by > workflow name. The Flink space has multiple repositories that use the generic > workflow name {{CI}}. That makes the differentiation in the report harder. > This Jira issue is about identifying all Flink-related projects with a CI > workflow (the Kubernetes operator and the JDBC connector were identified, for > instance) and adding a more distinct name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34961) GitHub Actions runner statistics can be monitored per workflow name
[ https://issues.apache.org/jira/browse/FLINK-34961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836128#comment-17836128 ] Sergey Nuyanzin commented on FLINK-34961: - Merged to flink-connector-kafka main as [c47abb3933b7c1e567a9142c6495038d16d42dd0|https://github.com/apache/flink-connector-kafka/commit/c47abb3933b7c1e567a9142c6495038d16d42dd0] > GitHub Actions runner statistics can be monitored per workflow name > -- > > Key: FLINK-34961 > URL: https://issues.apache.org/jira/browse/FLINK-34961 > Project: Flink > Issue Type: Improvement > Components: Build System / CI >Reporter: Matthias Pohl >Priority: Major > Labels: pull-request-available, starter > > Apache Infra allows the monitoring of runner usage per workflow (see [report > for > Flink|https://infra-reports.apache.org/#ghactions&project=flink&hours=168&limit=10]; > only accessible with Apache committer rights). They accumulate the data by > workflow name. The Flink space has multiple repositories that use the generic > workflow name {{CI}}. That makes the differentiation in the report harder. > This Jira issue is about identifying all Flink-related projects with a CI > workflow (the Kubernetes operator and the JDBC connector were identified, for > instance) and adding a more distinct name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34961] Use dedicated CI name for Kafka connector to differentiate it in infra-reports [flink-connector-kafka]
snuyanzin merged PR #92: URL: https://github.com/apache/flink-connector-kafka/pull/92 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836125#comment-17836125 ] Martijn Visser commented on FLINK-35008: I've updated the PR to use 1.26.1 > Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink > Kafka connector > > > Key: FLINK-35008 > URL: https://issues.apache.org/jira/browse/FLINK-35008 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink
[ https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35022: --- Labels: pull-request-available (was: ) > Add TypeInformed Element Converter for DynamoDbSink > --- > > Key: FLINK-35022 > URL: https://issues.apache.org/jira/browse/FLINK-35022 > Project: Flink > Issue Type: Improvement > Components: Connectors / DynamoDB >Affects Versions: aws-connector-4.3.0 >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > > h2. Context > {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on > {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert > Flink stream objects to DynamoDb write requests, where an item is represented as > {{Map<String, AttributeValue>}}. > {{AttributeValue}} is the wrapper for the DynamoDb-comprehensible object, in a > format with type identification properties, as in > {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}. > Since TypeInformation is already natively supported in Flink, many > implementations of the DynamoDb ElementConverter are just boilerplate. > For example > {code:title="Simple POJO Element Conversion"} > public class Order { > String id; > int quantity; > double total; > } > {code} > The implementation of the converter must be > {code:title="Simple POJO DDB Element Converter"} > public static class SimplePojoElementConverter implements > ElementConverter<Order, DynamoDbWriteRequest> { > @Override > public DynamoDbWriteRequest apply(Order order, SinkWriter.Context > context) { > Map<String, AttributeValue> itemMap = new HashMap<>(); > itemMap.put("id", AttributeValue.builder().s(order.id).build()); > itemMap.put("quantity", > AttributeValue.builder().n(String.valueOf(order.quantity)).build()); > itemMap.put("total", > AttributeValue.builder().n(String.valueOf(order.total)).build()); > return DynamoDbWriteRequest.builder() > .setType(DynamoDbWriteRequestType.PUT) > .setItem(itemMap) > .build(); > } > @Override > public void open(Sink.InitContext context) { > > } > } > {code} > While this might not be too much work, it is a fairly common case > in Flink, and this implementation requires fair knowledge of the DDB model > for new users. > h2. Proposal > Introduce {{DynamoDbTypeInformedElementConverter}} as follows: > {code:title="TypeInformedElementconverter"} > public class DynamoDbTypeInformedElementConverter<T> implements > ElementConverter<T, DynamoDbWriteRequest> { > DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo); > public DynamoDbWriteRequest convertElement(input) { > switch this.typeInfo { > case: BasicTypeInfo.STRING_TYPE_INFO: return input -> > AttributeValue.fromS(input.toString()) > case: BasicTypeInfo.SHORT_TYPE_INFO: > case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> > AttributeValue.fromN(input.toString()) >case: TupleTypeInfo: input -> AttributeValue.fromL(convertTuple(input)) > . > } > } > } > // User Code > public static void main(String[] args) { > DynamoDbTypeInformedElementConverter elementConverter = new > DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class)); > DdbSink.setElementConverter(elementConverter); > } > {code} > We will start by supporting all POJO / basic / Tuple / Array typeInfo, which > should be enough to cover all DDB-supported types > (s, n, bool, b, ss, ns, bs, bools, m, l) > 1- > https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
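The dispatch described in the proposal, restricted to a few basic types as a standalone sketch (hedged: the real converter would recurse through the full TypeInformation tree, and this class is invented for illustration, not the PR's code):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Sketch of the type-informed dispatch for a handful of basic types; the
// proposal extends this pattern to POJO, tuple and array type information.
final class BasicTypeToAttributeValue {
    static AttributeValue convert(TypeInformation<?> typeInfo, Object value) {
        if (BasicTypeInfo.STRING_TYPE_INFO.equals(typeInfo)) {
            return AttributeValue.builder().s(value.toString()).build();
        }
        if (BasicTypeInfo.INT_TYPE_INFO.equals(typeInfo)
                || BasicTypeInfo.LONG_TYPE_INFO.equals(typeInfo)
                || BasicTypeInfo.DOUBLE_TYPE_INFO.equals(typeInfo)) {
            return AttributeValue.builder().n(value.toString()).build();
        }
        if (BasicTypeInfo.BOOLEAN_TYPE_INFO.equals(typeInfo)) {
            return AttributeValue.builder().bool((Boolean) value).build();
        }
        throw new UnsupportedOperationException(
                "Type not covered in this sketch: " + typeInfo);
    }
}
```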
[PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]
vahmed-hamdy opened a new pull request, #136: URL: https://github.com/apache/flink-connector-aws/pull/136 ## Purpose of the change Add `DynamoDbTypeInformedElementConverter` to convert elements for the DynamoDb sink using the provided type information. ## Verifying this change This change added tests and can be verified as follows: - Added unit tests ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-XXXXX] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka [flink-connector-kafka]
MartijnVisser commented on PR #85: URL: https://github.com/apache/flink-connector-kafka/pull/85#issuecomment-2049475263 Superseded by https://github.com/apache/flink-connector-kafka/pull/93 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-XXXXX] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka [flink-connector-kafka]
MartijnVisser closed pull request #85: [FLINK-X] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka URL: https://github.com/apache/flink-connector-kafka/pull/85 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-29050) [JUnit5 Migration] Module: flink-hadoop-compatibility
[ https://issues.apache.org/jira/browse/FLINK-29050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798856#comment-17798856 ] RocMarshal edited comment on FLINK-29050 at 4/11/24 10:56 AM: -- Based on [https://github.com/apache/flink/pull/20990#discussion_r1292783464] We'd like to do the following sub-tasks for the current jira. - Rename AbstractTestBase, JavaProgramTestBase, and MultipleProgramsTestBase to JUnit4, - Use JUnit5 to re-write the implementations for the above classes & tag the JUnit4 classes as deprecated - Use JUnit5 implementation classes to migrate the module flink-hadoop-compatibility - Use the JUnit5 implementation to adapt the sub-classes of JUnit4 (maybe this part of the work needs to be recorded and promoted in other jiras) was (Author: rocmarshal): Based on [https://github.com/apache/flink/pull/20990#discussion_r1292783464] We'd like to do the following sub-tasks for the current jira. - Rename AbstractTestBase, JavaProgramTestBase, and MultipleProgramsTestBase to JUnit4, - Use JUnit5 to re-write the implementations for the above classes & tag the JUnit4 classes as deprecated - Use JUnit5 implementation classes to migrate the module flink-hadoop-compatibility - Use the JUnit5 implementation to adapt the sub-classes of JUnit4 > [JUnit5 Migration] Module: flink-hadoop-compatibility > - > > Key: FLINK-29050 > URL: https://issues.apache.org/jira/browse/FLINK-29050 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hadoop Compatibility, Tests >Reporter: RocMarshal >Assignee: RocMarshal >Priority: Major > Labels: pull-request-available, stale-assigned, starter > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836113#comment-17836113 ] Piotr Nowojski commented on FLINK-34704: Sounds good to me (y) Indeed we could later provide some kind of overdraft buffer capacity to be used just for checkpointing. I think that this might relate to the things I want to propose in FLIP-443 as it will give the AWOP some way of knowing that it should use the overdraft buffer. Let's discuss this later and keep the ticket open :) > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]
yuxiqian commented on code in PR #3221: URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560819276 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java: ## @@ -108,7 +110,32 @@ public void execute(Context context) throws Exception { this.taskRunning = true; try { while (taskRunning) { -Optional next = Optional.ofNullable(changeStreamCursor.tryNext()); +Optional next; +try { +next = Optional.ofNullable(changeStreamCursor.tryNext()); +} catch (MongoQueryException e) { +if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) { +ChangeStreamOffset offset = +new ChangeStreamOffset(streamSplit.getStartingOffset().getOffset()); Review Comment: @Jiabao-Sun You're right, I've added resume token expiration checks in creating and iterating process. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]
Zakelly commented on code in PR #24651: URL: https://github.com/apache/flink/pull/24651#discussion_r1560803119 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; Review Comment: I'm wondering if package name `org.apache.flink.runtime.state.v2` is enough. ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +/** + * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the + * {@link State} being the root of the public API state hierarchy. + * + * The public API state hierarchy is intended to be programmed against by Flink applications. The + * internal state hierarchy holds all the auxiliary methods that communicate with {@link + * AsyncExecutionController} and are not intended to be used by user applications. + * + * @param <K> The type of key the state is associated to. + * @param <V> The type of values kept internally in state. + */ +@Internal +public abstract class InternalKvState<K, V> implements State { + +private final AsyncExecutionController<K> asyncExecutionController; + +private final StateDescriptor<V> stateDescriptor; + +/** + * Creates a new InternalKvState with the given asyncExecutionController and stateDescriptor. + */ +public InternalKvState( +AsyncExecutionController<K> asyncExecutionController, +StateDescriptor<V> stateDescriptor) { +this.asyncExecutionController = asyncExecutionController; +this.stateDescriptor = stateDescriptor; +} + +/** + * Submit a state request to the AEC. + * + * @param stateRequestType the type of this request. + * @param payload the payload input for this request. + * @return the state future. + */ +protected <IN, OUT> StateFuture<OUT> handleRequest( +StateRequestType stateRequestType, IN payload) { +return asyncExecutionController.handleRequest(this, stateRequestType, payload); +} + +/** Return the specific {@code StateDescriptor}. */ +public StateDescriptor<V> getStateDescriptor() { +return stateDescriptor; +} +} Review Comment: Maybe a base class for all states, named `InternalKeyedState`, is better? Not "KvState"; and we'd better break the hierarchy between the internal map state and value state. ## flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/ValueStateImpl.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +/** + * A default implementation of {@link ValueState} which delegates all async requests to {@link + * AsyncExecutionController}. + * + * @param <K> The type of key the state is associated to. + * @param <V> The type of values kept internally in state. + */ +public class ValueStateImpl<K, V> extends InternalKvState<K, V> implements ValueState<V> { + +public ValueStateImpl( +Async
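Based on the `handleRequest` helper quoted above, the delegation inside `ValueStateImpl` presumably looks roughly like the following (an assumption reconstructed from the snippets, not the merged code; the `StateRequestType` constant names are guesses):

```java
// Assumed shape of the delegation (not the merged code): each user-facing
// call becomes a StateRequest handed to the AsyncExecutionController.
@Override
public StateFuture<V> asyncValue() {
    return handleRequest(StateRequestType.VALUE_GET, null);
}

@Override
public StateFuture<Void> asyncUpdate(V value) {
    return handleRequest(StateRequestType.VALUE_UPDATE, value);
}
```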
[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836106#comment-17836106 ] Gyula Fora edited comment on FLINK-34704 at 4/11/24 10:41 AM: -- I agree with [~pnowojski] here, the currently blocked element would be lost in the checkpoint. But [~Zakelly] also has a valid point. I have played around with this and there is a simple optimisation to be made for the async operator though under certain circumstances. If the AWOP is the head of the operator chain (no upstream), we could actually checkpoint during yielding but we would also need to checkpoint the current processed element as part of the buffer (temporarily increase the size of the buffer by 1). This is still related to the other ticket in the sense that we need to get the checkpoint trigger during yield but it needs a custom logic for the AWOP to allow checkpointing while being blocked on the full buffer was (Author: gyfora): I agree with [~pnowojski] here, the currently blocked element would be lost in the checkpoint. I have played around with this and there is a simple optimisation to be made for the async operator though under certain circumstances. If the AWOP is the head of the operator chain (no upstream), we could actually checkpoint during yielding but we would also need to checkpoint the current processed element as part of the buffer (temporarily increase the size of the buffer by 1). This is still related to the other ticket in the sense that we need to get the checkpoint trigger during yield but it needs a custom logic for the AWOP to allow checkpointing while being blocked on the full buffer > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836107#comment-17836107 ] Gyula Fora commented on FLINK-34704: So restricting the optimisation to the head of the operator chain is somewhat limiting, but the improvement in this particular scenario is huge and may make or break some specialised use-cases, so it is probably still worth considering after FLINK-35051 > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836106#comment-17836106 ] Gyula Fora commented on FLINK-34704: I agree with [~pnowojski] here, the currently blocked element would be lost in the checkpoint. I have played around with this and there is a simple optimisation to be made for the async operator though under certain circumstances. If the AWOP is the head of the operator chain (no upstream), we could actually checkpoint during yielding but we would also need to checkpoint the current processed element as part of the buffer (temporarily increase the size of the buffer by 1). This is still related to the other ticket in the sense that we need to get the checkpoint trigger during yield but it needs a custom logic for the AWOP to allow checkpointing while being blocked on the full buffer > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]
Jiabao-Sun commented on code in PR #3221: URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560798123 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java: ## @@ -108,7 +110,32 @@ public void execute(Context context) throws Exception { this.taskRunning = true; try { while (taskRunning) { -Optional<BsonDocument> next = Optional.ofNullable(changeStreamCursor.tryNext()); +Optional<BsonDocument> next; +try { +next = Optional.ofNullable(changeStreamCursor.tryNext()); +} catch (MongoQueryException e) { +if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) { +ChangeStreamOffset offset = +new ChangeStreamOffset(streamSplit.getStartingOffset().getOffset()); Review Comment: Hi @yuxiqian, it seems the `MongoCommandException` can also be thrown by openChangeStreamCursor. ``` at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:121) at com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:237) ``` ![image](https://github.com/apache/flink-cdc/assets/27403841/829b5668-1cbf-4bfe-a324-a05c53e8d2eb) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]
slfan1989 commented on PR #24652: URL: https://github.com/apache/flink/pull/24652#issuecomment-2049386534 > Thanks @slfan1989 for the quick review! > > > @1996fanrui If we confirm that it's an issue with commons-io, couldn't we resolve it by upgrading commons-io instead? > > Do you mean upgrading commons-io to the latest version? If yes, I tried upgrading `commons-io` to `2.16.1`, it doesn't work. > > I saw `2.16.1` is the latest version of `commons-io`[1]. > > [1] https://mvnrepository.com/artifact/commons-io/commons-io I'm sorry for any inconvenience caused. The reason for upgrading commons-io is that commons-compress requires a higher version. We encounter a "class not found" issue when using commons-io 2.11. If we can't meet the performance requirements, I think we might have to revert FLINK-34955. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836099#comment-17836099 ] Shilun Fan commented on FLINK-35040: [~fanrui] During the compilation process, we found that commons-compress requires a higher version of commons-io, otherwise there will be a class not found exception. > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Attachments: image-2024-04-08-10-51-07-403.png, > image-2024-04-11-12-53-53-353.png, screenshot-1.png > > > The performance of serializerHeavyString regresses since April 3, and had not > yet recovered on April 8th. > It seems Java 11 regresses, and Java 8 and Java 17 are fine. > http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200 > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]
1996fanrui commented on PR #24652: URL: https://github.com/apache/flink/pull/24652#issuecomment-2049377985 Thanks @slfan1989 for the quick review! > @1996fanrui If we confirm that it's an issue with commons-io, couldn't we resolve it by upgrading commons-io instead? Do you mean upgrading commons-io to the latest version? If yes, I tried upgrading `commons-io` to `2.16.1`, it doesn't work. I saw `2.16.1` is the latest version of `commons-io`[1]. [1] https://mvnrepository.com/artifact/commons-io/commons-io -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]
yuxiqian commented on code in PR #3221: URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560784586 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java: ## @@ -108,7 +110,32 @@ public void execute(Context context) throws Exception { this.taskRunning = true; try { while (taskRunning) { -Optional<BsonDocument> next = Optional.ofNullable(changeStreamCursor.tryNext()); +Optional<BsonDocument> next; +try { +next = Optional.ofNullable(changeStreamCursor.tryNext()); +} catch (MongoQueryException e) { +if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) { +ChangeStreamOffset offset = +new ChangeStreamOffset(streamSplit.getStartingOffset().getOffset()); Review Comment: Yes, it would be much cleaner to move that checking logic to `openChangeStreamCursor` and implement the fallback logic along with the existing routes. CMIIW, but it seems `MongoCommandException` will only be thrown [when getNext or tryGetNext is called](https://github.com/mongodb/mongo/blob/a71feef352a1f08e3f9dbc2b840691e75775a370/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp#L133C31-L133C83), and we can't intentionally trigger a `MongoCommandException` by calling other methods like `hasNext` earlier. @Jiabao-Sun Any suggestions about this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
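The fallback this thread converges on, sketched in isolation (hypothetical names; the real change lives in `MongoDBStreamFetchTask`, and the error-code comment is an assumption): open the change stream with the stored resume token, and if that fails because the token has expired, restart from a timestamp instead.

```java
import com.mongodb.MongoCommandException;

// Hedged sketch: when opening the change stream with a stored resume token
// fails because the token has expired, fall back to a timestamp-based restart.
final class ResumeTokenFallback {
    interface CursorOpener<C> {
        C openWithResumeToken() throws MongoCommandException;

        C openAtTimestamp(long clusterTimeSeconds);
    }

    static <C> C openWithFallback(CursorOpener<C> opener, long checkpointTimestampSeconds) {
        try {
            return opener.openWithResumeToken();
        } catch (MongoCommandException e) {
            // e.g. error code 286 (ChangeStreamHistoryLost); exactly which
            // codes to treat as "expired" is an assumption in this sketch.
            return opener.openAtTimestamp(checkpointTimestampSeconds);
        }
    }
}
```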
[jira] [Comment Edited] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836091#comment-17836091 ] Rui Fan edited comment on FLINK-35040 at 4/11/24 10:16 AM: --- Hi [~slfan1989] , thanks for your quick feedback! FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the performance is recovered. My question is why do you upgrade the commons-io in FLINK-34955, and I didn't see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0? Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 2.16.1, then run the benchmark once, the performance is recovered. I try to only revert {{commons-io}} to 2.11.0(See the PR), trigger benchmark twice, and see the performance result later. (The benchmark server is busy, so the result may be finished tomorrow.) was (Author: fanrui): Hi [~slfan1989] , thanks for your quick feedback! FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the performance is recovered. My question is why do you upgrade the commons-io in FLINK-34955, and I didn't see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0? Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 2.16.1, then run the benchmark once, the performance is recovered. I try to only revert {{commons-io}} to 2.11.0, trigger benchmark twice, and see the performance result later. (The benchmark server is busy, so the result may be finished tomorrow.) > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Attachments: image-2024-04-08-10-51-07-403.png, > image-2024-04-11-12-53-53-353.png, screenshot-1.png > > > The performance of serializerHeavyString regresses since April 3, and had not > yet recovered on April 8th. > It seems Java 11 regresses, and Java 8 and Java 17 are fine. > http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200 > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]
slfan1989 commented on PR #24652: URL: https://github.com/apache/flink/pull/24652#issuecomment-2049370922 @1996fanrui If we confirm that it's an issue with commons-io, couldn't we resolve it by upgrading commons-io instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836091#comment-17836091 ] Rui Fan edited comment on FLINK-35040 at 4/11/24 10:15 AM: --- Hi [~slfan1989] , thanks for your quick feedback! FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the performance is recovered. My question is why do you upgrade the commons-io in FLINK-34955, and I didn't see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0? Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 2.16.1, then run the benchmark once, the performance is recovered. I try to only revert {{commons-io}} to 2.11.0, trigger benchmark twice, and see the performance result later. (The benchmark server is busy, so the result may be finished tomorrow.) was (Author: fanrui): Hi [~slfan1989] , thanks for your quick feedback! FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the performance is recovered. My question is why do you upgrade the commons-io in FLINK-34955, and I didn't see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0? > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Attachments: image-2024-04-08-10-51-07-403.png, > image-2024-04-11-12-53-53-353.png, screenshot-1.png > > > The performance of serializerHeavyString regresses since April 3, and had not > yet recovered on April 8th. > It seems Java 11 regresses, and Java 8 and Java 17 are fine. > http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200 > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)