[jira] [Updated] (FLINK-32631) FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager

2024-01-04 Thread Bhupendra Yadav (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhupendra Yadav updated FLINK-32631:

Description: 
{*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session 
cluster and flink kubernetes operator 1.5.0.

{*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
CREATED/RECONCILING state. On checking flink operator logs we see the error 
{_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
 # When a Flink session job is submitted, the Flink operator submits the job to 
the Flink Cluster.
 # Assume the job is finished(or reached a terminal state) and the job manager 
(JM) restarts for some reason, the job will no longer exist in the JM.
 # Upon reconciliation, the Flink operator queries the JM's REST API for the 
job using its jobID, but it receives a 404 error, indicating that the job is 
not found.
 # The operator then encounters an error and logs it, leading to the job 
getting stuck in an indefinite state.
 # Attempting to restart or suspend the job using the operator's provided 
mechanisms also fails because the operator keeps calling the REST API and 
receiving the same 404 error.

{*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and 
finds that it no longer exists in the Flink Cluster, it should handle the 
situation gracefully. Instead of getting stuck and logging errors indefinitely, 
the operator should mark the job as failed or deleted, or set an appropriate 
status for it.

  was:
{*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session 
cluster and flink kubernetes operator 1.5.0.

{*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
CREATED/RECONCILING state. On checking flink operator logs we see the error 
{_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
 # When a Flink session job is submitted, the Flink operator submits the job to 
the Flink Cluster.
 # If the Flink job manager (JM) restarts for some reason, the job may no 
longer exist in the JM.
 # Upon reconciliation, the Flink operator queries the JM's REST API for the 
job using its jobID, but it receives a 404 error, indicating that the job is 
not found.
 # The operator then encounters an error and logs it, leading to the job 
getting stuck in an indefinite state.
 # Attempting to restart or suspend the job using the operator's provided 
mechanisms also fails because the operator keeps calling the REST API and 
receiving the same 404 error.

{*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and 
finds that it no longer exists in the Flink Cluster, it should handle the 
situation gracefully. Instead of getting stuck and logging errors indefinitely, 
the operator should mark the job as failed or deleted, or set an appropriate 
status for it.


> FlinkSessionJob stuck in Created/Reconciling state because of No Job found 
> error in JobManager
> --
>
> Key: FLINK-32631
> URL: https://issues.apache.org/jira/browse/FLINK-32631
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.16.0
> Environment: Local
>Reporter: Bhupendra Yadav
>Priority: Major
>
> {*}Background{*}: We are using FlinkSessionJob for submitting jobs to a 
> session cluster and flink kubernetes operator 1.5.0.
> {*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
> CREATED/RECONCILING state. On checking flink operator logs we see the error 
> {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
>  # When a Flink session job is submitted, the Flink operator submits the job 
> to the Flink Cluster.
>  # Assume the job is finished(or reached a terminal state) and the job 
> manager (JM) restarts for some reason, the job will no longer exist in the JM.
>  # Upon reconciliation, the Flink operator queries the JM's REST API for the 
> job using its jobID, but it receives a 404 error, indicating that the job is 
> not found.
>  # The operator then encounters an error and logs it, leading to the job 
> getting stuck in an indefinite state.
>  # Attempting to restart or suspend the job using the operator's provided 
> mechanisms also fails because the operator keeps calling the REST API and 
> receiving the same 404 error.
> {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job 
> and finds that it no longer exists in the Flink Cluster, it should handle the 
> situation gracefully. Instead of getting stuck and logging errors 
> indefinitely, the operator should mark the job as failed or deleted, or set 
> an appropriate status for it.



--
This message was sent by Atlassian Jira

Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-04 Thread via GitHub


1996fanrui commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442527329


##
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java:
##
@@ -228,6 +229,21 @@ public static List> 
parseCachedFilesFromSt
 .collect(Collectors.toList());
 }
 
+@Internal
+public static List parseStringFromCachedFiles(

Review Comment:
   Would you mind adding a test to check `parseStringFromCachedFiles` and 
`parseCachedFilesFromString` can be converted each other?



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION)
 .ifPresent(this::setUseSnapshotCompression);
 
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
+configuration
+.getOptional(RestartStrategyOptions.RESTART_STRATEGY)
+.ifPresent(s -> this.setRestartStrategy(configuration));

Review Comment:
   nit:  Why don't `this.setRestartStrategy(
   new RestartStrategies
   
.FallbackRestartStrategyConfiguration())` here?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java:
##
@@ -614,7 +563,9 @@ private Collection 
legacyTransform(Transformation transform) {
 if (transform.getBufferTimeout() >= 0) {
 streamGraph.setBufferTimeout(transform.getId(), 
transform.getBufferTimeout());
 } else {
-streamGraph.setBufferTimeout(transform.getId(), 
defaultBufferTimeout);
+streamGraph.setBufferTimeout(
+transform.getId(),
+
configuration.get(ExecutionOptions.BUFFER_TIMEOUT).toMillis());

Review Comment:
   Should we consider `BUFFER_TIMEOUT_ENABLED` here?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java:
##
@@ -385,7 +335,9 @@ private void configureStreamGraph(final StreamGraph graph) {
 
 if (shouldExecuteInBatchMode) {
 configureStreamGraphBatch(graph);
-setDefaultBufferTimeout(-1);
+configuration.set(
+ExecutionOptions.BUFFER_TIMEOUT,
+
Duration.ofMillis(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT));

Review Comment:
   If we consider `BUFFER_TIMEOUT_ENABLED` when getBufferTimeout, it's better 
to set `BUFFER_TIMEOUT_ENABLED` to false here.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long 
timeoutMillis) {
 throw new IllegalArgumentException("Timeout of buffer must be 
non-negative or -1");
 }
 
-this.bufferTimeout = timeoutMillis;
+if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) 
{
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, 
false);
+}
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, 
Duration.ofMillis(timeoutMillis));
 return this;

Review Comment:
   I'm curious could we deprecate `setBufferTimeout` and `getBufferTimeout` and 
remove them in 2.0?
   
   I didn't notice any benefit for them, and `DISABLED_NETWORK_BUFFER_TIMEOUT` 
is more clear for users than -1. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-04 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803426#comment-17803426
 ] 

xuyang commented on FLINK-33996:


This is a great idea! When I saw this Jira, another idea popped into my head:

During the plan optimization phase, introduce a dedicated rule to extract 
reusable expressions within a calc and eventually transform it into two calc 
nodes. This maybe help us deal with the structure of three-level calcs.

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explan plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after project write, regex_place is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-01-04 Thread via GitHub


flinkbot commented on PR #24029:
URL: https://github.com/apache/flink/pull/24029#issuecomment-1878195014

   
   ## CI report:
   
   * 5e614717b3d19a566939cd2dca8ae414c37f765f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-01-04 Thread via GitHub


ukby1234 commented on PR #24029:
URL: https://github.com/apache/flink/pull/24029#issuecomment-1878191452

   This adds the missing case for https://github.com/apache/flink/pull/23881


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33759] [flink-parquet] Add support for nested array with row type [flink]

2024-01-04 Thread via GitHub


ukby1234 opened a new pull request, #24029:
URL: https://github.com/apache/flink/pull/24029

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   This is to support parquet file writes with nested array with struct type. 
   
   ## Brief change log
   - *add support for nested array with row type*
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
 - *Added test that validates that nested array with row type can be 
written*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33938) Correct implicit coercions in relational operators to adopt typescript 5.0

2024-01-04 Thread Ao Yuchen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803421#comment-17803421
 ] 

Ao Yuchen commented on FLINK-33938:
---

Hi [~Wencong Liu] , I had created a PR to solve this issue, would you like to 
take a look?

> Correct implicit coercions in relational operators to adopt typescript 5.0
> --
>
> Key: FLINK-33938
> URL: https://issues.apache.org/jira/browse/FLINK-33938
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Ao Yuchen
>Assignee: Ao Yuchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> Since TypeScript 5.0, there is a break change that implicit coercions in 
> relational operators are forbidden [1].
> So that the following code in 
> flink-runtime-web/web-dashboard/src/app/components/humanize-date.pipe.ts get 
> error:
> {code:java}
> public transform(
>   value: number | string | Date,
>   ...
> ): string | null | undefined {
>   if (value == null || value === '' || value !== value || value < 0) {
> return '-';
>   } 
>   ...
> }{code}
> The correctness improvement is availble in here 
> [2][.|https://github.com/microsoft/TypeScript/pull/52048.]
> I think we should optimize this type of code for better compatibility.
>  
> [1] 
> [https://devblogs.microsoft.com/typescript/announcing-typescript-5-0/#forbidden-implicit-coercions-in-relational-operators]
> [2] 
> [https://github.com/microsoft/TypeScript/pull/52048|https://github.com/microsoft/TypeScript/pull/52048.]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33938) Correct implicit coercions in relational operators to adopt typescript 5.0

2024-01-04 Thread Ao Yuchen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ao Yuchen updated FLINK-33938:
--
Fix Version/s: 1.17.3
   1.18.2

> Correct implicit coercions in relational operators to adopt typescript 5.0
> --
>
> Key: FLINK-33938
> URL: https://issues.apache.org/jira/browse/FLINK-33938
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Ao Yuchen
>Assignee: Ao Yuchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> Since TypeScript 5.0, there is a break change that implicit coercions in 
> relational operators are forbidden [1].
> So that the following code in 
> flink-runtime-web/web-dashboard/src/app/components/humanize-date.pipe.ts get 
> error:
> {code:java}
> public transform(
>   value: number | string | Date,
>   ...
> ): string | null | undefined {
>   if (value == null || value === '' || value !== value || value < 0) {
> return '-';
>   } 
>   ...
> }{code}
> The correctness improvement is availble in here 
> [2][.|https://github.com/microsoft/TypeScript/pull/52048.]
> I think we should optimize this type of code for better compatibility.
>  
> [1] 
> [https://devblogs.microsoft.com/typescript/announcing-typescript-5-0/#forbidden-implicit-coercions-in-relational-operators]
> [2] 
> [https://github.com/microsoft/TypeScript/pull/52048|https://github.com/microsoft/TypeScript/pull/52048.]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-04 Thread via GitHub


JunRuiLee commented on PR #24025:
URL: https://github.com/apache/flink/pull/24025#issuecomment-1878160985

   > Thanks @JunRuiLee for the contribution!
   > 
   > [FLINK-33935](https://issues.apache.org/jira/browse/FLINK-33935) improves 
2 options related to statebackend and checkpoint. It is merged,and it has a 
little conflicts with this PR, please address it in your free time, thanks~
   
   Thanks @1996fanrui for the kind reminder. I have rebased this pull request 
and resolved the conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33634) Add Conditions to Flink CRD's Status field

2024-01-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33634:
---
Labels: pull-request-available  (was: )

> Add Conditions to Flink CRD's Status field
> --
>
> Key: FLINK-33634
> URL: https://issues.apache.org/jira/browse/FLINK-33634
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Major
>  Labels: pull-request-available
>
> From 
> [https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties]
>  it is considered best practice to provide Conditions in the Status of CRD's. 
> Some tooling even expects there to be a Conditions field in the status of a 
> CR. This issue to to propose adding a Conditions field to the CR status
> e.g.
> status:
>     conditions:
>      - lastTransitionTime: '2023-11-23T12:38:51Z'
>        status: 'True'
>        type: Ready



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33634] Add Conditions to Flink CRD's Status field [flink-kubernetes-operator]

2024-01-04 Thread via GitHub


lajith2006 opened a new pull request, #749:
URL: https://github.com/apache/flink-kubernetes-operator/pull/749

   
   
   ## What is the purpose of the change
   
   This PR is for issue https://issues.apache.org/jira/browse/FLINK-33634 to 
add Conditions field in the CR status of Flink Deployment and FlinkSessionJob. 
   
   
   ## Brief change log
   
 - Created a new class CommonCRStatus to build different Conditions with 
type and status. This reusable class can be used to build CR condition of type 
Ready or Error conditions based on jobmanager status and job status.
 - FlinkDeploymentStatus is updated to add list of Condition
 - FlinkDeploymentController is updated to add Conditions to 
FlinkDeploymentStatus based on JobManagerDeploymentStatus. Following 
JobManagerDeploymentStatus are considerd to 
add conditions status to CR. 
 READY
 DEPLOYED_NOT_READY
 DEPLOYING
 ERROR
   - FlinkSessionJobStatus is updated to add list of Condition
   - FlinkSessionJobController is updated to add Conditions to 
FlinkSessionJobStatus based on JobStatus. Following JobStatus are considered to 
add conditions status to CR 
   RUNNING
   CREATED
   CANCELED
   FAILED
   - FlinkDeploymentControllerTest and FlinkSessionJobControllerTest are 
modified to test the Status update with conditions.
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as 
FlinkDeploymentControllerTest and FlinkSessionJobControllerTest
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no) yes
 - Core observer or reconciler logic that is regularly executed: (yes / no) 
yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) yes
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented) docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-04 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-33996:
-
Summary: Support disabling project rewrite when multiple exprs in the 
project reference the same sub project field.  (was: Support disabling project 
rewrite when multiple exprs in the project reference the same project.)

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explan plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after project write, regex_place is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33936) The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.

2024-01-04 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803409#comment-17803409
 ] 

Feng Jin commented on FLINK-33936:
--

[~xuyangzhong]  Thank you for your reply, I would like to fix this issue. 

> The aggregation of mini-batches should output the result even if the result 
> is the same as before when TTL is configured.
> -
>
> Key: FLINK-33936
> URL: https://issues.apache.org/jira/browse/FLINK-33936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>
> If mini-batch is enabled currently, and if the aggregated result is the same 
> as the previous output, this current aggregation result will not be sent 
> downstream.  This will cause downstream nodes to not receive updated data. If 
> there is a TTL set for states at this time, the TTL of downstream will not be 
> updated either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
> // new row is not same with prev row
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, even if the aggregation result of this time 
> is the same as last time, new results will still be sent if TTL is set.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
> newAggValue)) {
> // newRow is the same as before and state cleaning is not 
> enabled.
> // We do not emit retraction and acc message.
> // If state cleaning is enabled, we have to emit messages 
> to prevent too early
> // state eviction of downstream operators.
> return;
> } else {
> // retract previous result
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, based on the consideration of TTL scenarios, I believe that when 
> mini-batch aggregation is enabled, new results should also output when the 
> aggregated result is the same as the previous one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI

2024-01-04 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803407#comment-17803407
 ] 

Zakelly Lan commented on FLINK-33897:
-

[~pnowojski]  Actually there is real world motivation. When a job encountered 
high back-pressure and after dozens of minutes of aligned checkpointing without 
success, the user finds that they need to switch to unaligned cp or enlarge the 
parallelism. Such change requires a job restart, which puts users in a dilemma 
because this involves replaying much data and a longer delay. This feature 
allows users to make an unaligned cp temporarily and restart from it, 
preventing from the large data replay.

I do agree we could enable timeout for aligned cp by default, which greatly 
reduce this case. And I also think there would be value giving user a chance to 
change the configuration and restart the job with less pain when they 
misconfigured their jobs, by supporting triggering a swift and promising 
checkpoint or savepoint. As for the complication supporting this feature, IIUC, 
some changes should apply to the handler states (may introduce a new 
{{{}BarrierHandlerState{}}}) and less change will make to the 
{{SingleCheckpointBarrierHandler}} itself. I'm not very familiar with this part 
so if you think this is a big change, I won't insist on doing it.

> Allow triggering unaligned checkpoint via CLI
> -
>
> Key: FLINK-33897
> URL: https://issues.apache.org/jira/browse/FLINK-33897
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> After FLINK-6755, user could trigger checkpoint through CLI. However I 
> noticed there would be value supporting trigger it in unaligned way, since 
> the job may encounter a high back-pressure and an aligned checkpoint would 
> fail.
>  
> I suggest we provide an option '-unaligned' in CLI to support that.
>  
> Similar option would also be useful for REST api



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34000] Implement restore tests for IncrementalGroupAgg node [flink]

2024-01-04 Thread via GitHub


flinkbot commented on PR #24028:
URL: https://github.com/apache/flink/pull/24028#issuecomment-1878101061

   
   ## CI report:
   
   * 44ff132411251b97117799a727005c87bc4e7a1d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34000) Implement restore tests for IncrementalGroupAggregate node

2024-01-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34000:
---
Labels: pull-request-available  (was: )

> Implement restore tests for IncrementalGroupAggregate node
> --
>
> Key: FLINK-34000
> URL: https://issues.apache.org/jira/browse/FLINK-34000
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Bonnie Varghese
>Assignee: Bonnie Varghese
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34000] Implement restore tests for IncrementalGroupAgg node [flink]

2024-01-04 Thread via GitHub


bvarghese1 opened a new pull request, #24028:
URL: https://github.com/apache/flink/pull/24028

   
   
   ## What is the purpose of the change
   
   *Add restore tests for IncrementalGroupAggregate node*
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
   - Added restore tests for IncrementalGroupAggregate node which verifies the 
generated compiled plan with the saved compiled plan.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34000) Implement restore tests for IncrementalGroupAggregate node

2024-01-04 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-34000:
---

 Summary: Implement restore tests for IncrementalGroupAggregate node
 Key: FLINK-34000
 URL: https://issues.apache.org/jira/browse/FLINK-34000
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-04 Thread via GitHub


1996fanrui commented on code in PR #22984:
URL: https://github.com/apache/flink/pull/22984#discussion_r1442471228


##
flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java:
##
@@ -72,9 +72,23 @@ public static RestartStrategyConfiguration fixedDelayRestart(
  * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
  * @param delayInterval Delay in-between restart attempts for the 
FixedDelayRestartStrategy
  * @return FixedDelayRestartStrategy
+ * @deprecated Use {@link #fixedDelayRestart(int, Duration)}
  */
+@Deprecated

Review Comment:
   You are right, this change is useful if FLIP-381 is reverted. But it's very 
unlikely. And these changes can be done when it happens.
   
   So far, this change isn't required, and I'm fine to have it. So up to you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33936) The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.

2024-01-04 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803395#comment-17803395
 ] 

xuyang commented on FLINK-33936:


Thanks for your report. This bug seems to be that when optimizations about 
mini-batch agg were first introduced, some behaviors in the group agg function 
were not aligned. I think we need to fix it.

> The aggregation of mini-batches should output the result even if the result 
> is the same as before when TTL is configured.
> -
>
> Key: FLINK-33936
> URL: https://issues.apache.org/jira/browse/FLINK-33936
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>
> If mini-batch is enabled currently, and if the aggregated result is the same 
> as the previous output, this current aggregation result will not be sent 
> downstream.  This will cause downstream nodes to not receive updated data. If 
> there is a TTL set for states at this time, the TTL of downstream will not be 
> updated either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
> // new row is not same with prev row
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, even if the aggregation result of this time 
> is the same as last time, new results will still be sent if TTL is set.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
> if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, 
> newAggValue)) {
> // newRow is the same as before and state cleaning is not 
> enabled.
> // We do not emit retraction and acc message.
> // If state cleaning is enabled, we have to emit messages 
> to prevent too early
> // state eviction of downstream operators.
> return;
> } else {
> // retract previous result
> if (generateUpdateBefore) {
> // prepare UPDATE_BEFORE message for previous row
> resultRow
> .replace(currentKey, prevAggValue)
> .setRowKind(RowKind.UPDATE_BEFORE);
> out.collect(resultRow);
> }
> // prepare UPDATE_AFTER message for new row
> resultRow.replace(currentKey, 
> newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> Therefore, based on the consideration of TTL scenarios, I believe that when 
> mini-batch aggregation is enabled, new results should also output when the 
> aggregated result is the same as the previous one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33935) Improve the default value doc and logic for some state backend and checkpoint related options

2024-01-04 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33935.
-
Resolution: Fixed

> Improve the default value doc and logic for some state backend and checkpoint 
> related options
> -
>
> Key: FLINK-33935
> URL: https://issues.apache.org/jira/browse/FLINK-33935
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Some state backend and checkpoint related options don't set the default value 
> directly, but but they implement default value based on code. Such as:
>  * execution.checkpointing.tolerable-failed-checkpoints
>  ** 
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints]
>  * state.backend.type
>  ** 
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-backend-type]
> h2. Option1
> execution.checkpointing.tolerable-failed-checkpoints doesn't have default 
> value, but CheckpointConfig#getTolerableCheckpointFailureNumber calls  
> {color:#9876aa}configuration{color}.getOptional(ExecutionCheckpointingOptions.{color:#9876aa}TOLERABLE_FAILURE_NUMBER{color}).orElse({color:#6897bb}0{color}).
> It means the 0 is default value of 
> execution.checkpointing.tolerable-failed-checkpoints.
> h2. Option2
> state.backend.type does't have default value, but 
> StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal calls 
> loadStateBackendFromConfig(config{color:#cc7832}, 
> {color}classLoader{color:#cc7832}, {color}logger). When the return value is 
> null, Flink will consider the hashmap as the default state backend.
> I checked all callers of StateBackendLoader#loadStateBackendFromConfig, if we 
> change the default value of state.backend.type to hashmap. All of them work 
> well.
> h2. Why set the default value directly is fine?
> From user side, it's clearer.
> From flink developers or maintainers side, it's easy to maintain.
> h2. Proposed changes:
> Adding the default value for them:
>  * execution.checkpointing.tolerable-failed-checkpoints: 0
>  * state.backend.type: hashmap
> Note: this JIAR adds the default value, but the behaviour is absolutely same 
> with old one, so it doesn't introduce any effect for users. (So the FLIP 
> isn't necessary IIUC.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33935) Improve the default value doc and logic for some state backend and checkpoint related options

2024-01-04 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803394#comment-17803394
 ] 

Rui Fan commented on FLINK-33935:
-

Merged to master(1.19) via: 4a852fee28f2d87529dc05f5ba2e79202a0e00b6 and 
cf0ac00cfe84c34af64308cebfc4a9034b94fddc

> Improve the default value doc and logic for some state backend and checkpoint 
> related options
> -
>
> Key: FLINK-33935
> URL: https://issues.apache.org/jira/browse/FLINK-33935
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Some state backend and checkpoint related options don't set the default value 
> directly, but but they implement default value based on code. Such as:
>  * execution.checkpointing.tolerable-failed-checkpoints
>  ** 
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints]
>  * state.backend.type
>  ** 
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-backend-type]
> h2. Option1
> execution.checkpointing.tolerable-failed-checkpoints doesn't have default 
> value, but CheckpointConfig#getTolerableCheckpointFailureNumber calls  
> {color:#9876aa}configuration{color}.getOptional(ExecutionCheckpointingOptions.{color:#9876aa}TOLERABLE_FAILURE_NUMBER{color}).orElse({color:#6897bb}0{color}).
> It means the 0 is default value of 
> execution.checkpointing.tolerable-failed-checkpoints.
> h2. Option2
> state.backend.type does't have default value, but 
> StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal calls 
> loadStateBackendFromConfig(config{color:#cc7832}, 
> {color}classLoader{color:#cc7832}, {color}logger). When the return value is 
> null, Flink will consider the hashmap as the default state backend.
> I checked all callers of StateBackendLoader#loadStateBackendFromConfig, if we 
> change the default value of state.backend.type to hashmap. All of them work 
> well.
> h2. Why set the default value directly is fine?
> From user side, it's clearer.
> From flink developers or maintainers side, it's easy to maintain.
> h2. Proposed changes:
> Adding the default value for them:
>  * execution.checkpointing.tolerable-failed-checkpoints: 0
>  * state.backend.type: hashmap
> Note: this JIAR adds the default value, but the behaviour is absolutely same 
> with old one, so it doesn't introduce any effect for users. (So the FLIP 
> isn't necessary IIUC.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33935][checkpoint] Improve the default value logic related to `execution.checkpointing.tolerable-failed-checkpoints` and `state.backend.type` [flink]

2024-01-04 Thread via GitHub


1996fanrui merged PR #23987:
URL: https://github.com/apache/flink/pull/23987


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33999) FLIP-412: Add the time-consuming span of each stage when starting the Flink job to TraceReporter

2024-01-04 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33999:

Description: This is an umbrella Jira of  [FLIP-412| 
https://cwiki.apache.org/confluence/x/8435E]

> FLIP-412: Add the time-consuming span of each stage when starting the Flink 
> job to TraceReporter
> 
>
> Key: FLINK-33999
> URL: https://issues.apache.org/jira/browse/FLINK-33999
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Runtime / Metrics
>Reporter: Rui Fan
>Assignee: junzhong qin
>Priority: Major
>
> This is an umbrella Jira of  [FLIP-412| 
> https://cwiki.apache.org/confluence/x/8435E]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-04 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803392#comment-17803392
 ] 

Rui Fan commented on FLINK-33856:
-

Thanks [~pnowojski] for the ping.:)
{quote} [~fanrui] if I remember correctly you wanted to follow up on this?
{quote}
As I said in the mail list, I propose adding a series of TraceSpan for job 
start, such as:
 * From JobManager process is started to JobGraph is created
 * From JobGraph is created to JobMaster is created
 * From JobMaster is created to job is running
 * From start request tm from yarn or kubernetes to all tms are ready 
 * etc

And I and [~easonqin]  who my colleague created the FLIP-412: Add the 
time-consuming span of each stage when starting the Flink job to 
TraceReporter[2][3] just now to follow up it. IIUC, it's not related to this 
JIRA, right? They add the Span for different stage.

 

[1][https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4]

[2][https://cwiki.apache.org/confluence/x/8435E]

[3]https://issues.apache.org/jira/browse/FLINK-33999

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall time-consuming. Therefore, it 
> is easy to observe the bottleneck point by adding performance indicators when 
> the task interacts with the external file storage system. These include: the 
> rate of file write , the latency to write the file, the latency to close the 
> file.
> In flink side add the above metrics has the following advantages: convenient 
> statistical different task E2E time-consuming; do not need to distinguish the 
> type of external storage system, can be unified in the 
> FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33740) Add a document to list the supported sql patterns

2024-01-04 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-33740:
---
Summary: Add a document to list the supported sql patterns  (was: Introduce 
a flip to list the supported sql patterns)

> Add a document to list the supported sql patterns
> -
>
> Key: FLINK-33740
> URL: https://issues.apache.org/jira/browse/FLINK-33740
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>
> Introduce a flip with aligning sql usages to list all patterns we support and 
> do not support. 
> See more details in https://issues.apache.org/jira/browse/FLINK-33490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33999) FLIP-412: Add the time-consuming span of each stage when starting the Flink job to TraceReporter

2024-01-04 Thread Rui Fan (Jira)
Rui Fan created FLINK-33999:
---

 Summary: FLIP-412: Add the time-consuming span of each stage when 
starting the Flink job to TraceReporter
 Key: FLINK-33999
 URL: https://issues.apache.org/jira/browse/FLINK-33999
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination, Runtime / Metrics
Reporter: Rui Fan
Assignee: junzhong qin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3

2024-01-04 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803389#comment-17803389
 ] 

Sai Sharath Dandi commented on FLINK-33817:
---

[~maosuhan] Gentle ping on this.

> Allow ReadDefaultValues = False for non primitive types on Proto3
> -
>
> Key: FLINK-33817
> URL: https://issues.apache.org/jira/browse/FLINK-33817
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Priority: Major
>
> *Background*
>  
> The current Protobuf format 
> [implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
>  always sets ReadDefaultValues=False when using Proto3 version. This can 
> cause severe performance degradation for large Protobuf schemas with OneOf 
> fields as the entire generated code needs to be executed during 
> deserialization even when certain fields are not present in the data to be 
> deserialized and all the subsequent nested Fields can be skipped. Proto3 
> supports hasXXX() methods for checking field presence for non primitive types 
> since Proto version 
> [3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In 
> the internal performance benchmarks in our company, we've seen almost 10x 
> difference in performance for one of our real production usecase when 
> allowing to set ReadDefaultValues=False with proto3 version. The exact 
> difference in performance depends on the schema complexity and data payload 
> but we should allow user to set readDefaultValue=False in general.
>  
> *Solution*
>  
> Support using ReadDefaultValues=False when using Proto3 version. We need to 
> be careful to check for field presence only on non-primitive types if 
> ReadDefaultValues is false and version used is Proto3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33611) Support Large Protobuf Schemas

2024-01-04 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803388#comment-17803388
 ] 

Sai Sharath Dandi edited comment on FLINK-33611 at 1/5/24 3:03 AM:
---

[~libenchao] , Thanks for the suggestion to add the test case to the pull 
request. While working on making an appropriate test case, I discovered 
something interesting. The local variable names are somehow not part of the 
Java constant pool for large schemas. I believe Java is triggering some 
optimizations internally and storing the variable names elsewhere when the code 
size becomes too large, therefore I'm not sure if reusing variable names has 
any impact on supporting large schemas. Perhaps, it can reduce the work needed 
for the Java compiler to rewrite variable names and result in faster compile 
times but I haven't conducted any experiment on that aspect. Apart from that, 
making the code change to reduce too many split methods has the most impact in 
supporting large schemas as I found that method names are always included in 
the constant pool even when the code size is too large from my experiment. In 
fact, this is the main reason which causes compilation errors with "too many 
constants error"

With that being said, I would still prefer to keep the changes to reuse 
variable names since the change itself is non-intrusive, harmless, and can only 
improve the performance for compilation. Please let me know your thoughts


was (Author: JIRAUSER298466):
@libenchao, Thanks for the suggestion to add the test case to the pull request. 
While working on making an appropriate test case, I discovered something 
interesting. The local variable names are somehow not part of the Java constant 
pool for large schemas. I believe Java is triggering some optimizations 
internally and storing the variable names elsewhere when the code size becomes 
too large, therefore I'm not sure if reusing variable names has any impact on 
supporting large schemas. Perhaps, it can reduce the work needed for the Java 
compiler to rewrite variable names and result in faster compile times but I 
haven't conducted any experiment on that aspect. Apart from that, making the 
code change to reduce too many split methods has the most impact in supporting 
large schemas as I found that method names are always included in the constant 
pool even when the code size is too large from my experiment. In fact, this is 
the main reason which causes compilation errors with "too many constants error"

With that being said, I would still prefer to keep the changes to reuse 
variable names since the change itself is non-intrusive, harmless, and can only 
improve the performance for compilation. Please let me know your thoughts

> Support Large Protobuf Schemas
> --
>
> Key: FLINK-33611
> URL: https://issues.apache.org/jira/browse/FLINK-33611
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Assignee: Sai Sharath Dandi
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the 
> ability to split the generated code to improve the performance for large 
> Protobuf schemas. However, this is still not sufficient to support some 
> larger protobuf schemas as the generated code exceeds the java constant pool 
> size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] 
> and we can see errors like "Too many constants" when trying to compile the 
> generated code. 
> *Solution*
> Since we already have the split code functionality already introduced, the 
> main proposal here is to now reuse the variable names across different split 
> method scopes. This will greatly reduce the constant pool size. One more 
> optimization is to only split the last code segment also only when the size 
> exceeds split threshold limit. Currently, the last segment of the generated 
> code is always being split which can lead to too many split methods and thus 
> exceed the constant pool size limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33611) Support Large Protobuf Schemas

2024-01-04 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803388#comment-17803388
 ] 

Sai Sharath Dandi commented on FLINK-33611:
---

@libenchao, Thanks for the suggestion to add the test case to the pull request. 
While working on making an appropriate test case, I discovered something 
interesting. The local variable names are somehow not part of the Java constant 
pool for large schemas. I believe Java is triggering some optimizations 
internally and storing the variable names elsewhere when the code size becomes 
too large, therefore I'm not sure if reusing variable names has any impact on 
supporting large schemas. Perhaps, it can reduce the work needed for the Java 
compiler to rewrite variable names and result in faster compile times but I 
haven't conducted any experiment on that aspect. Apart from that, making the 
code change to reduce too many split methods has the most impact in supporting 
large schemas as I found that method names are always included in the constant 
pool even when the code size is too large from my experiment. In fact, this is 
the main reason which causes compilation errors with "too many constants error"

With that being said, I would still prefer to keep the changes to reuse 
variable names since the change itself is non-intrusive, harmless, and can only 
improve the performance for compilation. Please let me know your thoughts

> Support Large Protobuf Schemas
> --
>
> Key: FLINK-33611
> URL: https://issues.apache.org/jira/browse/FLINK-33611
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Assignee: Sai Sharath Dandi
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the 
> ability to split the generated code to improve the performance for large 
> Protobuf schemas. However, this is still not sufficient to support some 
> larger protobuf schemas as the generated code exceeds the java constant pool 
> size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] 
> and we can see errors like "Too many constants" when trying to compile the 
> generated code. 
> *Solution*
> Since we already have the split code functionality already introduced, the 
> main proposal here is to now reuse the variable names across different split 
> method scopes. This will greatly reduce the constant pool size. One more 
> optimization is to only split the last code segment also only when the size 
> exceeds split threshold limit. Currently, the last segment of the generated 
> code is always being split which can lead to too many split methods and thus 
> exceed the constant pool size limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803382#comment-17803382
 ] 

Zhongqiang Gong edited comment on FLINK-33970 at 1/5/24 2:48 AM:
-

[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

_Because connector docs depends on flink main docs and no dependencies between 
connector docs , so we can just check flink main docs and single connector 
docs. Finally,We use hugo build to check docs._
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

_Because of hugo build cannot check dead link. It's very useful to add dead 
link check for this._

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link


was (Author: JIRAUSER301076):
[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

_Because connector docs depends on flink main docs and no dependencies between 
connector docs , so we can just check flink main docs and single connector 
docs. Finally,We use hugo build to check docs._
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803382#comment-17803382
 ] 

Zhongqiang Gong edited comment on FLINK-33970 at 1/5/24 2:48 AM:
-

[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

 

*Hugo build check:*

_Because connector docs depends on flink main docs and no dependencies between 
connector docs , so we can just check flink main docs and single connector 
docs. Finally,We use hugo build to check docs._
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

 

*Dead links check:*

_Because of hugo build cannot check dead link. It's very useful to add dead 
link check for this._
 * use npm module *markdown-link-check* to check dead link


was (Author: JIRAUSER301076):
[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

_Because connector docs depends on flink main docs and no dependencies between 
connector docs , so we can just check flink main docs and single connector 
docs. Finally,We use hugo build to check docs._
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

_Because of hugo build cannot check dead link. It's very useful to add dead 
link check for this._

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803382#comment-17803382
 ] 

Zhongqiang Gong edited comment on FLINK-33970 at 1/5/24 2:48 AM:
-

[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

_Because connector docs depends on flink main docs and no dependencies between 
connector docs , so we can just check flink main docs and single connector 
docs. Finally,We use hugo build to check docs._
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link


was (Author: JIRAUSER301076):
[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

{_}Because connector docs depends on flink main docs and no dependencies 
between connector docs , so we can just check flink main docs and single 
connector docs. Finally,We use hugo build to check docs.{_}{*}{*}
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

 

 

 

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803382#comment-17803382
 ] 

Zhongqiang Gong edited comment on FLINK-33970 at 1/5/24 2:47 AM:
-

[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*

{_}Because connector docs depends on flink main docs and no dependencies 
between connector docs , so we can just check flink main docs and single 
connector docs. Finally,We use hugo build to check docs.{_}{*}{*}
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

 

 

 


was (Author: JIRAUSER301076):
[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

 

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803382#comment-17803382
 ] 

Zhongqiang Gong commented on FLINK-33970:
-

[~martijnvisser] [~leonard] Can you give me some advices for this issue?

I have a idea for this issue:

*Hugo build check:*
 * check out flink repo with only `docs` dir cloned ,use *sparse-checkout*
 * move connector (which repo related) docs into flink `docs` dir
 * execute hugo build command

Because of hugo build cannot check dead link. It's very useful to add dead link 
check for this.

*Dead links check:*
 * use npm module *markdown-link-check* to check dead link

 

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33968) Compute the number of subpartitions when initializing executon job vertices

2024-01-04 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-33968.
---
Fix Version/s: 1.19.0
   Resolution: Done

master/1.19: b25dfaee80727d6662a5fd445fe51cc139a8b9eb

> Compute the number of subpartitions when initializing executon job vertices
> ---
>
> Key: FLINK-33968
> URL: https://issues.apache.org/jira/browse/FLINK-33968
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, when using dynamic graphs, the subpartition-num of a task is 
> lazily calculated until the task deployment moment, this may lead to some 
> uncertainties in job recovery scenarios:
> Before jm crashs, when deploying upstream tasks, the parallelism of 
> downstream vertex may be unknown, so the subpartiton-num will be the max 
> parallelism of downstream job vertex. However, after jm restarts, when 
> deploying upstream tasks, the parallelism of downstream job vertex may be 
> known(has been calculated before jm crashs and been recovered after jm 
> restarts), so the subpartiton-num will be the actual parallelism of 
> downstream job vertex. The difference of calculated subpartition-num will 
> lead to the partitions generated before jm crashs cannot be reused after jm 
> restarts.
> We will solve this problem by advancing the calculation of subpartitoin-num 
> to the moment of initializing executon job vertex (in ctor of 
> IntermediateResultPartition)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex [flink]

2024-01-04 Thread via GitHub


zhuzhurk closed pull request #24019: [FLINK-33968][runtime] Advance the 
calculation of num of subpartitions to the time of initializing execution job 
vertex
URL: https://github.com/apache/flink/pull/24019


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33319) Add AverageTime metric to measure delta change in GC time

2024-01-04 Thread ouyangwulin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803379#comment-17803379
 ] 

ouyangwulin commented on FLINK-33319:
-

[~gyfora]  Could you assign this ticket to me?

> Add AverageTime metric to measure delta change in GC time
> -
>
> Key: FLINK-33319
> URL: https://issues.apache.org/jira/browse/FLINK-33319
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Gyula Fora
>Priority: Minor
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-04 Thread Xiangyan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangyan updated FLINK-33998:
-
Description: 
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 
05:{color:#ff}{{*53*}}{color}:08, both JM lost connection to 
kube-apiserver. But there is no more connection error within a few seconds. I 
guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"DefaultDispatcherRunner was revoked the leadership" error after the leader 
election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then restarted 
itself. While the other JM was still running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 

  was:
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 
05:{color:#ff}{{*53*}}{color}:08, both JM lost connection to 
kube-apiserver. But there is no more connection error within a few seconds. I 
guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"{{{}DefaultDispatcherRunner was revoked the leadership{}}}" error after the 
leader election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then 
restarted itself. While the other JM was still running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 


> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster 
> by my own with below setup:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When kube-apiserver swatched over at 
> 

[jira] [Updated] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-04 Thread Xiangyan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangyan updated FLINK-33998:
-
Description: 
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 
05:{color:#ff}{{*53*}}{color}:08, both JM lost connection to 
kube-apiserver. But there is no more connection error within a few seconds. I 
guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"{{{}DefaultDispatcherRunner was revoked the leadership{}}}" error after the 
leader election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then 
restarted itself. While the other JM was still running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 

  was:
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 
05:{color:#FF}{{*53*}}{color}:08, both JM lost connection to 
kube-apiserver. But there is no more connection error within a few seconds. I 
guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{color:#FF}{{*54*}}{color}:08) and then restarted itself. While the 
other JM was still running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 


> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster 
> by my own with below setup:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When kube-apiserver swatched over at 
> 05:{color:#ff}{{*53*}}{color}:08, both JM lost connection 

[jira] [Updated] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-04 Thread Xiangyan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangyan updated FLINK-33998:
-
Description: 
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 
05:{color:#FF}{{*53*}}{color}:08, both JM lost connection to 
kube-apiserver. But there is no more connection error within a few seconds. I 
guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{color:#FF}{{*54*}}{color}:08) and then restarted itself. While the 
other JM was still running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 

  was:
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}

For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 05:{{{}*53*{}}}:08, 
both JM lost connection to kube-apiserver. But there is no more connection 
error within a few seconds. I guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{{{}*54*{}}}:08) and then restarted itself. While the other JM was still 
running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 


> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster 
> by my own with below setup:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When kube-apiserver swatched over at 
> 05:{color:#FF}{{*53*}}{color}:08, both JM lost connection to 
> kube-apiserver. But there is no more connection error within a few 

[jira] [Updated] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-04 Thread Xiangyan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangyan updated FLINK-33998:
-
Description: 
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

{code:java}
high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}

For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 05:{{{}*53*{}}}:08, 
both JM lost connection to kube-apiserver. But there is no more connection 
error within a few seconds. I guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{{{}*54*{}}}:08) and then restarted itself. While the other JM was still 
running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 

  was:
We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"
 
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 05:{{{}*53*{}}}:08, 
both JM lost connection to kube-apiserver. But there is no more connection 
error within a few seconds. I guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{{{}*54*{}}}:08) and then restarted itself. While the other JM was still 
running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 


> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster 
> by my own with below setup:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When kube-apiserver swatched over at 05:{{{}*53*{}}}:08, 
> both JM lost connection to kube-apiserver. But there is no more connection 
> error within a few seconds. I guess the connection recovered by retry.
> However, one of the 

[jira] [Created] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-04 Thread Xiangyan (Jira)
Xiangyan created FLINK-33998:


 Summary: Flink Job Manager restarted after kube-apiserver 
connection intermittent
 Key: FLINK-33998
 URL: https://issues.apache.org/jira/browse/FLINK-33998
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.13.6
 Environment: Kubernetes 1.24

Flink Operator 1.4

Flink 1.13.6
Reporter: Xiangyan
 Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
connection timeout.png, jm-no-restart4.log, jm-restart4.log

We are running Flink on AWS EKS and experienced Job Manager restart issue when 
EKS control plane scaled up/in.

I can reproduce this issue in my local environment too.

Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster by 
my own with below setup:
 * Two kube-apiserver, only one is running at a time;
 * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
 * Enable Flink Job Manager HA;
 * Configure Job Manager leader election timeout;

high-availability.kubernetes.leader-election.lease-duration: "60s"
high-availability.kubernetes.leader-election.renew-deadline: "60s"
 
For testing, I switch the running kube-apiserver from one instance to another 
each time. When the kube-apiserver is switching, I can see that some Job 
Managers restart, but some are still running normally.

Here is an example. When kube-apiserver swatched over at 05:{{{}*53*{}}}:08, 
both JM lost connection to kube-apiserver. But there is no more connection 
error within a few seconds. I guess the connection recovered by retry.

However, one of the JM (the 2nd one in the attached screen shot) reported 
"leadership revoked" error after the leader election timeout (at 
05:{{{}*54*{}}}:08) and then restarted itself. While the other JM was still 
running normally.

>From kube-apiserver audit logs, the normal JM was able to renew leader lease 
>after the interruption. But there is no any lease renew request from the 
>failed JM until it restarted.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31896) Extend web interface to support failure labels

2024-01-04 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek resolved FLINK-31896.
---
Fix Version/s: 1.19.0
   Resolution: Fixed

> Extend web interface to support failure labels
> --
>
> Key: FLINK-31896
> URL: https://issues.apache.org/jira/browse/FLINK-31896
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31896) Extend web interface to support failure labels

2024-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-31896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803310#comment-17803310
 ] 

David Morávek commented on FLINK-31896:
---

master: aba1ee85d6a3854fdb1f8a628fed0ad19460d086

> Extend web interface to support failure labels
> --
>
> Key: FLINK-31896
> URL: https://issues.apache.org/jira/browse/FLINK-31896
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31896][runtime] Extend web interface to support failure labels [flink]

2024-01-04 Thread via GitHub


dmvk merged PR #22646:
URL: https://github.com/apache/flink/pull/22646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31896][runtime] Extend web interface to support failure labels [flink]

2024-01-04 Thread via GitHub


dmvk commented on code in PR #22646:
URL: https://github.com/apache/flink/pull/22646#discussion_r1411713724


##
flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html:
##
@@ -52,6 +52,12 @@
   {{ item.selected.timestamp | date: '-MM-dd HH:mm:ss' 
}}
   
 {{ item.selected.exceptionName }}
+error`?
   
   https://ng.ant.design/components/tag/en



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33997) Typo in the doc `classloader.parent-first-patterns-additional`

2024-01-04 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-33997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-33997:
--

Assignee: Rodrigo Meneses

> Typo in the doc `classloader.parent-first-patterns-additional`
> --
>
> Key: FLINK-33997
> URL: https://issues.apache.org/jira/browse/FLINK-33997
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
>Reporter: Matyas Orhidi
>Assignee: Rodrigo Meneses
>Priority: Major
>
> Typo in the doc:
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code]
> classloader.parent-first-patterns-additional -> 
> classloader.parent-first-patterns.additional



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33997) Typo in the doc `classloader.parent-first-patterns-additional`

2024-01-04 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-33997:
-

 Summary: Typo in the doc 
`classloader.parent-first-patterns-additional`
 Key: FLINK-33997
 URL: https://issues.apache.org/jira/browse/FLINK-33997
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.18.0
Reporter: Matyas Orhidi


Typo in the doc:
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code]

classloader.parent-first-patterns-additional -> 
classloader.parent-first-patterns.additional



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33994) Use Datadog api key from environment variables if not set in conf

2024-01-04 Thread Sweta Kalakuntla (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803289#comment-17803289
 ] 

Sweta Kalakuntla commented on FLINK-33994:
--

Ok, I just shared the PR that was done by someone in that regard. 

However this is still needed for to store config in environment and not in code.

> Use Datadog api key from environment variables if not set in conf
> -
>
> Key: FLINK-33994
> URL: https://issues.apache.org/jira/browse/FLINK-33994
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Sweta Kalakuntla
>Priority: Major
>
> Add a way to set Datadog API key from the environment variables. This way 
> during deployment, there is way to set the value from secrets/vault instead 
> of hardcoding key into code.
> Someone has created PR :
> [https://github.com/apache/flink/pull/19684/files] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


rkhachatryan commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442110980


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   Good idea, should work!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same project.

2024-01-04 Thread Feng Jin (Jira)
Feng Jin created FLINK-33996:


 Summary: Support disabling project rewrite when multiple exprs in 
the project reference the same project.
 Key: FLINK-33996
 URL: https://issues.apache.org/jira/browse/FLINK-33996
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Feng Jin


When multiple top projects reference the same bottom project, project rewrite 
rules may result in complex projects being calculated multiple times.

Take the following SQL as an example:

{code:sql}
create table test_source(a varchar) with ('connector'='datagen');

explan plan for select a || 'a' as a, a || 'b' as b FROM (select 
REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}


The final SQL plan is as follows:


{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}

It can be observed that after project write, regex_place is calculated twice. 
Generally speaking, regular expression matching is a time-consuming operation 
and we usually do not want it to be calculated multiple times. Therefore, for 
this scenario, we can support disabling project rewrite.

After disabling some rules, the final plan we obtained is as follows:


{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}


After testing, we probably need to modify these few rules:

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule








--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-04 Thread via GitHub


bvarghese1 commented on code in PR #24020:
URL: https://github.com/apache/flink/pull/24020#discussion_r1442091512


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java:
##
@@ -198,6 +198,18 @@ public SqlTestStep getRunSqlTestStep() {
 return (SqlTestStep) sqlSteps.get(0);
 }
 
+/**
+ * Convenience method to avoid boilerplate code. It assumes only one 
statement set is tested.
+ */
+public StatementSetTestStep getRunStatementSetTestStep() {
+List statementSetSteps =
+runSteps.stream()
+.filter(s -> s.getKind() == TestKind.STATEMENT_SET)
+.collect(Collectors.toList());
+
+return (StatementSetTestStep) statementSetSteps.get(0);

Review Comment:
   For tests, as a convenience we can assume there will be only 1 statement set 
which can contain multiple sql statements.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33979] Implement restore tests for TableSink node [flink]

2024-01-04 Thread via GitHub


jnh5y commented on PR #24024:
URL: https://github.com/apache/flink/pull/24024#issuecomment-1877530126

   Similar to the table source PR, I peeked at `SinkAbilitySpec`. 
   
   Looks like `OverwriteSpec`, `PartitioningSpec`, and `WritingMetadataSpec` 
are covered.
   
   I wonder if we should try and cover `RowLevelDeleteSpec` and 
`RowLevelUpdateSpec`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-04 Thread via GitHub


jnh5y commented on PR #24020:
URL: https://github.com/apache/flink/pull/24020#issuecomment-1877525393

   From very quick skimming... I looked at `SourceAbilitySpec`, and it looks 
like this test ought to help cover the `JsonSubTypes` there.  
   
   I think I see the following readily:
   ```
   ProjectPushDownSpec
   FilterPushDownSpec
   LimitPushDownSpec
   PartitionPushDownSpec
   ReadingMetadataSpec
   WatermarkPushDownSpec
   ```
   
   Is there coverage for the other two?
   ```
   SourceWatermarkSpec
   AggregatePushDownSpec
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33969] Implement restore tests for TableSourceScan node [flink]

2024-01-04 Thread via GitHub


jnh5y commented on code in PR #24020:
URL: https://github.com/apache/flink/pull/24020#discussion_r1442077875


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java:
##
@@ -198,6 +198,18 @@ public SqlTestStep getRunSqlTestStep() {
 return (SqlTestStep) sqlSteps.get(0);
 }
 
+/**
+ * Convenience method to avoid boilerplate code. It assumes only one 
statement set is tested.
+ */
+public StatementSetTestStep getRunStatementSetTestStep() {
+List statementSetSteps =
+runSteps.stream()
+.filter(s -> s.getKind() == TestKind.STATEMENT_SET)
+.collect(Collectors.toList());
+
+return (StatementSetTestStep) statementSetSteps.get(0);

Review Comment:
   Any particular reason to assume that there is only one StatementSet being 
tested?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33958] Implement restore tests for IntervalJoin node [flink]

2024-01-04 Thread via GitHub


jnh5y commented on PR #24009:
URL: https://github.com/apache/flink/pull/24009#issuecomment-1877510321

   From reading the code for `StreamExecIntervalJoin`, I noticed 
pad/filter-left/right transformations.  Seems like those can be produced when 
the `TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS` is used.
   
   Since it is a legacy thing, not sure if we want to test it or not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33995) Add test in test_file_sink.sh s3 StreamingFileSink for csv

2024-01-04 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh reassigned FLINK-33995:
---

Assignee: Samrat Deb

> Add test in test_file_sink.sh s3 StreamingFileSink for csv 
> ---
>
> Key: FLINK-33995
> URL: https://issues.apache.org/jira/browse/FLINK-33995
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>
> test_file_sink.sh s3 StreamingFileSink doesnt have coverage for csv format . 
> this task will add new test case to cover when format is `csv`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33995) Add test in test_file_sink.sh s3 StreamingFileSink for csv

2024-01-04 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-33995:
--

 Summary: Add test in test_file_sink.sh s3 StreamingFileSink for 
csv 
 Key: FLINK-33995
 URL: https://issues.apache.org/jira/browse/FLINK-33995
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Samrat Deb


test_file_sink.sh s3 StreamingFileSink doesnt have coverage for csv format . 

this task will add new test case to cover when format is `csv`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33536] Fix Flink Table API CSV streaming sink fails with IOException: Stream closed [flink]

2024-01-04 Thread via GitHub


Samrat002 commented on PR #23725:
URL: https://github.com/apache/flink/pull/23725#issuecomment-1877501000

   Hi @MartijnVisser , @hlteoh37 , 
   
   I found that `test_file_sink.sh s3 StreamingFileSink` doesnt have test case 
for csv format . 
   https://issues.apache.org/jira/browse/FLINK-33995
   i will work on it and raise a pr for the same


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33994) Use Datadog api key from environment variables if not set in conf

2024-01-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803251#comment-17803251
 ] 

Martijn Visser commented on FLINK-33994:


That PR isn't in good shape (doesn't have a test and it just changes the 
behavior from one thing to another, instead of properly configuring it). 

> Use Datadog api key from environment variables if not set in conf
> -
>
> Key: FLINK-33994
> URL: https://issues.apache.org/jira/browse/FLINK-33994
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Sweta Kalakuntla
>Priority: Major
>
> Add a way to set Datadog API key from the environment variables. This way 
> during deployment, there is way to set the value from secrets/vault instead 
> of hardcoding key into code.
> Someone has created PR :
> [https://github.com/apache/flink/pull/19684/files] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33994) Use Datadog api key from environment variables if not set in conf

2024-01-04 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-33994:
---
Component/s: (was: flink-contrib)

> Use Datadog api key from environment variables if not set in conf
> -
>
> Key: FLINK-33994
> URL: https://issues.apache.org/jira/browse/FLINK-33994
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Sweta Kalakuntla
>Priority: Major
>
> Add a way to set Datadog API key from the environment variables. This way 
> during deployment, there is way to set the value from secrets/vault instead 
> of hardcoding key into code.
> Someone has created PR :
> [https://github.com/apache/flink/pull/19684/files] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33994) Use Datadog api key from environment variables if not set in conf

2024-01-04 Thread Sweta Kalakuntla (Jira)
Sweta Kalakuntla created FLINK-33994:


 Summary: Use Datadog api key from environment variables if not set 
in conf
 Key: FLINK-33994
 URL: https://issues.apache.org/jira/browse/FLINK-33994
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib, Runtime / Metrics
Reporter: Sweta Kalakuntla


Add a way to set Datadog API key from the environment variables. This way 
during deployment, there is way to set the value from secrets/vault instead of 
hardcoding key into code.

Someone has created PR :

[https://github.com/apache/flink/pull/19684/files] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441999383


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   use an abbreviation `o.a.f`? 樂 Could we use a tooltip to expand the scope? 
For example display `(...).CheckpointStatsTracker` that expands in the tooltip 
to the fully qualified name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33932][checkpointing] Add retry mechanism in RocksDBStateUploader [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23986:
URL: https://github.com/apache/flink/pull/23986#discussion_r1441986930


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##
@@ -107,13 +122,43 @@ private List> 
createUploadFutures(
 e ->
 CompletableFuture.supplyAsync(
 CheckedSupplier.unchecked(
-() ->
-
uploadLocalFileToCheckpointFs(
-e,
-
checkpointStreamFactory,
-stateScope,
-
closeableRegistry,
-
tmpResourcesRegistry)),
+() -> {

Review Comment:
   Please extract to separate method.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##
@@ -28,31 +28,46 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.concurrent.FixedRetryStrategy;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
 import org.apache.flink.util.function.CheckedSupplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** Help class for uploading RocksDB state files. */
 public class RocksDBStateUploader extends RocksDBStateDataTransfer {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateUploader.class);
+
 private static final int READ_BUFFER_SIZE = 16 * 1024;
 
+private static final int DEFAULT_RETRY_TIMES = 3;
+
+private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(1L);

Review Comment:
   This should be configurable and most likely in the first release default to 
the current behaviour.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##
@@ -107,13 +122,43 @@ private List> 
createUploadFutures(
 e ->
 CompletableFuture.supplyAsync(
 CheckedSupplier.unchecked(
-() ->
-
uploadLocalFileToCheckpointFs(
-e,
-
checkpointStreamFactory,
-stateScope,
-
closeableRegistry,
-
tmpResourcesRegistry)),
+() -> {
+RetryStrategy 
retryStrategy =
+new 
FixedRetryStrategy(
+
DEFAULT_RETRY_TIMES,
+
DEFAULT_RETRY_DELAY);
+while (true) {
+try {
+return 
uploadLocalFileToCheckpointFs(
+e,
+
checkpointStreamFactory,
+stateScope,
+
closeableRegistry,
+
tmpResourcesRegistry);
+} catch (Throwable t) {
+if (retryStrategy
+
.getNumRemainingRetries()
+  

[jira] [Commented] (FLINK-33932) Support retry mechanism for rocksdb uploader

2024-01-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803237#comment-17803237
 ] 

Piotr Nowojski commented on FLINK-33932:


Yes, I don't think this should cause a correctness issue. However we need this 
behaviour to be configurable and most likely in the first release it should 
default to the old behaviour.

> Support retry mechanism for rocksdb uploader
> 
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Guojun Li
>Priority: Major
>  Labels: pull-request-available
>
> Rocksdb uploader will throw exception and decline the current checkpoint if 
> there are errors when uploading to remote file system like hdfs.
> The exception is as below:
> {noformat}
>  
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job a025f19e at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.net.ConnectException: Connection timed out
>     at 

[jira] [Updated] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33993:
---
Labels: pull-request-available  (was: )

> Ineffective scaling detection events are misleading
> ---
>
> Key: FLINK-33993
> URL: https://issues.apache.org/jira/browse/FLINK-33993
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When the ineffective scaling decision feature is turned off, events are 
> regenerated which look like this:
> {noformat}
> Skipping further scale up after ineffective previous scale up for 
> 65c763af14a952c064c400d516c25529
> {noformat}
> This is misleading because no action will be taken. It is fair to inform 
> users about ineffective scale up even when the feature is disabled but a 
> different message should be printed to convey that no action will be taken.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33932) Support retry mechanism for rocksdb uploader

2024-01-04 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-33932:
---
Description: 
Rocksdb uploader will throw exception and decline the current checkpoint if 
there are errors when uploading to remote file system like hdfs.

The exception is as below:
{noformat}
 
2023-12-19 08:46:00,197 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
checkpoint 2 by task 
5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job 
a025f19e at 
application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
org.apache.flink.util.SerializedThrowable: 
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
checkpoint failed.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
Calc[133] (184/500)#0.
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at 
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
Could not flush to file and close the file system output stream to 
hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
stream state handle
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
 ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
    at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
 ~[?:?]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: 
java.net.ConnectException: Connection timed out
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) 
~[?:?]
    at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) 
~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257)
 ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
    

[jira] [Created] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-04 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33993:
--

 Summary: Ineffective scaling detection events are misleading
 Key: FLINK-33993
 URL: https://issues.apache.org/jira/browse/FLINK-33993
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


When the ineffective scaling decision feature is turned off, events are 
regenerated which look like this:

{noformat}
Skipping further scale up after ineffective previous scale up for 
65c763af14a952c064c400d516c25529
{noformat}

This is misleading because no action will be taken. It is fair to inform users 
about ineffective scale up even when the feature is disabled but a different 
message should be printed to convey that no action will be taken.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-04 Thread via GitHub


XComp commented on PR #22984:
URL: https://github.com/apache/flink/pull/22984#issuecomment-1877310802

   Thanks for picking this up. We should merge FLINK-32570 soon'ish to not miss 
it for 1.19.
   
   I reiterated over the PR once more with the following changes:
   * I rebased the branch to most-recent master. 
   * I removed the hotfix PRs which added API-related annotations. That is 
out-of-scope for this PR.
   * I improved the naming for some of the methods (`getWindowDuration` becomes 
`getWindowSize`)
   * I reorganized/squashed commits into two (it makes sense to look at the two 
commits individually when doing the next review)
   
   PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-04 Thread via GitHub


XComp commented on code in PR #22984:
URL: https://github.com/apache/flink/pull/22984#discussion_r1441916069


##
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##
@@ -53,13 +54,14 @@ public class StateTtlConfig implements Serializable {
 private static final long serialVersionUID = -7592693245044289793L;
 
 public static final StateTtlConfig DISABLED =
-newBuilder(Time.milliseconds(Long.MAX_VALUE))
+newBuilder(Duration.ofMillis(Long.MAX_VALUE))
 .setUpdateType(UpdateType.Disabled)
 .build();
 
 /**
  * This option value configures when to update last access timestamp which 
prolongs state TTL.
  */
+@PublicEvolving

Review Comment:
   I'm gonna remove the hotfix commits entirely. They are out-of-scope for this 
work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-04 Thread via GitHub


XComp commented on code in PR #22984:
URL: https://github.com/apache/flink/pull/22984#discussion_r1441915244


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -810,7 +811,8 @@ public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategy() {
  * @param numberOfExecutionRetries The number of times the system will try 
to re-execute failed
  * tasks.
  * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The {@link
- * RestartStrategies#fixedDelayRestart(int, Time)} contains the number 
of execution retries.
+ * RestartStrategies#fixedDelayRestart(int, Duration)} contains the 
number of execution
+ * retries.

Review Comment:
   See my reasoning in the [above 
comment](https://github.com/apache/flink/pull/22984#discussion_r1441914909)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-04 Thread via GitHub


XComp commented on code in PR #22984:
URL: https://github.com/apache/flink/pull/22984#discussion_r1441914909


##
flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java:
##
@@ -72,9 +72,23 @@ public static RestartStrategyConfiguration fixedDelayRestart(
  * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
  * @param delayInterval Delay in-between restart attempts for the 
FixedDelayRestartStrategy
  * @return FixedDelayRestartStrategy
+ * @deprecated Use {@link #fixedDelayRestart(int, Duration)}
  */
+@Deprecated

Review Comment:
   I went through the change once more and came to the conclusion that we could 
keep this change. You're right when stating that the `RestartStrategies` are 
going away, anyway. But it doesn't hurt that we do the deprecation 
"independently" of the `RestartStrategies` efforts. That way, we would still 
have a consistent code base if (for whatever reason) 
[FLIP-381](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992)
 gets reverted. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803203#comment-17803203
 ] 

Zhongqiang Gong commented on FLINK-33970:
-

[~martijnvisser] And I checked my pr cannot solve the original problem , I will 
try to find the way to both check hugo build and dead link check. Thanks for 
your kind remind.

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803201#comment-17803201
 ] 

Piotr Nowojski edited comment on FLINK-33856 at 1/4/24 3:00 PM:


Hi, I second that implementing this as metrics doesn't sound to be 
right/correct. 

 

[~hejufang001] , I wouldn't make this a subtask of the FLIP-384, but if needed 
a follow up. There are two things worth notting/discussing:
 * please check the discussion on the dev mailing list in FLIP-384 about the 
current limitations. Namely we are currently only creating a trace with a 
single span for the whole checkpoint. Also it's currently very sparsely 
populated with metrics. There were discussions/plans (CC [~fanrui] if I 
remember correctly you wanted to follow up on this?) about creating children 
spans per each subtask/task, to mimic the existing `CheckpointingMetrics` 
structure. Probably this FLIP requires that change.
 * once we have per subtask spans, or aggregated metrics as in [the recovery 
spans from 
FLIP-386|https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans]
 , we might not need some of the metrics, that you are proposing here? For 
example `writeRate` should be easily computed from the async duration / 
checkpointed state size?

Anyway, I think FLIP will be required here. 


was (Author: pnowojski):
Hi, I second that implementing this as metrics doesn't sound to be 
right/correct. 

 

[~hejufang001] , I wouldn't make this a subtask of the FLIP-384, but if needed 
a follow up. There are two things worth notting/discussing:
 * please check the discussion on the dev mailing list in FLIP-384 about the 
current limitations. Namely we are currently only creating a trace with a 
single span for the whole checkpoint. Also it's currently very sparsely 
populated with metrics. There were discussions plans about creating children 
spans per each subtask/task, to mimic the existing `CheckpointingMetrics` 
structure. Probably this FLIP requires that change.
 * once we have per subtask spans, or aggregated metrics as in [the recovery 
spans from 
FLIP-386|https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans]
 , we might not need some of the metrics, that you are proposing here? For 
example `writeRate` should be easily computed from the async duration / 
checkpointed state size?

Anyway, I think FLIP will be required here.

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall time-consuming. Therefore, it 
> is easy to observe the bottleneck point by adding performance indicators when 
> the task interacts with the external file storage system. These include: the 
> rate of file write , the latency to write the file, the latency to close the 
> file.
> In flink side add the above metrics has the following advantages: convenient 
> statistical different task E2E time-consuming; do not need to distinguish the 
> type of external storage system, can be unified in the 
> FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803201#comment-17803201
 ] 

Piotr Nowojski commented on FLINK-33856:


Hi, I second that implementing this as metrics doesn't sound to be 
right/correct. 

 

[~hejufang001] , I wouldn't make this a subtask of the FLIP-384, but if needed 
a follow up. There are two things worth notting/discussing:
 * please check the discussion on the dev mailing list in FLIP-384 about the 
current limitations. Namely we are currently only creating a trace with a 
single span for the whole checkpoint. Also it's currently very sparsely 
populated with metrics. There were discussions plans about creating children 
spans per each subtask/task, to mimic the existing `CheckpointingMetrics` 
structure. Probably this FLIP requires that change.
 * once we have per subtask spans, or aggregated metrics as in [the recovery 
spans from 
FLIP-386|https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans]
 , we might not need some of the metrics, that you are proposing here? For 
example `writeRate` should be easily computed from the async duration / 
checkpointed state size?

Anyway, I think FLIP will be required here.

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall time-consuming. Therefore, it 
> is easy to observe the bottleneck point by adding performance indicators when 
> the task interacts with the external file storage system. These include: the 
> rate of file write , the latency to write the file, the latency to close the 
> file.
> In flink side add the above metrics has the following advantages: convenient 
> statistical different task E2E time-consuming; do not need to distinguish the 
> type of external storage system, can be unified in the 
> FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-01-04 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-1877219219

   @hlteoh37 @dannycranmer please review this PR when you have the chance, so 
we can proceed with the other related tasks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24379][Formats] Avro Glue Schema Registry table format [flink-connector-aws]

2024-01-04 Thread via GitHub


nicusX commented on PR #122:
URL: 
https://github.com/apache/flink-connector-aws/pull/122#issuecomment-1877217949

   @dannycranmer please review this PR when you have the chance


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803196#comment-17803196
 ] 

Zhongqiang Gong commented on FLINK-33970:
-

[~martijnvisser] I had tried to use hugo to check document build , but 
`config.toml` is neccessary for hugo. So I use simple scrips to check dead link.

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


rkhachatryan commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441837132


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   NIT: this field is so wide that I don't see the Description column; I can 
scroll horizontally, but would only do that if I know there is a column on the 
right.
   
   Don't know how to fix that easily though 路‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33490) Validate the name conflicts when creating view

2024-01-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803187#comment-17803187
 ] 

Martijn Visser commented on FLINK-33490:


Sounds good to me, thank you! 

> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standards states,
> If  is specified, then:
> i) If any two columns in the table specified by the  have 
> equivalent s, or if any column of that table has an 
> implementation-dependent name, then a  shall be specified.
> ii) Equivalent s shall not be specified more than once in the 
> .
> Many databases also throw exception when view name conflicts, e.g. mysql, 
> postgres.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob

2024-01-04 Thread Sweta Kalakuntla (Jira)
Sweta Kalakuntla created FLINK-33992:


 Summary: Add option to fetch the jar from private repository in 
FlinkSessionJob
 Key: FLINK-33992
 URL: https://issues.apache.org/jira/browse/FLINK-33992
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Sweta Kalakuntla


FlinkSessionJob spec does not have a capability to download job jar from remote 
private repository. It can currently only download from public repositories. 

Adding capability to supply credentials  to the *spec.job.jarURI* in 
FlinkSessionJob, will solve that problem.

If I use initContainer to download the jar in FlinkDeployment and try to access 
that in FlinkSessionJob, the operator is unable to find the jar in the defined 
path.
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: job1
spec:
  deploymentName: session-cluster
  job:
jarURI: file:///opt/flink/job.jar
parallelism: 4
upgradeMode: savepoint
(edited)
caused by: java.io.FileNotFoundException: /opt/flink/job.jar (No such file or 
directory)
at java.base/java.io.FileInputStream.open0(Native Method)
at java.base/java.io.FileInputStream.open(Unknown Source)
at java.base/java.io.FileInputStream.(Unknown Source)
at 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
at 
org.apache.flink.kubernetes.operator.artifact.FileSystemBasedArtifactFetcher.fetch(FileSystemBasedArtifactFetcher.java:44)
at 
org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:63)
at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:707)
at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:212)
at 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:73)
at 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:44)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:120)
at 
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:109)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI

2024-01-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803185#comment-17803185
 ] 

Piotr Nowojski commented on FLINK-33897:


I have mixed feelings. Shouldn't the solution be to just use/enable unaligned 
checkpoints? If one sets the alignment timeout to some reasonable value, I 
don't see a reason for someone to use aligned checkpoints anymore. Maybe 
instead let's consider deprecating aligned checkpoints without timeout?

Is there some real world motivation behind this feature?

I would be -1 for this feature, if it requires complicating/making changes to 
the actual barrier handling (apart of replacing 
{{SingleCheckpointBarrierHandler#aligned}} with 
{{SingleCheckpointBarrierHandler#alternating}} call). This code is complicated 
and in the past we had a lot of deadlocks, data corruptions and other critical 
bugs around those areas, so keeping it as simple as possible and minimising 
amount of supported features is quite important. 

> Allow triggering unaligned checkpoint via CLI
> -
>
> Key: FLINK-33897
> URL: https://issues.apache.org/jira/browse/FLINK-33897
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client, Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> After FLINK-6755, user could trigger checkpoint through CLI. However I 
> noticed there would be value supporting trigger it in unaligned way, since 
> the job may encounter a high back-pressure and an aligned checkpoint would 
> fail.
>  
> I suggest we provide an option '-unaligned' in CLI to support that.
>  
> Similar option would also be useful for REST api



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33897) Allow triggering unaligned checkpoint via CLI

2024-01-04 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-33897:
---
Issue Type: New Feature  (was: Improvement)

> Allow triggering unaligned checkpoint via CLI
> -
>
> Key: FLINK-33897
> URL: https://issues.apache.org/jira/browse/FLINK-33897
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> After FLINK-6755, user could trigger checkpoint through CLI. However I 
> noticed there would be value supporting trigger it in unaligned way, since 
> the job may encounter a high back-pressure and an aligned checkpoint would 
> fail.
>  
> I suggest we provide an option '-unaligned' in CLI to support that.
>  
> Similar option would also be useful for REST api



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803181#comment-17803181
 ] 

Martijn Visser commented on FLINK-33970:


Next to this, I believe this only fixes the problem for 404 links inside the 
connector docs itself. But that's not where the problem originated, that was in 
the Hugo integration when the Flink documentation is being built. 

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33970) Add necessary checks for connector document

2024-01-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803179#comment-17803179
 ] 

Martijn Visser commented on FLINK-33970:


This shouldn't be fixed per connector, but once in the shared_utils at 
https://github.com/apache/flink-connector-shared-utils/tree/ci_utils

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found the documentation files in independent connector 
> repos lacks basic checks like broken url, this ticket aims to add necessary 
> checks and avoid similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-04 Thread via GitHub


pnowojski commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1877155133

   Thanks for the review!
   
   > Why only add the download metric? While you are at this, you could have 
also simply added metrics for, e.g., merge/clip.
   
   That was the scope of the FLIP. Partially because I wanted to keep the scope 
as small as possible to minimise required efforts. We can always add more 
metrics in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1441789636


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackendParametersImpl.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
+import org.apache.flink.runtime.state.StateBackend.KeyedStateBackendParameters;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * Internal POJO implementing {@link KeyedStateBackendParameters}
+ *
+ * @param 
+ */
+@Internal
+public class KeyedStateBackendParametersImpl implements 
KeyedStateBackendParameters {

Review Comment:
   As discussed offline, the interface is part of the Flink's public api, and 
having it as interface instead of a concrete pojo class gives us more 
flexibility in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441683484


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
 counts.createSnapshot(),
 summary.createSnapshot(),
 history.createSnapshot(),
-latestRestoredCheckpoint);
+jobInitializationMetricsBuilder
+.map(
+JobInitializationMetricsBuilder
+
::buildRestoredCheckpointStats)
+.orElse(Optional.empty())
+.orElse(null));

Review Comment:
   Because
   1. builder can be empty
   2. if it's not empty,  `buildRestoredCheckpointStats` can also return empty 
result
   
   This is an equivalent of
   ```
   if (builder.isEmpty()) {
 return null;
   }
   Optional stats = builder.get().buildRestoredCheckpointStats();
   return stats.orElse(null);
   ```
   
   Because of that `map` here returns `Optional>` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803176#comment-17803176
 ] 

Piotr Nowojski commented on FLINK-27756:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56003=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.15.0, 1.17.0, 1.19.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441782436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##
@@ -159,6 +160,11 @@ public ExecutionGraph createAndRestoreExecutionGraph(
 }
 };
 
+int totalNumberOfSubTasks =
+StreamSupport.stream(jobGraph.getVertices().spliterator(), 
false)
+.mapToInt(jobVertex -> jobVertex.getParallelism())

Review Comment:
   Thanks! fixed and I've added a new test 
`org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactoryTest#testCheckpointStatsTrackerUpdatedWithNewParallelism`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-01-04 Thread Flaviu Cicio (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flaviu Cicio updated FLINK-33989:
-
Description: 
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 
  'topic' = 'input', 
  'key.format' = 'raw', 
  'properties.bootstrap.servers' = 'kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 
  'topic' = 'output', 
  'key.format' = 'raw', 
  'properties.bootstrap.servers' = 'kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

 

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.

The update_before is added at DeduplicateFunctionHelper#122.

This is easily reproducible with this test that we added in the 
UpsertKafkaTableITCase from flink-connector-kafka:
{code:java}
@Test
public void testAggregateFilterOmit() throws Exception {
String topic = COUNT_FILTER_TOPIC + "_" + format;
createTestTopic(topic, 1, 1);
env.setParallelism(1);
// -   test   ---
countFilterToUpsertKafkaOmitUpdateBefore(topic);
// - clean up ---
deleteTestTopic(topic);
}

private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws 
Exception {
String bootstraps = getBootstrapServers();
List data =
Arrays.asList(
Row.of(1, "Hi"),
Row.of(1, "Hello"),
Row.of(2, "Hello world"),
Row.of(2, "Hello world, how are you?"),
Row.of(2, "I am fine."),
Row.of(3, "Luke Skywalker"),
Row.of(3, "Comment#1"),
Row.of(3, "Comment#2"),
Row.of(4, "Comment#3"),
Row.of(4, null));

final String createSource =
String.format(
"CREATE TABLE aggfilter_%s ("
+ "  `id` INT,\n"
+ "  `comment` STRING\n"
+ ") WITH ("
+ "  'connector' = 'values',"
+ "  'data-id' = '%s'"
+ ")",
format, TestValuesTableFactory.registerData(data));
tEnv.executeSql(createSource);

final String createSinkTable =
String.format(
"CREATE TABLE %s (\n"
+ "  `id` INT,\n"
+ "  `comment` STRING,\n"
+ "  PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ") WITH (\n"
+ "  'connector' = 'upsert-kafka',\n"
+ "  'topic' = '%s',\n"
+ "  'properties.bootstrap.servers' = '%s',\n"
+ "  'key.format' = '%s',\n"
+ "  'value.format' = '%s'"
//+ "  'sink.omit-row-kind' = '-U'"
+ ")",
table, table, bootstraps, format, format);

tEnv.executeSql(createSinkTable);

String initialValues =
"INSERT INTO "
+ table
+ " "
+ "SELECT * "

[jira] [Updated] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-01-04 Thread Flaviu Cicio (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flaviu Cicio updated FLINK-33989:
-
Description: 
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

 

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.

The update_before is added at DeduplicateFunctionHelper#122.

This is easily reproducible with this test that we added in the 
UpsertKafkaTableITCase from flink-connector-kafka:
{code:java}
@Test
public void testAggregateFilterOmit() throws Exception {
String topic = COUNT_FILTER_TOPIC + "_" + format;
createTestTopic(topic, 1, 1);
env.setParallelism(1);
// -   test   ---
countFilterToUpsertKafkaOmitUpdateBefore(topic);
// - clean up ---
deleteTestTopic(topic);
}

private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws 
Exception {
String bootstraps = getBootstrapServers();
List data =
Arrays.asList(
Row.of(1, "Hi"),
Row.of(1, "Hello"),
Row.of(2, "Hello world"),
Row.of(2, "Hello world, how are you?"),
Row.of(2, "I am fine."),
Row.of(3, "Luke Skywalker"),
Row.of(3, "Comment#1"),
Row.of(3, "Comment#2"),
Row.of(4, "Comment#3"),
Row.of(4, null));

final String createSource =
String.format(
"CREATE TABLE aggfilter_%s ("
+ "  `id` INT,\n"
+ "  `comment` STRING\n"
+ ") WITH ("
+ "  'connector' = 'values',"
+ "  'data-id' = '%s'"
+ ")",
format, TestValuesTableFactory.registerData(data));
tEnv.executeSql(createSource);

final String createSinkTable =
String.format(
"CREATE TABLE %s (\n"
+ "  `id` INT,\n"
+ "  `comment` STRING,\n"
+ "  PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ") WITH (\n"
+ "  'connector' = 'upsert-kafka',\n"
+ "  'topic' = '%s',\n"
+ "  'properties.bootstrap.servers' = '%s',\n"
+ "  'key.format' = '%s',\n"
+ "  'value.format' = '%s'"
//+ "  'sink.omit-row-kind' = '-U'"
+ ")",
table, table, bootstraps, format, format);

tEnv.executeSql(createSinkTable);

String initialValues =
"INSERT INTO "
+ table
+ " "
+ "SELECT * "
  

[jira] [Updated] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-01-04 Thread Flaviu Cicio (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flaviu Cicio updated FLINK-33989:
-
Description: 
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
 

But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.
The update_before is added at DeduplicateFunctionHelper#122.

This is easily reproducible with this test that we added in the 
UpsertKafkaTableITCase from flink-connector-kafka:


{code:java}
@Test
public void testAggregateFilterOmit() throws Exception {
String topic = COUNT_FILTER_TOPIC + "_" + format;
createTestTopic(topic, 1, 1);
env.setParallelism(1);
// -   test   ---
countFilterToUpsertKafkaOmitUpdateBefore(topic);
// - clean up ---
deleteTestTopic(topic);
}

private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws 
Exception {
String bootstraps = getBootstrapServers();
List data =
Arrays.asList(
Row.of(1, "Hi"),
Row.of(1, "Hello"),
Row.of(2, "Hello world"),
Row.of(2, "Hello world, how are you?"),
Row.of(2, "I am fine."),
Row.of(3, "Luke Skywalker"),
Row.of(3, "Comment#1"),
Row.of(3, "Comment#2"),
Row.of(4, "Comment#3"),
Row.of(4, null));

final String createSource =
String.format(
"CREATE TABLE aggfilter_%s ("
+ "  `id` INT,\n"
+ "  `comment` STRING\n"
+ ") WITH ("
+ "  'connector' = 'values',"
+ "  'data-id' = '%s'"
+ ")",
format, TestValuesTableFactory.registerData(data));
tEnv.executeSql(createSource);

final String createSinkTable =
String.format(
"CREATE TABLE %s (\n"
+ "  `id` INT,\n"
+ "  `comment` STRING,\n"
+ "  PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ") WITH (\n"
+ "  'connector' = 'upsert-kafka',\n"
+ "  'topic' = '%s',\n"
+ "  'properties.bootstrap.servers' = '%s',\n"
+ "  'key.format' = '%s',\n"
+ "  'value.format' = '%s'"
//+ "  'sink.omit-row-kind' = '-U'"
+ ")",
table, table, bootstraps, format, format);

tEnv.executeSql(createSinkTable);

String initialValues =
"INSERT INTO "
+ table
+ " "
+ "SELECT * "
 

[jira] [Updated] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-01-04 Thread Flaviu Cicio (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flaviu Cicio updated FLINK-33989:
-
Description: 
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
 

But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.
The update_before is added at DeduplicateFunctionHelper#122.

Here's a test that we added in the UpsertKafkaTableITCase from 
flink-connector-kafka:


 

 

  was:
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
 

But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.


Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:

||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|
We expected to see only the insert and the update_after entries.
The update_before is added at DeduplicateFunctionHelper#122.


> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> -
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Runtime
>Affects Versions: 1.17.2
>Reporter: Flaviu Cicio
>Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 

[jira] [Resolved] (FLINK-33260) Custom Error Handling for Kinesis EFO Consumer

2024-01-04 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-33260.
---
Resolution: Fixed

> Custom Error Handling for Kinesis EFO Consumer
> --
>
> Key: FLINK-33260
> URL: https://issues.apache.org/jira/browse/FLINK-33260
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Danny Cranmer
>Assignee: Emre Kartoglu
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Background
> The Kinesis Consumer exposes various configuration that allows the user to 
> define retry and backoff strategies when dealing with errors. However, the 
> configuration does not allow the user to configure which errors are 
> retryable, or different strategies for different errors. The error handling 
> logic is hard coded within the connector. Over time we discover errors that 
> should be retryable that are not, for example KDS throwing 500 on 
> SubscribeToShare or transient DNS issues. When these arise the user can 
> either fork-fix the connector or log an issue and wait for the next version.
> h3. Scope
> Add the ability for the user to define retry/backoff strategy per error. This 
> could be achieved using flexible configuration keys, or allowing the user to 
> register their own retry strategies on the connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33991) Custom Error Handling for Kinesis Polling Consumer

2024-01-04 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-33991:
-

Assignee: Emre Kartoglu

> Custom Error Handling for Kinesis Polling Consumer 
> ---
>
> Key: FLINK-33991
> URL: https://issues.apache.org/jira/browse/FLINK-33991
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Emre Kartoglu
>Assignee: Emre Kartoglu
>Priority: Major
> Fix For: aws-connector-4.3.0
>
>
> We introduced custom error handling for the Kinesis EFO Consumer as part of 
> https://issues.apache.org/jira/browse/FLINK-33260
> PR for the EFO consumer: 
> [https://github.com/apache/flink-connector-aws/pull/110]
>  
> This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
> same codebase.
> Current configuration for the EFO consumer looks like:
> {code:java}
> flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
> flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
>  {code}
> We should re-use the same code for the polling consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33260) Custom Error Handling for Kinesis EFO Consumer

2024-01-04 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-33260:
--
Summary: Custom Error Handling for Kinesis EFO Consumer  (was: Custom Error 
Handling for Kinesis Consumer)

> Custom Error Handling for Kinesis EFO Consumer
> --
>
> Key: FLINK-33260
> URL: https://issues.apache.org/jira/browse/FLINK-33260
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Danny Cranmer
>Assignee: Emre Kartoglu
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Background
> The Kinesis Consumer exposes various configuration that allows the user to 
> define retry and backoff strategies when dealing with errors. However, the 
> configuration does not allow the user to configure which errors are 
> retryable, or different strategies for different errors. The error handling 
> logic is hard coded within the connector. Over time we discover errors that 
> should be retryable that are not, for example KDS throwing 500 on 
> SubscribeToShare or transient DNS issues. When these arise the user can 
> either fork-fix the connector or log an issue and wait for the next version.
> h3. Scope
> Add the ability for the user to define retry/backoff strategy per error. This 
> could be achieved using flexible configuration keys, or allowing the user to 
> register their own retry strategies on the connector
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33991) Custom Error Handling for Kinesis Polling Consumer

2024-01-04 Thread Emre Kartoglu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emre Kartoglu updated FLINK-33991:
--
Description: 
We introduced custom error handling for the Kinesis EFO Consumer as part of 
https://issues.apache.org/jira/browse/FLINK-33260

PR for the EFO consumer: 
[https://github.com/apache/flink-connector-aws/pull/110]

 

This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
same codebase.

Current configuration for the EFO consumer looks like:
{code:java}
flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
 {code}
We should re-use the same code for the polling consumer.

  was:
We introduced custom error handling for the Kinesis EFO Consumer as part of 
https://issues.apache.org/jira/browse/FLINK-33260

PR for the EFO consumer: 
[https://github.com/apache/flink-connector-aws/pull/110]

 

This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
same codebase.

Current configuration for the EFO consumer looks like:
{code:java}
flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
 {code}
We should re-use the same code for the polling consumer.}}{}}}


> Custom Error Handling for Kinesis Polling Consumer 
> ---
>
> Key: FLINK-33991
> URL: https://issues.apache.org/jira/browse/FLINK-33991
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Emre Kartoglu
>Priority: Major
>
> We introduced custom error handling for the Kinesis EFO Consumer as part of 
> https://issues.apache.org/jira/browse/FLINK-33260
> PR for the EFO consumer: 
> [https://github.com/apache/flink-connector-aws/pull/110]
>  
> This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
> same codebase.
> Current configuration for the EFO consumer looks like:
> {code:java}
> flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
> flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
>  {code}
> We should re-use the same code for the polling consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33991) Custom Error Handling for Kinesis Polling Consumer

2024-01-04 Thread Emre Kartoglu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emre Kartoglu updated FLINK-33991:
--
Description: 
We introduced custom error handling for the Kinesis EFO Consumer as part of 
https://issues.apache.org/jira/browse/FLINK-33260

PR for the EFO consumer: 
[https://github.com/apache/flink-connector-aws/pull/110]

 

This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
same codebase.

Current configuration for the EFO consumer looks like:
{code:java}
flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
 {code}
We should re-use the same code for the polling consumer.}}{}}}

  was:
We introduced custom error handling for the Kinesis EFO Consumer as part of 
https://issues.apache.org/jira/browse/FLINK-33260

PR for the EFO consumer: https://github.com/apache/flink-connector-aws/pull/110

 

This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
same codebase.

Current configuration for the EFO consumer looks like:

{{}}
{code:java}
flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
 {code}
{{}}

We should re-use the same code for the polling consumer.{{{}{}}}


> Custom Error Handling for Kinesis Polling Consumer 
> ---
>
> Key: FLINK-33991
> URL: https://issues.apache.org/jira/browse/FLINK-33991
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Emre Kartoglu
>Priority: Major
>
> We introduced custom error handling for the Kinesis EFO Consumer as part of 
> https://issues.apache.org/jira/browse/FLINK-33260
> PR for the EFO consumer: 
> [https://github.com/apache/flink-connector-aws/pull/110]
>  
> This ticket is to apply the same logic to the Kinesis Polling Consumer in the 
> same codebase.
> Current configuration for the EFO consumer looks like:
> {code:java}
> flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException
> flink.shard.consumer.error.recoverable[1].exception=java.net.SocketTimeoutException
>  {code}
> We should re-use the same code for the polling consumer.}}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >