[PR] [FLINK-34961] Use dedicated CI name for Kubernetes Operator to differentiate it in infra-reports [flink-kubernetes-operator]

2024-04-11 Thread via GitHub


snuyanzin opened a new pull request, #813:
URL: https://github.com/apache/flink-kubernetes-operator/pull/813

   ## What is the purpose of the change
   
   The problem with current GHA ci is that it has `CI` name which is the same 
across multiple Flink projects and Apache INFRA doesn't differentiate it in 
it's  GHA usage report https://infra-reports.apache.org/#ghactions . The idea 
is to use different names to cope with this
   
   ## Brief change log
   
   changed name for gha ci
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
( no)
 - Core observer or reconciler logic that is regularly executed: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable )
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]

2024-04-11 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1562071643


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type 
hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by 
Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates 
with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param  The type of key the state is associated to.
+ * @param  The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKvState implements State {
+
+private final AsyncExecutionController asyncExecutionController;
+
+private final StateDescriptor stateDescriptor;
+
+/**
+ * Creates a new InternalKvState with the given asyncExecutionController 
and stateDescriptor.
+ */
+public InternalKvState(
+AsyncExecutionController asyncExecutionController,
+StateDescriptor stateDescriptor) {
+this.asyncExecutionController = asyncExecutionController;
+this.stateDescriptor = stateDescriptor;
+}
+
+/**
+ * Submit a state request to AEC.
+ *
+ * @param stateRequestType the type of this request.
+ * @param payload the payload input for this request.
+ * @return the state future.
+ */
+protected  StateFuture handleRequest(
+StateRequestType stateRequestType, IN payload) {
+return asyncExecutionController.handleRequest(this, stateRequestType, 
payload);
+}
+
+/** Return specific {@code StateDescriptor}. */
+public StateDescriptor getStateDescriptor() {
+return stateDescriptor;
+}
+}

Review Comment:
   I have added a method of value serializer and related test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]

2024-04-11 Thread via GitHub


Jiabao-Sun commented on code in PR #3221:
URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560833766


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -108,7 +110,19 @@ public void execute(Context context) throws Exception {
 this.taskRunning = true;
 try {
 while (taskRunning) {
-Optional next = 
Optional.ofNullable(changeStreamCursor.tryNext());
+Optional next;
+try {
+next = Optional.ofNullable(changeStreamCursor.tryNext());
+} catch (MongoCommandException e) {
+if (MongoUtils.checkIfResumeTokenExpires(e)) {
+resumeTokenExpired = true;

Review Comment:
   Do we need to reset its value?



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -236,7 +250,7 @@ private MongoChangeStreamCursor 
openChangeStreamCursor(
 BsonDocument resumeToken = offset.getResumeToken();
 BsonTimestamp timestamp = offset.getTimestamp();
 
-if (resumeToken != null) {
+if (resumeToken != null && !resumeTokenExpired) {

Review Comment:
   It should be noted that `startAtOperationTime` is supported starting from 
MongoDB version 4.0. 
   
   The prerequisite for executing the following code block is that the 
`resumeToken` does not exist, which occurs in two scenarios: 
   1. a specified timestamp is used as the starting offset.
   2. prior to version 4.0.7, when a collection has not been updated for a long 
time and the `postResumeToken` cannot be obtained, the current timestamp is 
used as the starting offset.
   
   In the `else` branch of the code below, if we cannot start the Change Stream 
using `startAtOperationTime ` , we fallback to starting the Change Stream from 
the latest offset. 
   Before MongoDB 4.0, we may lose data during the snapshot changes, but the 
likelihood of losing data during the snapshot is relatively low: if a 
collection is highly likely to change, we can obtain the `resumeToken` at its 
starting position; if a collection has not changed for a long time and we 
cannot obtain the `resumeToken` and `postResumeToken`, the likelihood of data 
loss during the snapshot is very low.
   
   However, when we encounter an invalid `resumeToken` during runtime or when 
recovering from a checkpoint, there is a high possibility of data loss. I have 
reconsidered this issue, and if we cannot recover the change stream from the 
`resumeToken`, it is highly likely that we cannot recover it through 
`startAtOperationTime` either, as they correspond to the same position in the 
`oplog`.
   
   In `mongo-kafka`, there is an explicit configuration called 
"tolerant-errors" provided to handle interruptions in the change stream. But in 
some scenarios, consistency requirements may outweigh availability, we should 
throw an exception to let user re-run the task from beginning.
   
   ```java
  else {
   if (supportsStartAtOperationTime) {
   LOG.info("Open the change stream at the timestamp: {}", 
timestamp);
   changeStreamIterable.startAtOperationTime(timestamp);
   } else {
   LOG.warn("Open the change stream of the latest offset");
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]

2024-04-11 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1562060617


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.internal;

Review Comment:
   I think it's enough currently. Just modified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/12/24 6:08 AM:
-

[~echauchot] 

Thank you for your reply.
I think you are looking at this scene from the perspective of Reactive Mode, 
because Reactive Mode only uses the resources of the cluster as a criterion for 
task parallelism. I don’t know if I understand it correctly.

But my above scenario is in non-Reactive Mode. I just use the adaptive 
scheduler, which means that I increase the parallelism of the running task from 
10 to 12. However, because min-parallelism-increase=5, I am satisfied in the 
cluster slot. When the condition of 12 is met, the expansion of the task cannot 
be triggered immediately, but it needs to wait for scaling-interval.max before 
the expansion can be triggered. My purpose is to trigger the expansion when the 
parallelism of 12 is met, instead of having to after scaling-interval.max or 
min-parallelism-increase


was (Author: JIRAUSER296932):
[~echauchot] 

Thank you for your reply.

You should look at this issue from the perspective of Reactive Mode, because 
Reactive Mode only uses the resources of the cluster as a criterion for task 
parallelism. I don’t know if I understand it correctly.

But my above scenario is in non-Reactive Mode. But I use the adaptive 
scheduler, which means that I increase the parallelism of the running task from 
10 to 12. However, because min-parallelism-increase=5, I am satisfied in the 
cluster slot. When the condition of 12 is met, the expansion of the task cannot 
be triggered immediately, but it needs to wait for scaling-interval.max before 
the expansion can be triggered. My purpose is to trigger the expansion when the 
parallelism of 12 is met, instead of having to after scaling-interval.max

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-04-11 Thread via GitHub


JustinLeesin opened a new pull request, #24656:
URL: https://github.com/apache/flink/pull/24656

   1.19 backport for parent PR #24397 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-04-11 Thread via GitHub


flinkbot commented on PR #24656:
URL: https://github.com/apache/flink/pull/24656#issuecomment-2051047110

   
   ## CI report:
   
   * cf2c203cc40e921dd455cb3ec4bab0048400f029 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-11 Thread via GitHub


fredia commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1562003338


##
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##
@@ -44,6 +44,42 @@
 String
 The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled.Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries will be merged. Otherwise, it is possible for the logical 
files of different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.enabled

Review Comment:
   The `position` can only determine the order of `xxx_secttion.html`.  
`checkpointing_configuration.html` is generated by 
`ConfigOptionsDocGenerator#generateTablesForClass`, it used `DocumentedKey` as 
the [comparing 
key](https://github.com/apache/flink/blob/master/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java#L547).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-11 Thread via GitHub


Zakelly commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1561983143


##
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##
@@ -44,6 +44,42 @@
 String
 The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled.Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries will be merged. Otherwise, it is possible for the logical 
files of different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.enabled

Review Comment:
   I found the annotation `@Documentation.Section` can specify `position`, may 
be helpful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]

2024-04-11 Thread via GitHub


Sxnan commented on PR #24655:
URL: https://github.com/apache/flink/pull/24655#issuecomment-2050918953

   @xintongsong Can you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]

2024-04-11 Thread via GitHub


flinkbot commented on PR #24655:
URL: https://github.com/apache/flink/pull/24655#issuecomment-205094

   
   ## CI report:
   
   * b8758babe99b2fc7f080026b3615e2b5ff7da164 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35089:
---
Labels: pull-request-available  (was: )

> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
> `AbstractStreamOperator` are transient. The two fields will be null when it 
> is deserialized in TaskManager, which may cause an NPE.
> To fix it, we propose to make the RecordAttributes serializable and these 
> fields non-transient.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35089][runtime] Serialize the lastRecordAttributes in AbstractStreamOperator [flink]

2024-04-11 Thread via GitHub


Sxnan opened a new pull request, #24655:
URL: https://github.com/apache/flink/pull/24655

   ## What is the purpose of the change
   
   Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
`AbstractStreamOperator` are transient. The two fields will be null when it is 
deserialized in TaskManager, which may cause an NPE.
   
   To fix it, we make the RecordAttributes serializable and these fields 
non-transient.
   
   ## Brief change log
   
   - Make the RecordAttributes serializable
   - Make `lastRecordAttributes1` and `lastRecordAttributes2` in the 
`AbstractStreamOperator` non-transient
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Integration test is added 
`org.apache.flink.test.streaming.runtime.RecordAttributesPropagationITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializer: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35089:
---
Description: 
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
`AbstractStreamOperator` are transient. The two fields will be null when it is 
deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serializable and these 
fields non-transient.

 

  was:
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
`AbstractStreamOperator` are transient. The two fields will be null when it is 
deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serialization and these 
fields non-transient.

 


> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: Xuannan Su
>Priority: Major
>
> Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
> `AbstractStreamOperator` are transient. The two fields will be null when it 
> is deserialized in TaskManager, which may cause an NPE.
> To fix it, we propose to make the RecordAttributes serializable and these 
> fields non-transient.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.

2024-04-11 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836403#comment-17836403
 ] 

Hongshun Wang commented on FLINK-35093:
---

I'd like to do it

> Postgres source connector support SPECIFIC_OFFSETS start up mode from an 
> existed replication slot.
> --
>
> Key: FLINK-35093
> URL: https://issues.apache.org/jira/browse/FLINK-35093
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
>
> Current, Postgres source connector  only support INITIAL and LATEST mode.
> However, sometimes, user want to restart from an existed replication slot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.

2024-04-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35093:
-

 Summary: Postgres source connector support SPECIFIC_OFFSETS start 
up mode from an existed replication slot.
 Key: FLINK-35093
 URL: https://issues.apache.org/jira/browse/FLINK-35093
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


Current, Postgres source connector  only support INITIAL and LATEST mode.

However, sometimes, user want to restart from an existed replication slot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35090) Doris sink fails to create table when database does not exist

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35090:
---
Labels: pull-request-available  (was: )

> Doris sink fails to create table when database does not exist
> -
>
> Key: FLINK-35090
> URL: https://issues.apache.org/jira/browse/FLINK-35090
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Xiqian YU
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, Doris sink connector doesn't support creating database 
> automatically. When user specifies a sink namespace with non-existing 
> database in YAML config, Doris connector will crash.
> Expected behaviour: Doris sink connector should create both database and 
> table automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35090][cdc][doris] Add database auto-creating support for Doris sink pipeline connector [flink-cdc]

2024-04-11 Thread via GitHub


yuxiqian commented on PR #3222:
URL: https://github.com/apache/flink-cdc/pull/3222#issuecomment-2050900067

   @lvyanquan PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34903][MySQL][Feature] Add mysql-pipeline-connector with table.exclude.list option to exclude unnecessary tables [flink-cdc]

2024-04-11 Thread via GitHub


shiyiky commented on code in PR #3186:
URL: https://github.com/apache/flink-cdc/pull/3186#discussion_r1561958443


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java:
##
@@ -79,6 +80,49 @@ public void testNoMatchedTable() {
 .hasMessageContaining("Cannot find any table by the option 
'tables' = " + tables);
 }
 
+@Test
+public void testExcludeTable() {
+inventoryDatabase.createAndInitialize();
+Map options = new HashMap<>();
+options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+options.put(USERNAME.key(), TEST_USER);
+options.put(PASSWORD.key(), TEST_PASSWORD);
+// db has three tables , table.list= (products,orders shipments)
+options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".prod\\.*");
+String tableExcludeList = inventoryDatabase.getDatabaseName() + 
".prod\\.orders";

Review Comment:
   u are right and  I will adapter it,tks。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-11 Thread via GitHub


fredia commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1561957606


##
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##
@@ -44,6 +44,42 @@
 String
 The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled.Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries will be merged. Otherwise, it is possible for the logical 
files of different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.enabled
+false
+Boolean
+Whether to enable merging multiple checkpoint files into one, 
which will greatly reduce the number of small checkpoint files.
+
+
+state.checkpoints.file-merging.max-file-size
+32 mb
+MemorySize
+Max size of a physical file for merged checkpoints.
+
+
+
state.checkpoints.file-merging.max-space-amplification
+0.75

Review Comment:
   👍 I took it as the proportion of invalid data, the old description is "A 
threshold that triggers a compaction (re-uploading) of one physical file. If 
the amount of invalid data in a physical file exceeds the threshold, a new 
physical file will be created and uploaded."
   
   Changed it to 2 and modify the description.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-11 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1561954297


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/MemorySize.java:
##
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different 
units.
+ *
+ * Parsing
+ *
+ * The size can be parsed from a text expression. If the expression is a 
pure number, the value
+ * will be interpreted as bytes.
+ */
+@PublicEvolving
+public class MemorySize implements java.io.Serializable, 
Comparable {

Review Comment:
   I think for `MemorySize` we should use the one the Flink instead of creating 
our own. It is a public API (marked as `@PublicEvolving`), and Flink CDC 
doesn't use this one in our production code. IIUC it is just used for parsing 
memory size expressions in configuration. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-11 Thread via GitHub


fredia commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1561952908


##
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##
@@ -44,6 +44,42 @@
 String
 The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only 
the meta data of checkpoints will be stored in this directory.
 
+
+
state.checkpoints.file-merging.across-checkpoint-boundary
+false
+Boolean
+Only relevant if state.checkpoints.file-merging.enabled is 
enabled.Whether to allow merging data of multiple checkpoints into one 
physical file. If this option is set to false, only merge files within 
checkpoint boundaries will be merged. Otherwise, it is possible for the logical 
files of different checkpoints to share the same physical file.
+
+
+state.checkpoints.file-merging.enabled

Review Comment:
   I'm afraid not, it's in alphabetical order. 
   For example, the configuration options related to `speculative ` are also 
like this.
   
![image](https://github.com/apache/flink/assets/18653940/117a8695-3733-4454-8f58-70d3ea0c9f9e)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 

Thank you for your reply.

You should look at this issue from the perspective of Reactive Mode, because 
Reactive Mode only uses the resources of the cluster as a criterion for task 
parallelism. I don’t know if I understand it correctly.

But my above scenario is in non-Reactive Mode. But I use the adaptive 
scheduler, which means that I increase the parallelism of the running task from 
10 to 12. However, because min-parallelism-increase=5, I am satisfied in the 
cluster slot. When the condition of 12 is met, the expansion of the task cannot 
be triggered immediately, but it needs to wait for scaling-interval.max before 
the expansion can be triggered. My purpose is to trigger the expansion when the 
parallelism of 12 is met, instead of having to after scaling-interval.max

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-11 Thread via GitHub


morazow commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1561946899


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;
+
+private int keyIndex = 0;
+
+private final long durationMs;
+private final long stepMs;
+private final long offsetMs;
+private long ms = 0;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+return new Generator(numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+}
+
+Generator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+this.numKeys = numKeys;
+this.durationMs = durationMs;
+this.stepMs = stepMs;
+this.offsetMs = offsetMs;
+}
+
+@Override
+public boolean hasNext() {
+return ms < durationMs;
+}
+
+@Override
+public RowData next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+RowData row =
+new GeneratedRow(
+keyIndex,
+LocalDateTime.ofInstant(
+Instant.ofEpochMilli(ms + offsetMs), 
ZoneOffset.UTC),
+"Some payload...");

Review Comment:
   I saw this was in the original change, but should we randomize the payload?



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, 
@TempDir Path tmpDir) thr
 jobClient.getJobExecutionResult().get();
 
 final String expected =
-"1980,1970-01-01 00:00:00.0\n"
-+ "1980,1970-01-01 00:00:20.0\n"
-+ "1980,1970-01-01 00:00:40.0\n";
+"1980,1970-01-01 00:00:00\n"

Review Comment:
   Why is this change is required?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35091][Metrics][Minor] Fix incorrect warning msg in JM log when use metric reporter [flink]

2024-04-11 Thread via GitHub


flinkbot commented on PR #24654:
URL: https://github.com/apache/flink/pull/24654#issuecomment-2050853597

   
   ## CI report:
   
   * 682a59db0c1587c46c9fe72fd0dd8e1283e51e8a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35092) Add integrated test for Doris sink pipeline connector

2024-04-11 Thread Xiqian YU (Jira)
Xiqian YU created FLINK-35092:
-

 Summary: Add integrated test for Doris sink pipeline connector
 Key: FLINK-35092
 URL: https://issues.apache.org/jira/browse/FLINK-35092
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Xiqian YU


Currently, no integrated test are being applied to Doris pipeline connector 
(there's only one DorisRowConverterTest case for now). Adding ITcases would 
improving Doris connector's code quality and reliability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34853] Draft: Submit CDC Job To Flink K8S Native Application Mode [flink-cdc]

2024-04-11 Thread via GitHub


PatrickRen commented on PR #3093:
URL: https://github.com/apache/flink-cdc/pull/3093#issuecomment-2050850214

   @czy006 Could you rebase the PR to the latest master? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35091][Metrics][Minor] Fix incorrect warning msg in JM log when use metric reporter [flink]

2024-04-11 Thread via GitHub


leosanqing opened a new pull request, #24654:
URL: https://github.com/apache/flink/pull/24654

   ## Desc
   I encountered an issue while upgrading Flink from version 1.14 to 1.18. 
After the upgrade, I noticed that some monitoring metrics were not being 
reported to InfluxDB.
   Upon checking the Job Manager (JM) logs, I found an error indicating that 
the previously used classes are no longer supported. However, there seems to be 
an oddly phrased error message that looks like it might have been written 
incorrectly.
   
   The error message reads: "The reporter configuration of '{}' configures the 
reporter class, which is no a no longer supported approach to configure 
reporters." + " Please configure a factory class instead:"
   
   I believe the correct phrasing should be: "The reporter configuration of 
'{}' configures the reporter class, which is a no longer supported approach to 
configure reporters." + " Please configure a factory class instead:"
   
   It appears that the words "no a" were accidentally added, making the 
sentence grammatically incorrect and potentially confusing for users.
   
   ## What is the purpose of the change
   
   Fix incorrect warning msg in JM log when use metric reporter
   
   ## Brief change log
   
   Just warning log print
   
   The error message reads: "The reporter configuration of '{}' configures the 
reporter class, which is no a no longer supported approach to configure 
reporters." + " Please configure a factory class instead:"
   I believe the correct phrasing should be: "The reporter configuration of 
'{}' configures the reporter class, which is a no longer supported approach to 
configure reporters." + " Please configure a factory class instead:"
   
   
![image](https://github.com/apache/flink/assets/20400582/14b6c38d-933e-4517-a2f8-386967c8e553)
   
   
   ## Verifying this change
   
   No need to test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35091) Incorrect warning msg in JM when use metric reporter

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35091:
---
Labels: pull-request-available  (was: )

> Incorrect warning msg in JM when use metric reporter
> 
>
> Key: FLINK-35091
> URL: https://issues.apache.org/jira/browse/FLINK-35091
> Project: Flink
>  Issue Type: Improvement
>Reporter: sanqingleo
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.3
>
> Attachments: image-2024-04-12-10-02-20-142.png
>
>
> Hello,
> I encountered an issue while upgrading Flink from version 1.14 to 1.18. After 
> the upgrade, I noticed that some monitoring metrics were not being reported 
> to InfluxDB.
> Upon checking the Job Manager (JM) logs, I found an error indicating that the 
> previously used classes are no longer supported. However, there seems to be 
> an oddly phrased error message that looks like it might have been written 
> incorrectly.
> The error message reads: "The reporter configuration of '{}' configures the 
> reporter class, which is no a no longer supported approach to configure 
> reporters." + " Please configure a factory class instead:"
> I believe the correct phrasing should be: "The reporter configuration of '{}' 
> configures the reporter class, which is a no longer supported approach to 
> configure reporters." + " Please configure a factory class instead:"
> It appears that the words "no a" were accidentally added, making the sentence 
> grammatically incorrect and potentially confusing for users.
>  
> !image-2024-04-12-10-02-20-142.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]

2024-04-11 Thread via GitHub


fredia commented on PR #24633:
URL: https://github.com/apache/flink/pull/24633#issuecomment-2050847516

   @Zakelly Thanks for the review, updated and squashed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-04-11 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836393#comment-17836393
 ] 

Qingsheng Ren commented on FLINK-34634:
---

flink-cdc master: 48ca8623bb8fa405adb56dbe505dbad10902db89

> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> ---
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-09-15-25-26-187.png, 
> image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> Once, I removed a table from the option and then restarted the job from the 
> savepoint, but the job couldn't read the binlog anymore. When I checked the 
> logs, I found an Error level log stating:
> ' The enumerator received invalid request meta group id 6, the valid meta 
> group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is 
> aware of.
> However, the code should indeed remove redundant split information from the 
> Reader as seen in 
> [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does 
> this issue occur?
>  
> h3. why occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I discovered the cause. If the job stops before 
> completing all the split meta information and then restarts, this issue 
> occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader 
> is 6, and no meta information has been synchronized, leaving the 
> finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After 
> restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 
> - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed 
> table have two split). This could lead to an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
> h3. How to reproduce
>  * Add Thread.sleep(1000L) in 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
>  to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
> else if (sourceEvent instanceof BinlogSplitMetaEvent) {
> LOG.debug(
> "Source reader {} receives binlog meta with group id {}.",
> subtaskId,
> ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
> try {
> Thread.sleep(1000L);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
> } {code}
>  * Add Thread.sleep(500L) in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
>  to trigger savepoint before meta infos synchronization finishes.
>  
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
> // ..ingore 
> waitForSinkSize("sink", fetchedDataList.size());
> Thread.sleep(500L);
> assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
> finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
> savepointDirectory);
> jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one 
> table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
> ...
> }
> {code}
>  
>  * Add chunk-meta.group.size  =2 in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
> Then, run 
> test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable),
>  the error log will occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-04-11 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren resolved FLINK-34634.
---
  Assignee: Hongshun Wang
Resolution: Fixed

> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> ---
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-09-15-25-26-187.png, 
> image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> Once, I removed a table from the option and then restarted the job from the 
> savepoint, but the job couldn't read the binlog anymore. When I checked the 
> logs, I found an Error level log stating:
> ' The enumerator received invalid request meta group id 6, the valid meta 
> group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is 
> aware of.
> However, the code should indeed remove redundant split information from the 
> Reader as seen in 
> [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does 
> this issue occur?
>  
> h3. why occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I discovered the cause. If the job stops before 
> completing all the split meta information and then restarts, this issue 
> occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader 
> is 6, and no meta information has been synchronized, leaving the 
> finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After 
> restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 
> - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed 
> table have two split). This could lead to an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
> h3. How to reproduce
>  * Add Thread.sleep(1000L) in 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
>  to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
> else if (sourceEvent instanceof BinlogSplitMetaEvent) {
> LOG.debug(
> "Source reader {} receives binlog meta with group id {}.",
> subtaskId,
> ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
> try {
> Thread.sleep(1000L);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
> } {code}
>  * Add Thread.sleep(500L) in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
>  to trigger savepoint before meta infos synchronization finishes.
>  
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
> // ..ingore 
> waitForSinkSize("sink", fetchedDataList.size());
> Thread.sleep(500L);
> assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
> finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
> savepointDirectory);
> jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one 
> table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
> ...
> }
> {code}
>  
>  * Add chunk-meta.group.size  =2 in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
> Then, run 
> test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable),
>  the error log will occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]

2024-04-11 Thread via GitHub


PatrickRen merged PR #3134:
URL: https://github.com/apache/flink-cdc/pull/3134


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34689) check binlog_row_value_optoins

2024-04-11 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren resolved FLINK-34689.
---
Fix Version/s: cdc-3.1.0
   Resolution: Fixed

> check binlog_row_value_optoins
> --
>
> Key: FLINK-34689
> URL: https://issues.apache.org/jira/browse/FLINK-34689
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Assignee: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-15-12-56-49-344.png
>
>
> When {{binlog_row_value_optoins}} is set to {{{}PARTIAL_JSON{}}},
> the update operator remains as {{{}Update_rows_partial{}}}.
> Flink CDC does not parse this event because {{Update_row_partial}} binlog 
> event is mapped to {{PARTIAL_UPDATE_ROWS_EVENT}} and Flink CDC do not handle 
> that event type
>  
> Example of Update_row_partial (when {{binlog_row_value_optoins}} = 
> {{PARTIAL_JSON)}}
> !image-2024-03-15-12-56-49-344.png|width=1015,height=30!
> So, we have to check {{binlog_row_value_optoins}} before starting.
>  
>  
> Cretae PR: [[MySQL][Feature] check binlog_row_value_optoins by SML0127 · Pull 
> Request #3148 · apache/flink-cdc 
> (github.com)|https://github.com/apache/flink-cdc/pull/3148]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35072][cdc][doris] Support applying compatible `AlterColumnTypeEvent` to Doris sink [flink-cdc]

2024-04-11 Thread via GitHub


yuxiqian commented on PR #3215:
URL: https://github.com/apache/flink-cdc/pull/3215#issuecomment-2050843987

   I'm not familiar with Doris, could @lvyanquan @JNSimba take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34689) check binlog_row_value_optoins

2024-04-11 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836391#comment-17836391
 ] 

Qingsheng Ren commented on FLINK-34689:
---

flink-cdc master: af7665d33834b4141f875862df59ec1f56dddcbb

> check binlog_row_value_optoins
> --
>
> Key: FLINK-34689
> URL: https://issues.apache.org/jira/browse/FLINK-34689
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Lee SeungMin
>Assignee: Lee SeungMin
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-03-15-12-56-49-344.png
>
>
> When {{binlog_row_value_optoins}} is set to {{{}PARTIAL_JSON{}}},
> the update operator remains as {{{}Update_rows_partial{}}}.
> Flink CDC does not parse this event because {{Update_row_partial}} binlog 
> event is mapped to {{PARTIAL_UPDATE_ROWS_EVENT}} and Flink CDC do not handle 
> that event type
>  
> Example of Update_row_partial (when {{binlog_row_value_optoins}} = 
> {{PARTIAL_JSON)}}
> !image-2024-03-15-12-56-49-344.png|width=1015,height=30!
> So, we have to check {{binlog_row_value_optoins}} before starting.
>  
>  
> Cretae PR: [[MySQL][Feature] check binlog_row_value_optoins by SML0127 · Pull 
> Request #3148 · apache/flink-cdc 
> (github.com)|https://github.com/apache/flink-cdc/pull/3148]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34689][MySQL][Feature] check binlog_row_value_options [flink-cdc]

2024-04-11 Thread via GitHub


PatrickRen merged PR #3148:
URL: https://github.com/apache/flink-cdc/pull/3148


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35091) Incorrect warning msg in JM when use metric reporter

2024-04-11 Thread sanqingleo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanqingleo updated FLINK-35091:
---
Summary: Incorrect warning msg in JM when use metric reporter  (was: 
Incorrect Warning msg in JM when use influxdb)

> Incorrect warning msg in JM when use metric reporter
> 
>
> Key: FLINK-35091
> URL: https://issues.apache.org/jira/browse/FLINK-35091
> Project: Flink
>  Issue Type: Improvement
>Reporter: sanqingleo
>Priority: Minor
> Fix For: 1.16.3
>
> Attachments: image-2024-04-12-10-02-20-142.png
>
>
> Hello,
> I encountered an issue while upgrading Flink from version 1.14 to 1.18. After 
> the upgrade, I noticed that some monitoring metrics were not being reported 
> to InfluxDB.
> Upon checking the Job Manager (JM) logs, I found an error indicating that the 
> previously used classes are no longer supported. However, there seems to be 
> an oddly phrased error message that looks like it might have been written 
> incorrectly.
> The error message reads: "The reporter configuration of '{}' configures the 
> reporter class, which is no a no longer supported approach to configure 
> reporters." + " Please configure a factory class instead:"
> I believe the correct phrasing should be: "The reporter configuration of '{}' 
> configures the reporter class, which is a no longer supported approach to 
> configure reporters." + " Please configure a factory class instead:"
> It appears that the words "no a" were accidentally added, making the sentence 
> grammatically incorrect and potentially confusing for users.
>  
> !image-2024-04-12-10-02-20-142.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35091) Incorrect Warning msg in JM when use influxdb

2024-04-11 Thread sanqingleo (Jira)
sanqingleo created FLINK-35091:
--

 Summary: Incorrect Warning msg in JM when use influxdb
 Key: FLINK-35091
 URL: https://issues.apache.org/jira/browse/FLINK-35091
 Project: Flink
  Issue Type: Improvement
Reporter: sanqingleo
 Fix For: 1.16.3
 Attachments: image-2024-04-12-10-02-20-142.png

Hello,

I encountered an issue while upgrading Flink from version 1.14 to 1.18. After 
the upgrade, I noticed that some monitoring metrics were not being reported to 
InfluxDB.

Upon checking the Job Manager (JM) logs, I found an error indicating that the 
previously used classes are no longer supported. However, there seems to be an 
oddly phrased error message that looks like it might have been written 
incorrectly.

The error message reads: "The reporter configuration of '{}' configures the 
reporter class, which is no a no longer supported approach to configure 
reporters." + " Please configure a factory class instead:"

I believe the correct phrasing should be: "The reporter configuration of '{}' 
configures the reporter class, which is a no longer supported approach to 
configure reporters." + " Please configure a factory class instead:"

It appears that the words "no a" were accidentally added, making the sentence 
grammatically incorrect and potentially confusing for users.

 

!image-2024-04-12-10-02-20-142.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35090) Doris sink fails to create table when database does not exist

2024-04-11 Thread Xiqian YU (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836386#comment-17836386
 ] 

Xiqian YU commented on FLINK-35090:
---

[~renqs] I'm willing to take this ticket.

> Doris sink fails to create table when database does not exist
> -
>
> Key: FLINK-35090
> URL: https://issues.apache.org/jira/browse/FLINK-35090
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Xiqian YU
>Priority: Minor
>
> Currently, Doris sink connector doesn't support creating database 
> automatically. When user specifies a sink namespace with non-existing 
> database in YAML config, Doris connector will crash.
> Expected behaviour: Doris sink connector should create both database and 
> table automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on PR #20990:
URL: https://github.com/apache/flink/pull/20990#issuecomment-2050824391

   Thank you @XComp @1996fanrui @Samrat002 very much for the review and sorry 
for the late response.
   I make some change based on your comments.
   Would you mind helping to have a checking on it if you had the time ?
   Thank you :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35090) Doris sink fails to create table when database does not exist

2024-04-11 Thread Xiqian YU (Jira)
Xiqian YU created FLINK-35090:
-

 Summary: Doris sink fails to create table when database does not 
exist
 Key: FLINK-35090
 URL: https://issues.apache.org/jira/browse/FLINK-35090
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Xiqian YU


Currently, Doris sink connector doesn't support creating database 
automatically. When user specifies a sink namespace with non-existing database 
in YAML config, Doris connector will crash.

Expected behaviour: Doris sink connector should create both database and table 
automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1561915638


##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+private JobExecutionResult latestExecutionResult;
+
+/**
+ * The number of times a test should be repeated.
+ *
+ * This is useful for runtime changes, which affect resource 
management. Running certain
+ * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+ */
+private int numberOfTestRepetitions = 1;
+
+private boolean isCollectionExecution;
+
+public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+this.numberOfTestRepetitions = numberOfTestRepetitions;
+}
+
+public int getParallelism() {
+return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+}
+
+public JobExecutionResult getLatestExecutionResult() {
+return this.latestExecutionResult;
+}
+
+public boolean isCollectionExecution() {
+return isCollectionExecution;
+}
+
+// 

+//  Methods to create the test program and for pre- and post- test work
+// 

+
+protected abstract void testProgram() throws Exception;
+
+protected void preSubmit() throws Exception {}
+
+protected void postSubmit() throws Exception {}
+
+protected boolean skipCollectionExecution() {
+return false;
+}

Review Comment:
   thanks a lot for the comments.
   It sounds good to me.
   
   If we complete firstly the migration based on the principle of minimal 
changes. What do you think about considering the extension mode after 
completing all Junit5 migrations? 
   Because 
   - there are many subclasses of these base classes, it is difficult for us to 
design extensions that are suitable and applicable to all subclasses before 
complete migration.
   - At the same time, it can make PR and commitments clearer, which is 
beneficial for reviewers to advance the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35089:
--

 Summary: Two input AbstractStreamOperator may throw NPE when 
receiving RecordAttributes
 Key: FLINK-35089
 URL: https://issues.apache.org/jira/browse/FLINK-35089
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.19.0
Reporter: Xuannan Su


Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the 
`AbstractStreamOperator` are transient. The two fields will be null when it is 
deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serialization and these 
fields non-transient.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1561911292


##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+private JobExecutionResult latestExecutionResult;
+
+/**
+ * The number of times a test should be repeated.
+ *
+ * This is useful for runtime changes, which affect resource 
management. Running certain
+ * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+ */
+private int numberOfTestRepetitions = 1;
+
+private boolean isCollectionExecution;
+
+public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+this.numberOfTestRepetitions = numberOfTestRepetitions;
+}
+
+public int getParallelism() {
+return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+}
+
+public JobExecutionResult getLatestExecutionResult() {
+return this.latestExecutionResult;
+}
+
+public boolean isCollectionExecution() {
+return isCollectionExecution;
+}
+
+// 

+//  Methods to create the test program and for pre- and post- test work
+// 

+
+protected abstract void testProgram() throws Exception;
+
+protected void preSubmit() throws Exception {}
+
+protected void postSubmit() throws Exception {}
+
+protected boolean skipCollectionExecution() {
+return false;
+}
+
+// 

+//  Test entry point
+// 

+
+@Test
+public void testJobWithObjectReuse() {
+isCollectionExecution = false;
+
+// pre-submit
+try {
+preSubmit();
+} catch (Exception e) {
+System.err.println(e.getMessage());
+e.printStackTrace();
+fail("Pre-submit work caused an error: " + e.getMessage());

Review Comment:
   deleted.



##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExe

Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1561910994


##
flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java:
##
@@ -35,40 +38,54 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 
 import static 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Integration tests for Hadoop IO formats. */
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBaseJUnit5 {
 
 private static final int NUM_PROGRAMS = 2;
 
-private final int curProgId;
+@Parameter private int curProgId;
 private String[] resultPath;
 private String[] expectedResult;
 private String sequenceFileInPath;
 private String sequenceFileInPathNull;
 
-public HadoopIOFormatsITCase(int curProgId) {
-this.curProgId = curProgId;
+@BeforeEach
+void checkOperatingSystem() {
+// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+assumeThat(OperatingSystem.isWindows())
+.as("This test can't run successfully on Windows.")
+.isFalse();
 }
 
-@Before
-public void checkOperatingSystem() {
-// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
-Assume.assumeTrue(
-"This test can't run successfully on Windows.", 
!OperatingSystem.isWindows());
+@Override
+@TestTemplate
+public void testJobWithObjectReuse() {
+super.testJobWithoutObjectReuse();
+}
+
+@Override
+@TestTemplate
+public void testJobWithoutObjectReuse() {
+super.testJobWithoutObjectReuse();
+}
+
+@Override
+@TestTemplate
+public void testJobCollectionExecution() {
+super.testJobCollectionExecution();
 }

Review Comment:
   because it needs the `@TestTemplate`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1561910064


##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+private JobExecutionResult latestExecutionResult;
+
+/**
+ * The number of times a test should be repeated.
+ *
+ * This is useful for runtime changes, which affect resource 
management. Running certain
+ * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+ */
+private int numberOfTestRepetitions = 1;
+
+private boolean isCollectionExecution;
+
+public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+this.numberOfTestRepetitions = numberOfTestRepetitions;
+}
+
+public int getParallelism() {
+return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+}
+
+public JobExecutionResult getLatestExecutionResult() {
+return this.latestExecutionResult;
+}
+
+public boolean isCollectionExecution() {
+return isCollectionExecution;
+}
+
+// 

+//  Methods to create the test program and for pre- and post- test work
+// 

+
+protected abstract void testProgram() throws Exception;
+
+protected void preSubmit() throws Exception {}
+
+protected void postSubmit() throws Exception {}
+
+protected boolean skipCollectionExecution() {
+return false;
+}
+
+// 

+//  Test entry point
+// 

+
+@Test
+public void testJobWithObjectReuse() {
+isCollectionExecution = false;
+
+// pre-submit
+try {
+preSubmit();
+} catch (Exception e) {
+System.err.println(e.getMessage());
+e.printStackTrace();
+fail("Pre-submit work caused an error: " + e.getMessage());
+}
+
+// This only works because the underlying ExecutionEnvironment is a 
TestEnvironment
+// We should fix that we are able to get access to the latest 
execution result from a
+// different
+// execution environment and how the object reuse mode is enabled
+TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
+env.getConfig().enableObjectReuse();
+
+// Possibly run the test multiple times
+executeProgramMultipleTimes(env);
+}
+
+private void executeProgramMultipleTimes(ExecutionEnvironment env) {
+for (int i = 0; i < numberOfTestRepetitions; i++) {
+// call the test program

Review Comment:
   updated.



##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyrig

[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-04-11 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836382#comment-17836382
 ] 

Mason Chen commented on FLINK-34127:


Sure. Can I assign myself now with committer permissions? :D

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Priority: Major
>
> I found out this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of the CI since they require Kafka, 
> Zookeeper, and Docker containers. We can further stablize CI by not 
> redundantly running these set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T01:06:12.1790031Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:12.5703927Z Test 
> org.apache.flink.tests.util.ka

Re: [PR] [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility [flink]

2024-04-11 Thread via GitHub


RocMarshal commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1296008112


##
flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java:
##
@@ -137,27 +132,26 @@ public void testFetchNextRecordReaderHasNewValue() throws 
Exception {
 setupHadoopInputFormat(new DummyInputFormat(), 
Job.getInstance(), recordReader);
 hadoopInputFormat.fetchNext();
 
-assertThat(hadoopInputFormat.fetched, is(true));
-assertThat(hadoopInputFormat.hasNext, is(true));
+assertThat(hadoopInputFormat.fetched).isTrue();
+assertThat(hadoopInputFormat.hasNext).isTrue();
 }
 
 @Test
-public void testFetchNextRecordReaderThrowsException() throws Exception {
+void testFetchNextRecordReaderThrowsException() throws Exception {
 
 DummyRecordReader recordReader = mock(DummyRecordReader.class);
 when(recordReader.nextKeyValue()).thenThrow(new 
InterruptedException());
 
 HadoopInputFormat hadoopInputFormat =
 setupHadoopInputFormat(new DummyInputFormat(), 
Job.getInstance(), recordReader);
 
-exception.expect(IOException.class);
-hadoopInputFormat.fetchNext();
+
assertThatThrownBy(hadoopInputFormat::fetchNext).isInstanceOf(IOException.class);
 
-assertThat(hadoopInputFormat.hasNext, is(true));
+assertThat(hadoopInputFormat.hasNext).isFalse();

Review Comment:
   Hi, @1996fanrui nice catch. 
   sorry for no  explaining this change before the  review.
   
   When debugging the line, it was caused by  `Expect exception mechanism` 
difference between junit4 and junit5. to be short, in junit4, the target line 
`assertThat(hadoopInputFormat.hasNext, is(true));` was not executed.
   So the change occurred to here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2024-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-30238.
--
Resolution: Invalid

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * Last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state leading to that stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint 
>  * While the committables are committed during stop-with-savepoint they are 
> not forwarded to post-commit topology, potentially losing data and preventing 
> to close open transactions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836364#comment-17836364
 ] 

Martijn Visser commented on FLINK-35088:


[~elon] Please verify this with later version of Flink, since there have been 
many bugfixes since. 

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1.I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying the consumption of the 
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark 
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely 
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2.The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>  ~[

[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836325#comment-17836325
 ] 

Kenneth William Krugler commented on FLINK-35076:
-

Hi [~elon] - please post these questions about the impact of idleness and how 
to rebalance on Stack Overflow, or the Flink user list. That way the Q&A can 
benefit the entire community, thanks!

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company, there is a requirement scenario for multi-stream join 
> operations, we are making modifications based on Flink watermark alignment, 
> then I found that the final join output would experience serious shake.
> and I analyzed the reasons: an upstream topic has more than 300 partitions. 
> The number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am.However, the overall topic 
> writing is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward. until that minimum value is 
> updated, only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238
 ] 

elon_X edited comment on FLINK-35076 at 4/11/24 3:23 PM:
-

[~kkrugler] 

Thank you for your reply.
Setting the idle time is not very controllable in terms of the specific timing. 
For example, setting it to 10 seconds, the minimum watermark will still not 
change within these 10 seconds unless the idle time is set as small as 
possible. I'm not sure if this could solve the problem and further testing is 
needed;
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream xx = env.fromSource(Source source, WatermarkStrategy 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance, Source can't rebalance. I'm not quite sure 
how to shuffle the data source before {{{}fromSource{}}}.


was (Author: JIRAUSER303028):
[~kkrugler] 

Thank you for your reply.
Setting the idle time is not very controllable in terms of the specific timing. 
For example, setting it to 10 seconds, the minimum watermark will still not 
change within these 10 seconds unless the idle time is set as small as 
possible. I'm not sure if this could solve the problem and further testing is 
needed;
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream xx = env.fromSource(Source source, WatermarkStrategy 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance, Source can't rebalance.

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company, there is a requirement scenario for multi-stream join 
> operations, we are making modifications based on Flink watermark alignment, 
> then I found that the final join output would experience serious shake.
> and I analyzed the reasons: an upstream topic has more than 300 partitions. 
> The number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am.However, the overall topic 
> writing is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward. until that minimum value is 
> updated, only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238
 ] 

elon_X commented on FLINK-35076:


[~kkrugler] 

Thank you for your reply.
Setting the idle time is not very controllable in terms of the specific timing. 
For example, setting it to 10 seconds, the minimum watermark will still not 
change within these 10 seconds unless the idle time is set as small as 
possible. I'm not sure if this could solve the problem and further testing is 
needed;
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream xx = env.fromSource(Source source, WatermarkStrategy 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance, Source can't rebalance.

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company, there is a requirement scenario for multi-stream join 
> operations, we are making modifications based on Flink watermark alignment, 
> then I found that the final join output would experience serious shake.
> and I analyzed the reasons: an upstream topic has more than 300 partitions. 
> The number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am.However, the overall topic 
> writing is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward. until that minimum value is 
> updated, only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-11 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2049852124

   @XComp Glad for your vacation!
   Finally I also addressed the deprecation warnings and went through the 
implementation of a custom connector through `DynamicTableSource`.
   
   It turned out to be quite tough, as probably it is not that common, or these 
new APIs are not super-well documented for now.
   
   I wanted to use `TableEnvironment.fromValues` however, I could not use it as 
the test was hanging...
   I want to understand why and, in case, file an issue for that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34224) ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest timed out

2024-04-11 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836218#comment-17836218
 ] 

Ryan Skraba commented on FLINK-34224:
-

1.20 Java 8: Test (module: core) 
https://github.com/apache/flink/actions/runs/8643097154/job/23696501028#step:10:11318

> ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest 
> timed out
> ---
>
> Key: FLINK-34224
> URL: https://issues.apache.org/jira/browse/FLINK-34224
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The timeout appeared in the GitHub Actions workflow (currently in test phase; 
> [FLIP-396|https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure]):
> https://github.com/XComp/flink/actions/runs/7632434859/job/20793613726#step:10:11040
> {code}
> Jan 24 01:38:36 "ForkJoinPool-1-worker-1" #16 daemon prio=5 os_prio=0 
> tid=0x7f3b200ae800 nid=0x406e3 waiting on condition [0x7f3b1ba0e000]
> Jan 24 01:38:36java.lang.Thread.State: WAITING (parking)
> Jan 24 01:38:36   at sun.misc.Unsafe.park(Native Method)
> Jan 24 01:38:36   - parking to wait for  <0xdfbbb358> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> Jan 24 01:38:36   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Jan 24 01:38:36   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Jan 24 01:38:36   at 
> org.apache.flink.changelog.fs.ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest.java:251)
> Jan 24 01:38:36   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34996][Connectors/Kafka] Use UserCodeCL to instantiate Deserializer [flink-connector-kafka]

2024-04-11 Thread via GitHub


hugogu commented on PR #89:
URL: 
https://github.com/apache/flink-connector-kafka/pull/89#issuecomment-2049754379

   @morazow Thanks for your approval. I have just rebased this PR to include 
the build fix made in main branch. Hopefully the build would success this time.
   
   I also noticed that v3.1 branch, may I know if this fix can be included? 
Shall I raise another PR for that? Not quite sure about the release procedure 
yet.  
   
   cc @MartijnVisser 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34738][cdc][docs-zh] "Deployment - YARN" Page for Flink CDC Chinese Documentation [flink-cdc]

2024-04-11 Thread via GitHub


Vincent-Woo commented on PR #3205:
URL: https://github.com/apache/flink-cdc/pull/3205#issuecomment-2049708639

   @leonardBang @PatrickRen @lvyanquan @loserwang1024 Excuse me, do you have 
time to take a look at this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-04-11 Thread via GitHub


flinkbot commented on PR #24653:
URL: https://github.com/apache/flink/pull/24653#issuecomment-2049623349

   
   ## CI report:
   
   * ecce541d1b05cd350ca635fc8e0f6611743d583d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32084) Migrate current file merging of channel state into the file merging framework

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32084:
---
Labels: pull-request-available  (was: )

> Migrate current file merging of channel state into the file merging framework
> -
>
> Key: FLINK-32084
> URL: https://issues.apache.org/jira/browse/FLINK-32084
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework [flink]

2024-04-11 Thread via GitHub


fredia opened a new pull request, #24653:
URL: https://github.com/apache/flink/pull/24653

   
   
   ## What is the purpose of the change
   
   Migrate current file merging of channel state snapshot into the unify file 
merging framework.
   This PR only focuses on the snapshot/writing part.
   
   
   ## Brief change log
   - Introduce `UnifyMergingChannelStateWriteRequestDispatcher` which construct 
a build a ` ChannelStateCheckpointWriter` that uses unified file merging 
mechanism to write channel state.
   - Update `ChannelStateWriteRequestExecutorFactory#getOrCreateExecutor` , 
different ways to merge small files can be chosen here.
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-04-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836149#comment-17836149
 ] 

Martijn Visser commented on FLINK-34127:


[~mason6345] Is this something that you can check? I think I also see some 
flakyness here, like with 
https://github.com/apache/flink-connector-kafka/actions/runs/8646160813/job/23704885473

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Priority: Major
>
> I found out this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of the CI since they require Kafka, 
> Zookeeper, and Docker containers. We can further stablize CI by not 
> redundantly running these set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T01:06:12.1790031Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [Ka

[jira] [Created] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-11 Thread elon_X (Jira)
elon_X created FLINK-35088:
--

 Summary: watermark alignment maxAllowedWatermarkDrift and 
updateInterval param need check
 Key: FLINK-35088
 URL: https://issues.apache.org/jira/browse/FLINK-35088
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, Runtime / Coordination
Affects Versions: 1.16.1
Reporter: elon_X
 Attachments: image-2024-04-11-20-12-29-951.png

When I use watermark alignment,

1.I found that setting maxAllowedWatermarkDrift to a negative number initially 
led me to believe it could support delaying the consumption of the source, so I 
tried it. Then, the upstream data flow would hang indefinitely.

Root cause:
{code:java}
long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()               
  + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark < 
lastEmittedWatermark, then the SourceReader will be blocked indefinitely and 
cannot recover.

I'm not sure if this is a supported feature of watermark alignment. If it's 
not, I think an additional parameter validation should be implemented to throw 
an exception on the client side if the value is negative.

2.The updateInterval parameter also lacks validation. If I set it to 0, the 
task will throw an exception when starting the job manager. The JDK class 
java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
throws the exception.
{code:java}
java.lang.IllegalArgumentException: null
at 
java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
 ~[?:1.8.0_351]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:208) 
~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.

Re: [PR] [FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController [flink]

2024-04-11 Thread via GitHub


Zakelly commented on code in PR #24633:
URL: https://github.com/apache/flink/pull/24633#discussion_r1560916376


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A buffer to hold state requests to execute state requests in batch, which 
can only be manipulated
+ * within task thread.
+ *
+ * @param  the type of the record
+ * @param  the type of the key
+ */
+@NotThreadSafe
+public class StateRequestBuffer {
+/**
+ * The state requests in this buffer could be executed when the buffer is 
full or configured
+ * batch size is reached. All operations on this buffer must be invoked in 
task thread.
+ */
+final LinkedList> activeQueue;
+
+/**
+ * The requests in that should wait until all preceding records with 
identical key finishing its
+ * execution. After which the queueing requests will move into the active 
buffer. All operations
+ * on this buffer must be invoked in task thread.
+ */
+final Map>> blockingQueue;
+
+/** The number of state requests in blocking queue. */
+int blockingQueueSize;
+
+public StateRequestBuffer() {
+this.activeQueue = new LinkedList<>();
+this.blockingQueue = new HashMap<>();
+this.blockingQueueSize = 0;
+}
+
+void enqueueToActive(StateRequest request) {
+activeQueue.add(request);
+}
+
+void enqueueToBlocking(StateRequest request) {
+blockingQueue
+.computeIfAbsent(request.getRecordContext().getKey(), k -> new 
LinkedList<>())
+.add(request);
+blockingQueueSize++;
+}
+
+/**
+ * Try to pull one state request with specific key from blocking queue to 
active queue.
+ *
+ * @param key The key to release, the other records with this key is no 
longer blocking.
+ * @return The first record context with the same key in blocking queue, 
null if no such record.
+ */
+@Nullable
+RecordContext tryActivateOneByKey(K key) {
+if (!blockingQueue.containsKey(key)) {
+return null;
+}
+
+StateRequest stateRequest = blockingQueue.get(key).getFirst();
+activeQueue.add(stateRequest);
+blockingQueue.get(key).removeFirst();
+if (blockingQueue.get(key).isEmpty()) {
+blockingQueue.remove(key);
+}
+blockingQueueSize--;
+return (RecordContext) stateRequest.getRecordContext();
+}
+
+/**
+ * Get the number of state requests of blocking queue in constant-time.
+ *
+ * @return the number of state requests of blocking queue.
+ */
+@VisibleForTesting
+int blockingQueueSize() {
+return blockingQueueSize;
+}
+
+/**
+ * Get the number of state requests of active queue in constant-time.
+ *
+ * @return the number of state requests of active queue.
+ */
+@VisibleForTesting
+int activeQueueSize() {
+return activeQueue.size();
+}
+
+/**
+ * Try to pop N state requests from active queue, if the size of active 
queue is less than N,
+ * return all the requests in active queue.
+ *
+ * @param N the number of state requests to pop.
+ * @return A list of state requests.
+ */
+List> popActive(int N) {

Review Comment:
   ```suggestion
   List> popActive(int n) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34661][runtime] TaskExecutor supports retain partitions after JM crashed. [flink]

2024-04-11 Thread via GitHub


zhuzhurk commented on code in PR #24582:
URL: https://github.com/apache/flink/pull/24582#discussion_r1560848973


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##
@@ -1886,6 +1911,33 @@ private void disconnectJobManagerConnection(
 }
 }
 
+if (cleanupPartitionLater) {
+// this branch is for job recovery
+final Duration maxRegistrationDuration =
+taskManagerConfiguration.getMaxRegistrationDuration();
+
+if (maxRegistrationDuration != null) {
+log.info(
+"Waiting for {} mills for job {} to recover. If there 
is no reconnection, "

Review Comment:
there is no reconnection -> the job manager is not reconnected



##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##
@@ -409,6 +410,12 @@ public TaskExecutor(
 resourceId, new JobManagerHeartbeatListener(), 
getMainThreadExecutor(), log);
 }
 
+private boolean isJobRecoveryEnabled() {

Review Comment:
   Better to rename it to `shoudRetainPartitionsOnJobManagerConnectionLost`.
   And it should return `true` only if netty shuffle is used.



##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##
@@ -2474,16 +2523,21 @@ public void jobManagerLostLeadership(final JobID jobId, 
final JobMasterId jobMas
 "JobManager for job {} with leader id {} lost 
leadership.", jobId, jobMasterId);
 
 runAsync(
-() ->
-jobTable.getConnection(jobId)
-.ifPresent(
-jobManagerConnection ->
-
disconnectJobManagerConnection(
-
jobManagerConnection,
-new Exception(
-"Job 
leader for job id "
-+ 
jobId
-+ 
" lost leadership.";
+() -> {
+Optional connection = 
jobTable.getConnection(jobId);
+
+if (connection.isPresent()) {
+Exception cause =
+new Exception(
+"Job leader for job id " + jobId + 
" lost leadership.");
+if (isJobRecoveryEnabled()) {
+
disconnectJobManagerConnectionAndCleanupPartitionLater(

Review Comment:
   Would you confirm if it's possible to reconnect to the old JM? if it can 
happen, the retained partitions will be leaked.



##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##
@@ -1373,6 +1380,12 @@ private void disconnectAndTryReconnectToJobManager(
 }
 }
 
+private void disconnectAndTryReconnectToJobManagerAndCleanupPartitionLater(
+JobTable.Connection jobManagerConnection, Exception cause) {
+
disconnectJobManagerConnectionAndCleanupPartitionLater(jobManagerConnection, 
cause);

Review Comment:
   I prefer to name it as `disconnectAndTryReconnectToJobManager` and add 
comments to explain that it does not cleanup partitions right now. Only if the 
reconnection cannot be done will the partitions get cleared.
   
   It is natural in the case to support JM reconnection so we do not need add 
all the details to the method name. And `CleanupPartitionLater` may not happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836142#comment-17836142
 ] 

Zakelly Lan commented on FLINK-34704:
-

Thanks for your detailed explanation! Now I see your point. The problematic of 
my approach is that current record of AWOP will be lost.

+1 for temporarily increase the size of the buffer by 1. IIUC even if the AWOP 
has an upstream, if current record could normally finish its process due to 
increment of buffer, the cp could proceed.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2

2024-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-35038.
--
Fix Version/s: kafka-4.0.0
   kafka-3.1.1
   (was: 3.1.0)
   Resolution: Fixed

Fixed in apache/flink-connector-kafka:

main: 369e7be46a70fd50d68746498aed82105741e7d6
v3.1: ad798fc5387ba3582f92516697d60d0f523e86cb

> Bump test dependency org.yaml:snakeyaml to 2.2 
> ---
>
> Key: FLINK-35038
> URL: https://issues.apache.org/jira/browse/FLINK-35038
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 3.1.0
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kafka-4.0.0, kafka-3.1.1
>
>
> Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test 
> scope dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193.
> This outdated version of SnakeYAML triggers security warnings. These should 
> not be an actual issue given the test scope, but we should consider bumping 
> the version for security hygiene purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka connector

2024-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35038:
---
Summary: Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka 
connector  (was: Bump test dependency org.yaml:snakeyaml to 2.2 )

> Bump test dependency org.yaml:snakeyaml to 2.2 for Flink Kafka connector
> 
>
> Key: FLINK-35038
> URL: https://issues.apache.org/jira/browse/FLINK-35038
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 3.1.0
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kafka-4.0.0, kafka-3.1.1
>
>
> Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test 
> scope dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193.
> This outdated version of SnakeYAML triggers security warnings. These should 
> not be an actual issue given the test scope, but we should consider bumping 
> the version for security hygiene purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector

2024-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-35008.
--
Fix Version/s: kafka-4.0.0
   kafka-3.1.1
   Resolution: Fixed

Fixed in apache/flink-connector-kafka

v3.1: 4168d0f22f2fb6b696b5e09d7b8d1f99a6714b78
main: 1c39e3b7495640c9b3784ec672097741c072cebb

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-4.0.0, kafka-3.1.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35038] Bump `org.yaml:snakeyaml` to `2.2` [flink-connector-kafka]

2024-04-11 Thread via GitHub


boring-cyborg[bot] commented on PR #93:
URL: 
https://github.com/apache/flink-connector-kafka/pull/93#issuecomment-2049520227

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35038] Bump `org.yaml:snakeyaml` to `2.2` [flink-connector-kafka]

2024-04-11 Thread via GitHub


MartijnVisser merged PR #93:
URL: https://github.com/apache/flink-connector-kafka/pull/93


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35008:
---
Labels: pull-request-available  (was: )

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35008] Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 [flink-connector-kafka]

2024-04-11 Thread via GitHub


MartijnVisser merged PR #87:
URL: https://github.com/apache/flink-connector-kafka/pull/87


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35007) Update Flink Kafka connector to support 1.19

2024-04-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-35007.
--
Fix Version/s: kafka-4.0.0
   kafka-3.1.1
   Resolution: Fixed

Fixed in apache/flink-connector-kafka

v3.1: 809cb0786565b3515d1a17319b0f98f59b1ef6c2
main: 897001d5682a0708042d59be81a10485ffd0dde7

> Update Flink Kafka connector to support 1.19
> 
>
> Key: FLINK-35007
> URL: https://issues.apache.org/jira/browse/FLINK-35007
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-4.0.0, kafka-3.1.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35007][BP v3.1] Add support for Flink 1.19 (#90) [flink-connector-kafka]

2024-04-11 Thread via GitHub


MartijnVisser merged PR #94:
URL: https://github.com/apache/flink-connector-kafka/pull/94


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-20217) More fine-grained timer processing

2024-04-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-20217:
--

Assignee: Piotr Nowojski

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Timers are currently processed in one big block under the checkpoint lock 
> (under {{InternalTimerServiceImpl#advanceWatermark}}. This can be problematic 
> in a number of scenarios while doing checkpointing which would lead to 
> checkpoints timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has been for example observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34961) GitHub Actions runner statistcs can be monitored per workflow name

2024-04-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-34961:

Fix Version/s: kafka-4.0.0

> GitHub Actions runner statistcs can be monitored per workflow name
> --
>
> Key: FLINK-34961
> URL: https://issues.apache.org/jira/browse/FLINK-34961
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: kafka-4.0.0
>
>
> Apache Infra allows the monitoring of runner usage per workflow (see [report 
> for 
> Flink|https://infra-reports.apache.org/#ghactions&project=flink&hours=168&limit=10];
>   only accessible with Apache committer rights). They accumulate the data by 
> workflow name. The Flink space has multiple repositories that use the generic 
> workflow name {{CI}}). That makes the differentiation in the report harder.
> This Jira issue is about identifying all Flink-related projects with a CI 
> workflow (Kubernetes operator and the JDBC connector were identified, for 
> instance) and adding a more distinct name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34961) GitHub Actions runner statistcs can be monitored per workflow name

2024-04-11 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836128#comment-17836128
 ] 

Sergey Nuyanzin commented on FLINK-34961:
-

Merged to flink-connector-kafka main as 
[c47abb3933b7c1e567a9142c6495038d16d42dd0|https://github.com/apache/flink-connector-kafka/commit/c47abb3933b7c1e567a9142c6495038d16d42dd0]

> GitHub Actions runner statistcs can be monitored per workflow name
> --
>
> Key: FLINK-34961
> URL: https://issues.apache.org/jira/browse/FLINK-34961
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, starter
>
> Apache Infra allows the monitoring of runner usage per workflow (see [report 
> for 
> Flink|https://infra-reports.apache.org/#ghactions&project=flink&hours=168&limit=10];
>   only accessible with Apache committer rights). They accumulate the data by 
> workflow name. The Flink space has multiple repositories that use the generic 
> workflow name {{CI}}). That makes the differentiation in the report harder.
> This Jira issue is about identifying all Flink-related projects with a CI 
> workflow (Kubernetes operator and the JDBC connector were identified, for 
> instance) and adding a more distinct name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34961] Use dedicated CI name for Kafka connector to differentiate it in infra-reports [flink-connector-kafka]

2024-04-11 Thread via GitHub


snuyanzin merged PR #92:
URL: https://github.com/apache/flink-connector-kafka/pull/92


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink Kafka connector

2024-04-11 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836125#comment-17836125
 ] 

Martijn Visser commented on FLINK-35008:


I've updated to PR to use 1.26.1

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35022:
---
Labels: pull-request-available  (was: )

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}} as an extentsion of {{AsyncSinkBase}} depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where item is represented as 
> {{Map}}.
> {{AttributeValue}} is the wrapper for the DynamoDb comprehendable Object in a 
> format similar with type identification properties as in
> {M": {"Name" : {"S": Joe }, "Age" : {"N": 35 }}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverted is just a boiler plate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter implements 
> ElementConverter {
> @Override
> public DynamoDbWriteRequest apply(Order order, SinkWriter.Context 
> context) {
> Map itemMap = new HashMap<>();
> itemMap.put("id", AttributeValue.builder().s(order.id).build());
> itemMap.put("quantity", 
> AttributeValue.builder().n(String.valueOf(order.quantity)).build());
> itemMap.put("total", 
> AttributeValue.builder().n(String.valueOf(order.total)).build());
> return DynamoDbWriteRequest.builder()
> .setType(DynamoDbWriteRequestType.PUT)
> .setItem(itemMap)
> .build();
> }
> @Override
> public void open(Sink.InitContext context) {
> 
> }
> }
> {code}
> while this might not be too much of work, however it is a fairly common case 
> in Flink and this implementation requires some fair knowledge of DDB model 
> for new users.
> h2. Proposal 
> Introduce {{ DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter implements 
> ElementConverter {
> DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
> public DynamoDbWriteRequest convertElement(input) {
> switch this.typeInfo{
> case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(o.toString())
> case: BasicTypeInfo.SHORT_TYPE_INFO: 
> case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
> AttributeValue.fromN(o.toString())
>case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input))
>   .
> }
> }
> }
> // User Code
> public static void main(String []args) {
>   DynamoDbTypeInformedElementConverter elementConverter = new 
> DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
> DdbSink.setElementConverter(elementConverter); 
> }
> {code}
> We will start by supporting all Pojo/ basic/ Tuple/ Array typeInfo which 
> should be enough to cover all DDB supported types 
> (s,n,bool,b,ss,ns,bs,bools,m,l)
> 1- 
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]

2024-04-11 Thread via GitHub


vahmed-hamdy opened a new pull request, #136:
URL: https://github.com/apache/flink-connector-aws/pull/136

   
   
   ## Purpose of the change
   
   Add `DynamoDbTypeInformedElementConverter` to convert Elements to dynamoDb 
Sink using its provided type Info.
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - Added unit tests
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-XXXXX] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka [flink-connector-kafka]

2024-04-11 Thread via GitHub


MartijnVisser commented on PR #85:
URL: 
https://github.com/apache/flink-connector-kafka/pull/85#issuecomment-2049475263

   Superseded by https://github.com/apache/flink-connector-kafka/pull/93


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-XXXXX] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka [flink-connector-kafka]

2024-04-11 Thread via GitHub


MartijnVisser closed pull request #85: [FLINK-X] Bump org.yaml:snakeyaml 
from 1.31 to 2.0 in /flink-connector-kafka
URL: https://github.com/apache/flink-connector-kafka/pull/85


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29050) [JUnit5 Migration] Module: flink-hadoop-compatibility

2024-04-11 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798856#comment-17798856
 ] 

RocMarshal edited comment on FLINK-29050 at 4/11/24 10:56 AM:
--

Based on [https://github.com/apache/flink/pull/20990#discussion_r1292783464]

We'd like to do the following sub-tasks for the current jira.
 - Rename  AbstractTestBase, JavaProgramTestBase MultipleProgramsTestBase to 
JUnit4,     
 - Use jUnit5 to re-write the implementations for the above classes & tag 
JUnit4 classes as deprecated 

 - Use junit5 implementation classes to migrate the Module: 
flink-hadoop-compatibility

 - Use junit5 implementation to make adaption for the sub-classes of JUnit4 
(Maybe this part of the work needs to be recorded and promoted in other jiras)


was (Author: rocmarshal):
Based on [https://github.com/apache/flink/pull/20990#discussion_r1292783464]

We'd like to do the following sub-tasks for the current jira.
 - Rename  AbstractTestBase, JavaProgramTestBase MultipleProgramsTestBase to 
JUnit4,     
 - Use jUnit5 to re-write the implementations for the above classes & tag 
JUnit4 classes as deprecated 

 - Use junit5 implementation classes to migrate the Module: 
flink-hadoop-compatibility

 - Use junit5 implementation to make adaption for the sub-classes of JUnit4

> [JUnit5 Migration] Module: flink-hadoop-compatibility
> -
>
> Key: FLINK-29050
> URL: https://issues.apache.org/jira/browse/FLINK-29050
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hadoop Compatibility, Tests
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>  Labels: pull-request-available, stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836113#comment-17836113
 ] 

Piotr Nowojski commented on FLINK-34704:


Sounds good to me (y) Indeed we could later provide some kind of overdraft 
buffer capacity to be used just for checkpointing. I think that this might 
relate to the things I want to propose in FLIP-443 as it will give the AWOP 
some way of knowing that it should use the overdraft buffer. Let's discuss this 
later and keep the ticket open :)

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]

2024-04-11 Thread via GitHub


yuxiqian commented on code in PR #3221:
URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560819276


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -108,7 +110,32 @@ public void execute(Context context) throws Exception {
 this.taskRunning = true;
 try {
 while (taskRunning) {
-Optional next = 
Optional.ofNullable(changeStreamCursor.tryNext());
+Optional next;
+try {
+next = Optional.ofNullable(changeStreamCursor.tryNext());
+} catch (MongoQueryException e) {
+if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) {
+ChangeStreamOffset offset =
+new 
ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());

Review Comment:
   @Jiabao-Sun You're right, I've added resume token expiration checks in 
creating and iterating process.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35045][state] Introduce Internal State for Async State API [flink]

2024-04-11 Thread via GitHub


Zakelly commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1560803119


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.internal;

Review Comment:
   I'm wondering if package name `org.apache.flink.runtime.state.v2` is enough.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalKvState.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type 
hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by 
Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates 
with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param  The type of key the state is associated to.
+ * @param  The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKvState implements State {

Review Comment:
   Maybe a base class for all state named `InternalKeyedState` is better? Not 
for KVState, and we'd better break the hierarchy among internal map state and 
value state.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/ValueStateImpl.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.internal;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * A default implementation of {@link ValueState} which delegates all async 
requests to {@link
+ * AsyncExecutionController}.
+ *
+ * @param  The type of key the state is associated to.
+ * @param  The type of values kept internally in state.
+ */
+public class ValueStateImpl extends InternalKvState implements 
ValueState {
+
+public ValueStateImpl(
+Async

[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836106#comment-17836106
 ] 

Gyula Fora edited comment on FLINK-34704 at 4/11/24 10:41 AM:
--

I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint. But [~Zakelly] also has a valid point.

I have played around with this and there is a simple optimisation to be made 
for the async operator though under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding but we would also need to checkpoint the current 
processed element as part of the buffer (temporarily increase the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield but it needs a custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer


was (Author: gyfora):
I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint.

I have played around with this and there is a simple optimisation to be made 
for the async operator though under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding but we would also need to checkpoint the current 
processed element as part of the buffer (temporarily increase the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield but it needs a custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836107#comment-17836107
 ] 

Gyula Fora commented on FLINK-34704:


So restricting the optimisation to the head of the operator chain is somewhat 
restricting but still the improvement in this particular scenario is actually 
huge and this may make or break some specialised use-cases so probably still 
worth considering after FLINK-35051

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836106#comment-17836106
 ] 

Gyula Fora commented on FLINK-34704:


I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint.

I have played around with this and there is a simple optimisation to be made 
for the async operator though under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding but we would also need to checkpoint the current 
processed element as part of the buffer (temporarily increase the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield but it needs a custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]

2024-04-11 Thread via GitHub


Jiabao-Sun commented on code in PR #3221:
URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560798123


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -108,7 +110,32 @@ public void execute(Context context) throws Exception {
 this.taskRunning = true;
 try {
 while (taskRunning) {
-Optional next = 
Optional.ofNullable(changeStreamCursor.tryNext());
+Optional next;
+try {
+next = Optional.ofNullable(changeStreamCursor.tryNext());
+} catch (MongoQueryException e) {
+if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) {
+ChangeStreamOffset offset =
+new 
ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());

Review Comment:
   Hi @yuxiqian, the `MongoCommandException` seems can be thrown by 
openChangeStreamCursor.
   
   ```
   at 
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:121)
   at 
com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:237)
   ```
   
   
![image](https://github.com/apache/flink-cdc/assets/27403841/829b5668-1cbf-4bfe-a324-a05c53e8d2eb)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]

2024-04-11 Thread via GitHub


slfan1989 commented on PR #24652:
URL: https://github.com/apache/flink/pull/24652#issuecomment-2049386534

   > Thanks @slfan1989 for the quick review!
   > 
   > > @1996fanrui If we confirm that it's an issue with commons-io, couldn't 
we resolve it by upgrading commons-io instead?
   > 
   > Do you mean upgrading commons-io to the latest version? If yes, I tried 
upgrading `commons-io` to `2.16.1`, it doesn't work.
   > 
   > I saw `2.16.1` is the latest version of `commons-io`[1].
   > 
   > [1] https://mvnrepository.com/artifact/commons-io/commons-io
   
   I'm sorry for any inconvenience caused. The reason for upgrading commons-io 
is that common-compress requires a higher version. We encounter a "class not 
found" issue when using commons-io 2.11. If we can't meet the performance 
requirements, I think we might have to revert FLINK-34955.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-11 Thread Shilun Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836099#comment-17836099
 ] 

Shilun Fan commented on FLINK-35040:


[~fanrui] During the compilation process, we found that commons-compress 
requires a higher version of commons-io, otherwise there will be a class not 
found exception.

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]

2024-04-11 Thread via GitHub


1996fanrui commented on PR #24652:
URL: https://github.com/apache/flink/pull/24652#issuecomment-2049377985

   Thanks @slfan1989 for the quick review!
   
   > @1996fanrui If we confirm that it's an issue with commons-io, couldn't we 
resolve it by upgrading commons-io instead?
   
   Do you mean upgrading commons-io  to the latest version? If yes, I tried 
upgrading `commons-io` to `2.16.1`, it doesn't work. 
   
   I saw `2.16.1` is the latest version of `commons-io`[1].
   
   [1] https://mvnrepository.com/artifact/commons-io/commons-io


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]

2024-04-11 Thread via GitHub


yuxiqian commented on code in PR #3221:
URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560784586


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -108,7 +110,32 @@ public void execute(Context context) throws Exception {
 this.taskRunning = true;
 try {
 while (taskRunning) {
-Optional next = 
Optional.ofNullable(changeStreamCursor.tryNext());
+Optional next;
+try {
+next = Optional.ofNullable(changeStreamCursor.tryNext());
+} catch (MongoQueryException e) {
+if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) {
+ChangeStreamOffset offset =
+new 
ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());

Review Comment:
   Yes, it would be much cleaner to move that checking logic to 
`openChangeStreamCursor` and implement the fallback logic along with existing 
routes.
   
   CMIIW but seems `MongoCommandException` will only be thrown [when getNext or 
tryGetNext is 
called](https://github.com/mongodb/mongo/blob/a71feef352a1f08e3f9dbc2b840691e75775a370/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp#L133C31-L133C83),
 and we can't intentionally trigger an `MongoCommandException` by calling other 
methods like `hasNext` earlier.
   
   @Jiabao-Sun Any suggestions about this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35079] Fallback to timestamp startup mode when resume token has expired [flink-cdc]

2024-04-11 Thread via GitHub


yuxiqian commented on code in PR #3221:
URL: https://github.com/apache/flink-cdc/pull/3221#discussion_r1560784586


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java:
##
@@ -108,7 +110,32 @@ public void execute(Context context) throws Exception {
 this.taskRunning = true;
 try {
 while (taskRunning) {
-Optional next = 
Optional.ofNullable(changeStreamCursor.tryNext());
+Optional next;
+try {
+next = Optional.ofNullable(changeStreamCursor.tryNext());
+} catch (MongoQueryException e) {
+if (e.getErrorCode() == CHANGE_STREAM_FATAL_ERROR) {
+ChangeStreamOffset offset =
+new 
ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());

Review Comment:
   Yes, it would be much cleaner to move that checking logic to 
`openChangeStreamCursor` and implement the fallback logic along with existing 
routes.
   
   CMIIW but seems `MongoCommandException` will only be thrown [when getNext or 
tryGetNext is 
called](https://github.com/mongodb/mongo/blob/a71feef352a1f08e3f9dbc2b840691e75775a370/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp#L133C31-L133C83),
 and we can't intentionally trigger an `MongoCommandException` by calling other 
methods like `hasNext` earlier.
   
   @Jiabao-Sun Any suggestions about this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-11 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836091#comment-17836091
 ] 

Rui Fan edited comment on FLINK-35040 at 4/11/24 10:16 AM:
---

Hi [~slfan1989] , thanks for your quick feedback!

FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades 
the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the 
performance is recovered.

My question is why do you upgrade the commons-io in FLINK-34955, and I didn't 
see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0?

 

Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 
2.16.1, then run the benchmark once, the performance is recovered.

I try to only revert {{commons-io}} to 2.11.0(See the PR), trigger benchmark 
twice, and see the performance result later. (The benchmark server is busy, so 
the result may be finished tomorrow.)


was (Author: fanrui):
Hi [~slfan1989] , thanks for your quick feedback!

FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades 
the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the 
performance is recovered. 

My question is why do you upgrade the commons-io in FLINK-34955, and I didn't 
see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0?

 

Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 
2.16.1, then run the benchmark once, the performance is recovered.

I try to only revert {{commons-io}} to 2.11.0, trigger benchmark twice, and see 
the performance result later. (The benchmark server is busy, so the result may 
be finished tomorrow.)

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]

2024-04-11 Thread via GitHub


slfan1989 commented on PR #24652:
URL: https://github.com/apache/flink/pull/24652#issuecomment-2049370922

   @1996fanrui If we confirm that it's an issue with commons-io, couldn't we 
resolve it by upgrading commons-io instead?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-11 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836091#comment-17836091
 ] 

Rui Fan edited comment on FLINK-35040 at 4/11/24 10:15 AM:
---

Hi [~slfan1989] , thanks for your quick feedback!

FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades 
the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the 
performance is recovered. 

My question is why do you upgrade the commons-io in FLINK-34955, and I didn't 
see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0?

 

Note: I revert {{commons-io}} to 2.11.0, and upgrade commons-compress to 
2.16.1, then run the benchmark once, the performance is recovered.

I try to only revert {{commons-io}} to 2.11.0, trigger benchmark twice, and see 
the performance result later. (The benchmark server is busy, so the result may 
be finished tomorrow.)


was (Author: fanrui):
Hi [~slfan1989] , thanks for your quick feedback!

FLINK-34955 wants to fix CVE issues of {{{}common-compress{}}}, but it upgrades 
the {{commons-io}} together. I try to revert {{commons-io}} to 2.11.0, and the 
performance is recovered.

My question is why do you upgrade the commons-io in FLINK-34955, and I didn't 
see any vulnerabilities for commons-io. Could I revert {{commons-io}} to 2.11.0?

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-08-10-51-07-403.png, 
> image-2024-04-11-12-53-53-353.png, screenshot-1.png
>
>
> The performance of serializerHeavyString regresses since April 3, and had not 
> yet recovered on April 8th.
> It seems Java 11 regresses, and Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=3&revs=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >