[jira] [Commented] (FLINK-32783) Release Testing: Improve parallel download of RocksDB incremental state

2023-09-05 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761982#comment-17761982
 ] 

Konstantin Knauf commented on FLINK-32783:
--

After talking to [~srichter], I don't think we need to test this. The correctness 
is verified by our nightly tests. If someone wants to verify the performance 
improvements, we could leave this ticket open, but I don't think that is strictly 
necessary.

> Release Testing: Improve parallel download of RocksDB incremental state
> ---
>
> Key: FLINK-32783
> URL: https://issues.apache.org/jira/browse/FLINK-32783
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> This feature is automatically used whenever we download state during a 
> restart from a RocksDB incremental checkpoint. This should be tested with and 
> without task-local recovery.
> Will be covered by the nightly tests:
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> up) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> * run_test "Resuming Externalized Checkpoint (rocks, incremental, scale 
> down) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"





[jira] [Commented] (FLINK-32782) Release Testing: Disable WAL in RocksDBWriteBatchWrapper by default

2023-09-05 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761981#comment-17761981
 ] 

Konstantin Knauf commented on FLINK-32782:
--

After talking to [~srichter], I don't think we need to test this. The correctness 
is verified by our nightly tests. If someone wants to verify the performance 
improvements, we could leave this ticket open, but I don't think that is strictly 
necessary.

> Release Testing:  Disable WAL in RocksDBWriteBatchWrapper by default
> 
>
> Key: FLINK-32782
> URL: https://issues.apache.org/jira/browse/FLINK-32782
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
>
> Covered by nightly tests, for example 
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, no 
> parallelism change) end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 
> rocks true true" "skip_check_exceptions"
> - run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) 
> end-to-end test" 
> "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 
> rocks true true" "skip_check_exceptions"





[jira] [Commented] (FLINK-21450) Add local recovery support to adaptive scheduler

2023-08-07 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751793#comment-17751793
 ] 

Konstantin Knauf commented on FLINK-21450:
--

[~roman] I think we still need to update the documentation/limitations on 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/, 
right?

> Add local recovery support to adaptive scheduler
> 
>
> Key: FLINK-21450
> URL: https://issues.apache.org/jira/browse/FLINK-21450
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Robert Metzger
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
> Fix For: 1.18.0
>
>
> Local recovery means that, on a failure, we are able to re-use the state already 
> present on a TaskManager instead of loading it again from distributed storage 
> (which means the scheduler needs to know where each piece of state is located 
> and schedule tasks accordingly).
> The Adaptive Scheduler currently does not respect the location of state, so 
> failures require re-loading state from distributed storage.
> Adding this feature will allow us to enable the {{Local recovery and sticky 
> scheduling end-to-end test}} for the adaptive scheduler again.
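
For anyone picking this up, a minimal sketch of enabling task-local recovery on a job's configuration; it uses the existing `CheckpointingOptions.LOCAL_RECOVERY` option, while the adaptive-scheduler-specific behaviour is what this ticket adds:

{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRecoveryExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Enable task-local recovery so restored tasks can reuse state kept on the TaskManager.
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(10_000L);

        env.fromSequence(0, 1_000).print();
        env.execute("local-recovery-sketch");
    }
}
{code}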





[jira] [Commented] (FLINK-15736) Support Java 17 (LTS)

2023-08-01 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17749535#comment-17749535
 ] 

Konstantin Knauf commented on FLINK-15736:
--

How do users usually learn about support for new Java versions?

> Support Java 17 (LTS)
> -
>
> Key: FLINK-15736
> URL: https://issues.apache.org/jira/browse/FLINK-15736
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> stale-assigned
> Fix For: 1.18.0
>
>
> Long-term issue for preparing Flink for Java 17.





[jira] [Assigned] (FLINK-32706) Add SPLIT(STRING) support in SQL & Table API

2023-08-01 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-32706:


Assignee: Hanyu Zheng

> Add SPLIT(STRING) support in SQL & Table API
> 
>
> Key: FLINK-32706
> URL: https://issues.apache.org/jira/browse/FLINK-32706
> Project: Flink
>  Issue Type: Improvement
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
>
> SPLIT Function
> Description
> Splits a string into an array of substrings, based on a delimiter.
> Syntax
> The syntax for the SPLIT function is:
> {code:java}
> SPLIT(col1, delimiter){code}
> Splits a string into an array of substrings based on a delimiter. If the 
> delimiter is not found, then the original string is returned as the only 
> element in the array. If the delimiter is empty, then all characters in the 
> string are split. If either the string or the delimiter is NULL, then NULL is 
> returned.
> If the delimiter is found at the beginning or end of the string, or there are 
> contiguous delimiters, then an empty string is added to the array.
> Example
> Let's look at some function examples and explore how to use the SPLIT 
> function.
> For example:
>  
> {code:java}
> SELECT SPLIT('abcdefg', 'c');
> Result: ['ab', 'defg']
> {code}
> see also:
> 1. ksqlDB Split function
> ksqlDB provides a scalar function named {{SPLIT}} which splits a string into 
> an array of substrings based on a delimiter.
> Syntax: {{SPLIT(string, delimiter)}}
> For example: {{SPLIT('a,b,c', ',')}} will return {{{}['a', 'b', 'c']{}}}.
> [https://docs.ksqldb.io/en/0.8.1-ksqldb/developer-guide/ksqldb-reference/scalar-functions/#split]
> 2. Apache Hive Split function
> Hive offers a function named {{split}} which splits a string around a 
> specified delimiter and returns an array of strings.
> Syntax: {{array split(string str, string pat)}}
> For example: {{split('a,b,c', ',')}} will return {{{}["a", "b", "c"]{}}}.
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
> 3. Spark SQL Split function
> Spark SQL also offers a function named {{{}split{}}}, similar to the one in 
> Hive.
> Syntax: {{split(str, pattern[, limit])}}
> Here, {{limit}} is an optional parameter to specify the maximum length of the 
> returned array.
> For example: {{split('oneAtwoBthreeC', '[ABC]', 2)}} will return {{{}["one", 
> "twoBthreeC"]{}}}.
> [https://spark.apache.org/docs/latest/api/sql/index.html#split]
> 4. Presto Split function
> Presto offers a function named {{split}} which splits a string around a 
> regular expression and returns an array of strings.
> Syntax: {{array split(string str, string regex)}}
> For example: {{split('a.b.c', '\.')}} will return {{{}["a", "b", "c"]{}}}.
> [https://prestodb.io/docs/current/functions/string.html]
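
As a sketch only: once the proposed function exists, it could be exercised from a small Table API program like the following (the `SPLIT` call itself is what this ticket proposes and is not yet a Flink built-in):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitFunctionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical usage of the proposed SPLIT(string, delimiter) function.
        // Expected result with ksqlDB-style semantics: ['ab', 'defg']
        tEnv.executeSql("SELECT SPLIT('abcdefg', 'c')").print();
    }
}
{code}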





[jira] [Commented] (FLINK-28045) [umbrella] Deprecate SourceFunction API

2023-07-26 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747334#comment-17747334
 ] 

Konstantin Knauf commented on FLINK-28045:
--

[~airblader] What we still need to do, though, is actually aligning on the list 
of these issues/blockers.

> [umbrella] Deprecate SourceFunction API
> ---
>
> Key: FLINK-28045
> URL: https://issues.apache.org/jira/browse/FLINK-28045
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>






[jira] [Commented] (FLINK-32402) FLIP-294: Support Customized Catalog Modification Listener

2023-07-26 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747331#comment-17747331
 ] 

Konstantin Knauf commented on FLINK-32402:
--

[~zjureel] Great. There is a PR that adds documentation for the pluggable error 
classification under Deployment -> Advanced 
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced).
 Would this be a good place?

> FLIP-294: Support Customized Catalog Modification Listener
> --
>
> Key: FLINK-32402
> URL: https://issues.apache.org/jira/browse/FLINK-32402
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
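
For illustration, a minimal listener sketch; the class, package and method names below follow FLIP-294 and should be checked against the final 1.18 API:

{code:java}
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;

// Minimal listener that just logs every catalog modification event.
// Names follow FLIP-294; verify them against the released 1.18 API.
public class LoggingCatalogListener implements CatalogModificationListener {

    @Override
    public void onEvent(CatalogModificationEvent event) {
        System.out.println("Catalog modification observed: " + event);
    }
}
{code}

Per the FLIP, such a listener is created by a `CatalogModificationListenerFactory` registered via SPI and enabled through the `table.catalog-modification.listeners` option; that wiring is exactly what the documentation should explain.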





[jira] [Commented] (FLINK-31634) FLIP-301: Hybrid Shuffle supports Remote Storage

2023-07-26 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747330#comment-17747330
 ] 

Konstantin Knauf commented on FLINK-31634:
--

Thanks, [~tanyuxin].

> FLIP-301: Hybrid Shuffle supports Remote Storage
> 
>
> Key: FLINK-31634
> URL: https://issues.apache.org/jira/browse/FLINK-31634
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella ticket for 
> [FLIP-301|https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage].





[jira] [Assigned] (FLINK-31889) Add documentation for implementing/loading enrichers

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-31889:


Assignee: Konstantin Knauf  (was: Panagiotis Garefalakis)

> Add documentation for implementing/loading enrichers
> 
>
> Key: FLINK-31889
> URL: https://issues.apache.org/jira/browse/FLINK-31889
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
>
> Describe how enrichers can be implemented and loaded to Flink as part of 
> documentation
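
As a starting point for the documentation, a minimal enricher sketch; interface and method names follow FLIP-304 and should be verified against the 1.18 code:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.core.failure.FailureEnricher;

// Toy enricher that tags every failure with a single "category" label.
// Interface and method names follow FLIP-304; verify them against the 1.18 code.
public class CategoryFailureEnricher implements FailureEnricher {

    private static final String KEY = "category";

    @Override
    public Set<String> getOutputKeys() {
        return Collections.singleton(KEY);
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context) {
        String category = cause instanceof OutOfMemoryError ? "resource" : "unknown";
        return CompletableFuture.completedFuture(Collections.singletonMap(KEY, category));
    }
}
{code}

Per the FLIP, the enricher is loaded through a `FailureEnricherFactory` registered via SPI and enabled with the `jobmanager.failure-enrichers` option; both steps are worth covering in the documentation.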





[jira] [Assigned] (FLINK-31889) Add documentation for implementing/loading enrichers

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-31889:


Assignee: Panagiotis Garefalakis  (was: Konstantin Knauf)

> Add documentation for implementing/loading enrichers
> 
>
> Key: FLINK-31889
> URL: https://issues.apache.org/jira/browse/FLINK-31889
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>
> Describe how enrichers can be implemented and loaded to Flink as part of 
> documentation





[jira] [Assigned] (FLINK-31889) Add documentation for implementing/loading enrichers

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-31889:


Assignee: Panagiotis Garefalakis

> Add documentation for implementing/loading enrichers
> 
>
> Key: FLINK-31889
> URL: https://issues.apache.org/jira/browse/FLINK-31889
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>
> Describe how enrichers can be implemented and loaded to Flink as part of 
> documentation





[jira] [Commented] (FLINK-15736) Support Java 17 (LTS)

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746950#comment-17746950
 ] 

Konstantin Knauf commented on FLINK-15736:
--

[~chesnay] Do we need any user-facing documentation for this? I guess it would 
make sense somewhere, no?

> Support Java 17 (LTS)
> -
>
> Key: FLINK-15736
> URL: https://issues.apache.org/jira/browse/FLINK-15736
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> stale-assigned
> Fix For: 1.18.0
>
>
> Long-term issue for preparing Flink for Java 17.





[jira] [Updated] (FLINK-29344) Make Adaptive Scheduler supports Fine-Grained Resource Management

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-29344:
-
Fix Version/s: (was: 1.18.0)

> Make Adaptive Scheduler supports Fine-Grained Resource Management
> -
>
> Key: FLINK-29344
> URL: https://issues.apache.org/jira/browse/FLINK-29344
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Xintong Song
>Assignee: Chesnay Schepler
>Priority: Major
>
> This ticket is a reflection of the following Slack discussion:
> {quote}
> Donatien Schmitz
> Adaptive Scheduler thread:
> Hey all, it seems like the Adaptive Scheduler does not support fine grain 
> resource management. I have fixed it and would like to know if you would be 
> interested in a PR or if it was purposely designed to not support Fine grain 
> resource management.
> rmetzger
> @Donatien Schmitz: I’m concerned that we don’t have a lot of review capacity 
> right now, and I’m now aware of any users asking for it.
> rmetzger
> I couldn’t find a ticket for adding this feature, did you find one?
> If not, can you add one? This will allow us to at least making this feature 
> show up on google, and people might comment on it, if they need it.
> rmetzger
> If the change is fairly self-contained, is unlikely to cause instabilities, 
> then we can also consider merging it
> rmetzger
> @Xintong Song what do you think?
> Xintong Song
> @rmetzger, thanks for involving me.
> @Donatien Schmitz, thanks for bringing this up, and for volunteering on 
> fixing this. Could you explain a bit more about how do you plan to fix this?
> Fine-grained resource management is not yet supported by adaptive scheduler, 
> because there’s an issue that we haven’t find a good solution for. Namely, if 
> only part of the resource requirements can be fulfilled, how do we decide 
> which requirements should be fulfilled. E.g., say the job declares it needs 
> 10 slots with resource 1 for map tasks, and another 10 slots with resource 2 
> for reduce tasks. If there’s not enough resources (say only 10 slots can be 
> allocated for simplicity), how many slots for map / reduce tasks should be 
> allocated? Obviously, <10 map, 0 reduce> & <0 map, 10 reduce> would not work. 
> For this example, a proportional scale-down (<5 map, 5 reduce>) seems 
> reasonable. However, a proportional scale-down is not always easy (e.g., 
> requirements is <100 map, 1 reduce>), and the issue grows more complicated if 
> you take lots of stages and the differences of slot sizes into consideration.
> I’d like to see adaptive scheduler also supports fine-grained resource 
> management. If there’s a good solution to the above issue, I’d love to help 
> review the effort.
> Donatien Schmitz
> Dear Robert and Xintong, thanks for reading and reacting to my message! I'll 
> reply tomorrow (GTM +1 time) if that's quite alright with you. Best, Donatien 
> Schmitz
> Donatien Schmitz
> @Xintong Song
> * We are working on fine-grain scheduling for resource optimisation of long 
> running or periodic jobs. One of the feature we are experiencing is a 
> "rescheduling plan", a mapping of operators and Resource Profiles that can be 
> dynamically applied to a running job. This rescheduling would be triggered by 
> policies about some metrics (focus on RocksDB in our case).
> * While developing this new feature, we decided to implement it on the 
> Adpative Scheduler instead of the Base Scheduler because the logic brought by 
> the state machine already present made it more logical: transitions from 
> states Executing -> Cancelling -> Rescheduling -> Waiting for Resources -> 
> Creating -> Executing
> * In our case we are working on a POC and thus focusing on a real simple job 
> with a // of 1. The issue you brought is indeed something we have faced while 
> raising the // of the job.
> * If you create a Jira Ticket we can discuss it over there if you'd like!
> Donatien Schmitz
> @rmetzger The changes do not break the default resource management but does 
> not fix the issue brought out by Xintong.
> {quote}
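
For context, a minimal sketch of how a job declares fine-grained resources today with the existing `SlotSharingGroup` API (FLIP-169); the open question above is how the Adaptive Scheduler should scale such declarations down when they can only be partially fulfilled:

{code:java}
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedResourcesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declare per-group resource requirements (FLIP-169 style).
        SlotSharingGroup mapGroup = SlotSharingGroup.newBuilder("map")
                .setCpuCores(1.0)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(256))
                .build();
        SlotSharingGroup reduceGroup = SlotSharingGroup.newBuilder("reduce")
                .setCpuCores(2.0)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(512))
                .build();

        env.fromSequence(0, 1_000)
                .map(i -> i * 2).slotSharingGroup(mapGroup)
                .keyBy(i -> i % 10)
                .sum(0).slotSharingGroup(reduceGroup)
                .print();

        env.execute("fine-grained-resources-sketch");
    }
}
{code}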





[jira] [Commented] (FLINK-29344) Make Adaptive Scheduler supports Fine-Grained Resource Management

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746947#comment-17746947
 ] 

Konstantin Knauf commented on FLINK-29344:
--

Since the feature freeze has passed, I will mark this as Won't Do for Flink 
1.18 in the Wiki and remove the fixVersion. Thanks, Konstantin (one of the 
release managers for Flink 1.18).

> Make Adaptive Scheduler supports Fine-Grained Resource Management
> -
>
> Key: FLINK-29344
> URL: https://issues.apache.org/jira/browse/FLINK-29344
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Xintong Song
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.18.0
>
>
> This ticket is a reflection of the following Slack discussion:
> {quote}
> Donatien Schmitz
> Adaptive Scheduler thread:
> Hey all, it seems like the Adaptive Scheduler does not support fine grain 
> resource management. I have fixed it and would like to know if you would be 
> interested in a PR or if it was purposely designed to not support Fine grain 
> resource management.
> rmetzger
> @Donatien Schmitz: I’m concerned that we don’t have a lot of review capacity 
> right now, and I’m now aware of any users asking for it.
> rmetzger
> I couldn’t find a ticket for adding this feature, did you find one?
> If not, can you add one? This will allow us to at least making this feature 
> show up on google, and people might comment on it, if they need it.
> rmetzger
> If the change is fairly self-contained, is unlikely to cause instabilities, 
> then we can also consider merging it
> rmetzger
> @Xintong Song what do you think?
> Xintong Song
> @rmetzger, thanks for involving me.
> @Donatien Schmitz, thanks for bringing this up, and for volunteering on 
> fixing this. Could you explain a bit more about how do you plan to fix this?
> Fine-grained resource management is not yet supported by adaptive scheduler, 
> because there’s an issue that we haven’t find a good solution for. Namely, if 
> only part of the resource requirements can be fulfilled, how do we decide 
> which requirements should be fulfilled. E.g., say the job declares it needs 
> 10 slots with resource 1 for map tasks, and another 10 slots with resource 2 
> for reduce tasks. If there’s not enough resources (say only 10 slots can be 
> allocated for simplicity), how many slots for map / reduce tasks should be 
> allocated? Obviously, <10 map, 0 reduce> & <0 map, 10 reduce> would not work. 
> For this example, a proportional scale-down (<5 map, 5 reduce>) seems 
> reasonable. However, a proportional scale-down is not always easy (e.g., 
> requirements is <100 map, 1 reduce>), and the issue grows more complicated if 
> you take lots of stages and the differences of slot sizes into consideration.
> I’d like to see adaptive scheduler also supports fine-grained resource 
> management. If there’s a good solution to the above issue, I’d love to help 
> review the effort.
> Donatien Schmitz
> Dear Robert and Xintong, thanks for reading and reacting to my message! I'll 
> reply tomorrow (GTM +1 time) if that's quite alright with you. Best, Donatien 
> Schmitz
> Donatien Schmitz
> @Xintong Song
> * We are working on fine-grain scheduling for resource optimisation of long 
> running or periodic jobs. One of the feature we are experiencing is a 
> "rescheduling plan", a mapping of operators and Resource Profiles that can be 
> dynamically applied to a running job. This rescheduling would be triggered by 
> policies about some metrics (focus on RocksDB in our case).
> * While developing this new feature, we decided to implement it on the 
> Adpative Scheduler instead of the Base Scheduler because the logic brought by 
> the state machine already present made it more logical: transitions from 
> states Executing -> Cancelling -> Rescheduling -> Waiting for Resources -> 
> Creating -> Executing
> * In our case we are working on a POC and thus focusing on a real simple job 
> with a // of 1. The issue you brought is indeed something we have faced while 
> raising the // of the job.
> * If you create a Jira Ticket we can discuss it over there if you'd like!
> Donatien Schmitz
> @rmetzger The changes do not break the default resource management but does 
> not fix the issue brought out by Xintong.
> {quote}





[jira] [Created] (FLINK-32671) Document Externalized Declarative Resource Management

2023-07-25 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-32671:


 Summary: Document Externalized Declarative Resource Management
 Key: FLINK-32671
 URL: https://issues.apache.org/jira/browse/FLINK-32671
 Project: Flink
  Issue Type: Sub-task
Reporter: Konstantin Knauf








[jira] [Assigned] (FLINK-31316) FLIP-291: Externalized Declarative Resource Management

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-31316:


Assignee: David Morávek  (was: Konstantin Knauf)

> FLIP-291: Externalized Declarative Resource Management
> --
>
> Key: FLINK-31316
> URL: https://issues.apache.org/jira/browse/FLINK-31316
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Runtime / REST
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
> Fix For: 1.18.0
>
>
> This is an umbrella ticket for 
> [FLIP-291|https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management].
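
A rough sketch of how an external system would declare new resource requirements through the REST API; the endpoint path and payload shape follow FLIP-291 and should be verified against the generated 1.18 REST API docs:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: endpoint path and JSON shape follow FLIP-291; treat the
// released 1.18 REST API documentation as the source of truth.
public class DeclareResourceRequirements {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";       // placeholder
        String vertexId = "<vertex-id>"; // placeholder

        String body =
                "{\"" + vertexId + "\": {\"parallelism\": {\"lowerBound\": 1, \"upperBound\": 4}}}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/resource-requirements"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}
{code}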





[jira] [Assigned] (FLINK-31316) FLIP-291: Externalized Declarative Resource Management

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-31316:


Assignee: Konstantin Knauf  (was: David Morávek)

> FLIP-291: Externalized Declarative Resource Management
> --
>
> Key: FLINK-31316
> URL: https://issues.apache.org/jira/browse/FLINK-31316
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Runtime / REST
>Reporter: David Morávek
>Assignee: Konstantin Knauf
>Priority: Major
> Fix For: 1.18.0
>
>
> This is an umbrella ticket for 
> [FLIP-291|https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management].





[jira] [Commented] (FLINK-32486) FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746942#comment-17746942
 ] 

Konstantin Knauf commented on FLINK-32486:
--

[~wanglijie] Is there user-facing documentation for this already? If not, 
is there a sub-task that tracks this?



> FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
> ---
>
> Key: FLINK-32486
> URL: https://issues.apache.org/jira/browse/FLINK-32486
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
> Fix For: 1.18.0
>
>
> This is an umbrella ticket for 
> [FLIP-324|https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs]
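
A minimal sketch of how a user would switch the feature on; the option name follows FLIP-324 and should be verified against the released 1.18 configuration docs:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RuntimeFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Runtime filters are off by default; the option name follows FLIP-324.
        tEnv.getConfig().set("table.optimizer.runtime-filter.enabled", "true");

        // A join between a large fact table and a selective dimension table is the
        // typical shape that benefits:
        // tEnv.executeSql("SELECT * FROM fact JOIN dim ON fact.k = dim.k WHERE dim.d = 1");
    }
}
{code}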





[jira] [Commented] (FLINK-32548) Make watermark alignment ready for production use

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746938#comment-17746938
 ] 

Konstantin Knauf commented on FLINK-32548:
--

[~fanrui] In the documentation 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_)
 this is still marked as "Beta"? Do you want to change this for Flink 1.18? Is 
there any other documentation work needed? Thanks, Konstantin (one of the 
release managers for Flink 1.18)

> Make watermark alignment ready for production use
> -
>
> Key: FLINK-32548
> URL: https://issues.apache.org/jira/browse/FLINK-32548
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> We found a series of watermark alignment bugs and performance issues and hope 
> to reach production readiness in 1.18.0, and to fix all of the bugs found in 
> 1.16.3 and 1.17.2.
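
For reference, a minimal sketch of how watermark alignment is enabled on a source via the existing `WatermarkStrategy` API:

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkAlignmentExample {

    // Sources sharing the group "alignment-group" pause when they run more than
    // 20s ahead of the group's minimum watermark; drift is re-evaluated every second.
    public static WatermarkStrategy<Long> alignedStrategy() {
        return WatermarkStrategy
                .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
    }
}
{code}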





[jira] [Commented] (FLINK-31634) FLIP-301: Hybrid Shuffle supports Remote Storage

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746941#comment-17746941
 ] 

Konstantin Knauf commented on FLINK-31634:
--

[~tanyuxin] Is there user-facing documentation for this already? If not, is 
there a sub-task that tracks this?

> FLIP-301: Hybrid Shuffle supports Remote Storage
> 
>
> Key: FLINK-31634
> URL: https://issues.apache.org/jira/browse/FLINK-31634
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: Umbrella
>
> This is an umbrella ticket for 
> [FLIP-301|https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage].
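
A minimal configuration sketch, assuming the option names introduced by FLIP-301 (please double-check them against the 1.18 documentation):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridShuffleRemoteStorageExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Key names follow FLIP-301 / the 1.18 docs and should be double-checked there.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");
        conf.setString("taskmanager.network.hybrid-shuffle.remote.path", "s3://my-bucket/flink-shuffle/");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define and execute a batch job as usual ...
    }
}
{code}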





[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746936#comment-17746936
 ] 

Konstantin Knauf commented on FLINK-31275:
--

I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion 
from the ticket as the feature freeze has passed. Thanks, Konstantin (one of 
the release managers for Flink 1.18)

> Flink supports reporting and storage of source/sink tables relationship
> ---
>
> Key: FLINK-31275
> URL: https://issues.apache.org/jira/browse/FLINK-31275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Currently Flink generates a job id in `JobGraph` which can identify a job. On 
> the other hand, Flink creates source/sink tables in the planner. We need to 
> create relations between source and sink tables for the job with an identifying 
> id.





[jira] [Commented] (FLINK-32402) FLIP-294: Support Customized Catalog Modification Listener

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746935#comment-17746935
 ] 

Konstantin Knauf commented on FLINK-32402:
--

[~zjureel] Does this feature require any documentation for other implementers? 
How do people discover this feature? Thanks, Konstantin (one of the release 
managers for Flink 1.18)

> FLIP-294: Support Customized Catalog Modification Listener
> --
>
> Key: FLINK-32402
> URL: https://issues.apache.org/jira/browse/FLINK-32402
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener





[jira] [Commented] (FLINK-32427) FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746934#comment-17746934
 ] 

Konstantin Knauf commented on FLINK-32427:
--

What is the status of this feature in Flink 1.18? Is it usable or partially 
usable? If yes, is there documentation yet?

> FLIP-295: Support lazy initialization of catalogs and persistence of catalog 
> configurations
> ---
>
> Key: FLINK-32427
> URL: https://issues.apache.org/jira/browse/FLINK-32427
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
> Fix For: 1.18.0
>
>
> Umbrella issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
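
A sketch of the intended usage, assuming the `CatalogDescriptor` / `createCatalog` API proposed by FLIP-295 (verify against the final 1.18 API):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.CatalogDescriptor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch of the lazy-catalog API proposed by FLIP-295; class and method names
// follow the FLIP and should be verified against the final 1.18 API.
public class LazyCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        Configuration options = new Configuration();
        options.setString("type", "generic_in_memory");

        // Only the descriptor is registered here; the catalog itself is
        // instantiated lazily on first use.
        tEnv.createCatalog("my_catalog", CatalogDescriptor.of("my_catalog", options));
        tEnv.useCatalog("my_catalog");
    }
}
{code}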





[jira] [Updated] (FLINK-32472) FLIP-308: Support Time Travel

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-32472:
-
Fix Version/s: 1.18.0

> FLIP-308: Support Time Travel
> -
>
> Key: FLINK-32472
> URL: https://issues.apache.org/jira/browse/FLINK-32472
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
> Fix For: 1.18.0
>
>
> Umbrella issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel
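
A minimal usage sketch of the FLIP-308 syntax (it assumes a catalog/connector that actually supports time travel and a table named "orders"):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimeTravelExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // FLIP-308 syntax; requires a catalog/connector that supports time travel
        // and assumes a table named "orders" exists.
        tEnv.executeSql(
                "SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-01 00:00:00'")
            .print();
    }
}
{code}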





[jira] [Commented] (FLINK-32472) FLIP-308: Support Time Travel

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746927#comment-17746927
 ] 

Konstantin Knauf commented on FLINK-32472:
--

[~hackergin] I will mark this feature as finished for Flink 1.18 and assign the 
fixVersion accordingly. If this is indeed not done for Flink 1.18, please let 
me know.



> FLIP-308: Support Time Travel
> -
>
> Key: FLINK-32472
> URL: https://issues.apache.org/jira/browse/FLINK-32472
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
>
> Umbrella issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel





[jira] [Commented] (FLINK-32515) FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746925#comment-17746925
 ] 

Konstantin Knauf commented on FLINK-32515:
--

[~luoyuxia] I will mark this feature as finished for Flink 1.18 and assign the 
fixVersion accordingly. If this is indeed not done for Flink 1.18, please let 
me know.

> FLIP-303: Support REPLACE TABLE AS SELECT statement
> ---
>
> Key: FLINK-32515
> URL: https://issues.apache.org/jira/browse/FLINK-32515
> Project: Flink
>  Issue Type: New Feature
>Reporter: luoyuxia
>Priority: Major
>
> Umbrella issue for 
> [FLIP-303](https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement).
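
A minimal usage sketch of the FLIP-303 syntax (table names and connector are placeholders; the target connector has to support atomic or non-atomic RTAS):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReplaceTableAsSelectExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // FLIP-303 syntax; "source_table" is assumed to exist and the target
        // connector must support (non-)atomic RTAS.
        tEnv.executeSql(
                "CREATE OR REPLACE TABLE sink_table "
                        + "WITH ('connector' = 'blackhole') "
                        + "AS SELECT id, name FROM source_table");
    }
}
{code}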





[jira] [Updated] (FLINK-32515) FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-32515:
-
Fix Version/s: 1.18.0

> FLIP-303: Support REPLACE TABLE AS SELECT statement
> ---
>
> Key: FLINK-32515
> URL: https://issues.apache.org/jira/browse/FLINK-32515
> Project: Flink
>  Issue Type: New Feature
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.18.0
>
>
> Umbrella issue for 
> [FLIP-303](https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement).





[jira] [Commented] (FLINK-31487) Add targetColumns to DynamicTableSink#Context

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746922#comment-17746922
 ] 

Konstantin Knauf commented on FLINK-31487:
--

[~lincoln.86xy] Is this feature already documented for users/developers? Should 
it be documented beyond JavaDocs?

> Add targetColumns to DynamicTableSink#Context
> -
>
> Key: FLINK-31487
> URL: https://issues.apache.org/jira/browse/FLINK-31487
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null 
> overwrite problem of partial-insert
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
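
For illustration, a sink sketch that reads the new target-column information; `getTargetColumns()` follows FLIP-300 and its exact signature should be checked against the 1.18 API:

{code:java}
import java.util.Optional;

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;

// Sketch of a sink that inspects which columns a partial INSERT actually writes.
// getTargetColumns() is the accessor added by FLIP-300; verify its exact signature
// against the 1.18 API. A real connector would create this sink from a
// DynamicTableSinkFactory.
public class PartialInsertAwareSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Each int[] is the index path of one written column; absent for a full insert.
        Optional<int[][]> targetColumns = context.getTargetColumns();
        targetColumns.ifPresent(cols ->
                System.out.println("Partial INSERT writes " + cols.length + " column(s)"));
        return SinkFunctionProvider.of(new PrintSinkFunction<>());
    }

    @Override
    public DynamicTableSink copy() {
        return new PartialInsertAwareSink();
    }

    @Override
    public String asSummaryString() {
        return "partial-insert-aware-sink";
    }
}
{code}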





[jira] [Commented] (FLINK-31833) Support fusion codegen for multiple operators

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746921#comment-17746921
 ] 

Konstantin Knauf commented on FLINK-31833:
--

[~lsy] Does this feature provide a user-facing change in Flink 1.18? If so, is 
it already documented?

> Support fusion codegen for multiple operators
> -
>
> Key: FLINK-31833
> URL: https://issues.apache.org/jira/browse/FLINK-31833
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Priority: Major
>
> Please see FLIP-315 for more detail: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL





[jira] [Updated] (FLINK-26372) Allow to configure Changelog Storage per program

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26372:
-
Fix Version/s: (was: 1.18.0)

> Allow to configure Changelog Storage per program
> 
>
> Key: FLINK-26372
> URL: https://issues.apache.org/jira/browse/FLINK-26372
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Configuration, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Roman Khachatryan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>
> It's currently possible to override state.backend.changelog.enabled per job, 
> but it's not possible to override Changelog storage settings (i.e. writer 
> settings).
> There should be 1) an API and 2) runtime support for that.
> See this 
> [discussion|https://github.com/apache/flink/pull/16341#discussion_r663749681] 
> and the corresponding 
> [TODO|https://github.com/apache/flink/pull/16341/files#diff-2c21555dcab689ec27c0ab981852a2bfa787695fb2fe04b24c22b89c63d98b73R680].
>  
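
A short sketch of the current state, which is the gap this ticket addresses (the per-job toggle exists, the writer settings do not):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogPerJobExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // This per-job toggle already exists ...
        env.enableChangelogStateBackend(true);

        // ... but the changelog *storage* (DSTL writer) settings, e.g.
        // "dstl.dfs.base-path", can currently only be set in the cluster
        // configuration, which is the gap this ticket describes.

        env.fromSequence(0, 100).print();
        env.execute("changelog-per-job-sketch");
    }
}
{code}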





[jira] [Commented] (FLINK-26372) Allow to configure Changelog Storage per program

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746920#comment-17746920
 ] 

Konstantin Knauf commented on FLINK-26372:
--

I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion 
from the ticket as the feature freeze has passed. Thanks, Konstantin (one of 
the release managers for Flink 1.18)


> Allow to configure Changelog Storage per program
> 
>
> Key: FLINK-26372
> URL: https://issues.apache.org/jira/browse/FLINK-26372
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Configuration, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Roman Khachatryan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> It's currently possible to override state.backend.changelog.enabled per job, 
> but it's not possible to override Changelog storage settings (i.e. writer 
> settings).
> There should be 1) an API and 2) runtime support for that.
> See this 
> [discussion|https://github.com/apache/flink/pull/16341#discussion_r663749681] 
> and the corresponding 
> [TODO|https://github.com/apache/flink/pull/16341/files#diff-2c21555dcab689ec27c0ab981852a2bfa787695fb2fe04b24c22b89c63d98b73R680].
>  





[jira] [Commented] (FLINK-30235) Comprehensive benchmarks on changelog checkpointing

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746918#comment-17746918
 ] 

Konstantin Knauf commented on FLINK-30235:
--

I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion 
from the ticket as the feature freeze has passed. Thanks, Konstantin (one of 
the release managers for Flink 1.18)



> Comprehensive benchmarks on changelog checkpointing
> ---
>
> Key: FLINK-30235
> URL: https://issues.apache.org/jira/browse/FLINK-30235
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Rui Xia
>Priority: Minor
>  Labels: performance
>
> Changelog checkpointing is functionally usable right now. To make it a 
> production-ready feature, more comprehensive benchmarks are required. In this 
> issue, I aim to address the following two major concerns:
>  * The expansion of the full checkpoint size caused by changelog persistence;
>  * The TPS regression caused by the DSTL double-write;
> By the way, I will also present other metrics related to checkpointing.





[jira] [Updated] (FLINK-29844) FLIP-263: Improve resolving schema compatibility

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-29844:
-
Fix Version/s: (was: 1.18.0)

> FLIP-263: Improve resolving schema compatibility
> 
>
> Key: FLINK-29844
> URL: https://issues.apache.org/jira/browse/FLINK-29844
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>
> Details can be seen in
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-263%3A+Improve+resolving+schema+compatibility





[jira] [Commented] (FLINK-29844) FLIP-263: Improve resolving schema compatibility

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746917#comment-17746917
 ] 

Konstantin Knauf commented on FLINK-29844:
--

I will mark this as "Not finished" for Flink 1.18 and remove the fixVersion 
from the ticket as the feature freeze has passed. Thanks, Konstantin (one of 
the release managers for Flink 1.18)

> FLIP-263: Improve resolving schema compatibility
> 
>
> Key: FLINK-29844
> URL: https://issues.apache.org/jira/browse/FLINK-29844
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
> Fix For: 1.18.0
>
>
> Details can be seen in
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-263%3A+Improve+resolving+schema+compatibility





[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2023-07-25 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746916#comment-17746916
 ] 

Konstantin Knauf commented on FLINK-32070:
--

I will mark this as Won't Do on the Flink 1.18 release page and update the 
fixVersion as it still has many open subtasks.

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.18.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's perspective (See 
> [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.





[jira] [Updated] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2023-07-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-32070:
-
Fix Version/s: (was: 1.18.0)

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's perspective (See 
> [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.





[jira] [Assigned] (FLINK-29344) Make Adaptive Scheduler supports Fine-Grained Resource Management

2023-06-28 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-29344:


Assignee: Chesnay Schepler

> Make Adaptive Scheduler supports Fine-Grained Resource Management
> -
>
> Key: FLINK-29344
> URL: https://issues.apache.org/jira/browse/FLINK-29344
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Xintong Song
>Assignee: Chesnay Schepler
>Priority: Major
>
> This ticket is a reflection of the following Slack discussion:
> {quote}
> Donatien Schmitz
> Adaptive Scheduler thread:
> Hey all, it seems like the Adaptive Scheduler does not support fine grain 
> resource management. I have fixed it and would like to know if you would be 
> interested in a PR or if it was purposely designed to not support Fine grain 
> resource management.
> rmetzger
> @Donatien Schmitz: I’m concerned that we don’t have a lot of review capacity 
> right now, and I’m now aware of any users asking for it.
> rmetzger
> I couldn’t find a ticket for adding this feature, did you find one?
> If not, can you add one? This will allow us to at least making this feature 
> show up on google, and people might comment on it, if they need it.
> rmetzger
> If the change is fairly self-contained, is unlikely to cause instabilities, 
> then we can also consider merging it
> rmetzger
> @Xintong Song what do you think?
> Xintong Song
> @rmetzger, thanks for involving me.
> @Donatien Schmitz, thanks for bringing this up, and for volunteering on 
> fixing this. Could you explain a bit more about how do you plan to fix this?
> Fine-grained resource management is not yet supported by adaptive scheduler, 
> because there’s an issue that we haven’t find a good solution for. Namely, if 
> only part of the resource requirements can be fulfilled, how do we decide 
> which requirements should be fulfilled. E.g., say the job declares it needs 
> 10 slots with resource 1 for map tasks, and another 10 slots with resource 2 
> for reduce tasks. If there’s not enough resources (say only 10 slots can be 
> allocated for simplicity), how many slots for map / reduce tasks should be 
> allocated? Obviously, <10 map, 0 reduce> & <0 map, 10 reduce> would not work. 
> For this example, a proportional scale-down (<5 map, 5 reduce>) seems 
> reasonable. However, a proportional scale-down is not always easy (e.g., 
> requirements is <100 map, 1 reduce>), and the issue grows more complicated if 
> you take lots of stages and the differences of slot sizes into consideration.
> I’d like to see adaptive scheduler also supports fine-grained resource 
> management. If there’s a good solution to the above issue, I’d love to help 
> review the effort.
> Donatien Schmitz
> Dear Robert and Xintong, thanks for reading and reacting to my message! I'll 
> reply tomorrow (GTM +1 time) if that's quite alright with you. Best, Donatien 
> Schmitz
> Donatien Schmitz
> @Xintong Song
> * We are working on fine-grain scheduling for resource optimisation of long 
> running or periodic jobs. One of the feature we are experiencing is a 
> "rescheduling plan", a mapping of operators and Resource Profiles that can be 
> dynamically applied to a running job. This rescheduling would be triggered by 
> policies about some metrics (focus on RocksDB in our case).
> * While developing this new feature, we decided to implement it on the 
> Adpative Scheduler instead of the Base Scheduler because the logic brought by 
> the state machine already present made it more logical: transitions from 
> states Executing -> Cancelling -> Rescheduling -> Waiting for Resources -> 
> Creating -> Executing
> * In our case we are working on a POC and thus focusing on a real simple job 
> with a // of 1. The issue you brought is indeed something we have faced while 
> raising the // of the job.
> * If you create a Jira Ticket we can discuss it over there if you'd like!
> Donatien Schmitz
> @rmetzger The changes do not break the default resource management but does 
> not fix the issue brought out by Xintong.
> {quote}





[jira] [Created] (FLINK-32468) Replace Akka by Pekko

2023-06-28 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-32468:


 Summary: Replace Akka by Pekko
 Key: FLINK-32468
 URL: https://issues.apache.org/jira/browse/FLINK-32468
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Konstantin Knauf
 Fix For: 1.18.0


Akka 2.6.x will not receive security fixes from September 2023 onwards (see 
https://discuss.lightbend.com/t/2-6-x-maintenance-proposal/9949). 

A mid-term plan to replace Akka is described in FLINK-29281. In the meantime, 
we suggest replacing Akka with Apache Pekko (incubating), which is a fork of 
Akka 2.6.x under the Apache 2.0 license. This way - if needed - we at least 
have the ability to release security fixes ourselves in collaboration with the 
Pekko community. 





[jira] [Updated] (FLINK-29281) Replace Akka by gRPC-based RPC implementation

2023-06-28 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-29281:
-
Summary: Replace Akka by gRPC-based RPC implementation  (was: Replace Akka)

> Replace Akka by gRPC-based RPC implementation
> -
>
> Key: FLINK-29281
> URL: https://issues.apache.org/jira/browse/FLINK-29281
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / RPC
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Following the license change I propose to eventually replace Akka.
> Based on LEGAL-619 an exemption is not feasible, and while a fork _may_ be 
> created, its long-term future is up in the air and I'd be uncomfortable with 
> relying on it.
> I've been experimenting with a new RPC implementation based on gRPC and so 
> far I'm quite optimistic. It's also based on Netty while not requiring as 
> much of a tight coupling as Akka did.
> This would also allow us to sidestep migrating our current Akka setup from 
> Netty 3 (which is affected by several CVEs) to Akka Artery, both saving work 
> and not introducing an entirely different network stack to the project.





[jira] [Commented] (FLINK-31609) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail

2023-04-18 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713474#comment-17713474
 ] 

Konstantin Knauf commented on FLINK-31609:
--

[~ferenc-csaky] Hi Ferenc, do you or someone in your team have time to look 
into this?

> Fatal error in ResourceManager caused 
> YARNSessionFIFOSecuredITCase.testDetachedMode to fail
> ---
>
> Key: FLINK-31609
> URL: https://issues.apache.org/jira/browse/FLINK-31609
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> This looks like FLINK-30908. I created a follow-up ticket because we reached 
> a new minor version.
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47547=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461
> {code}
> Mar 24 09:32:29 2023-03-24 09:31:50,001 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - 
> Exception on heartbeat
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29 java.io.InterruptedIOException: Interrupted waiting to send 
> RPC request to server
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1461) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1403) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy33.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
>  ~[hadoop-yarn-common-2.10.2.jar:?]
> Mar 24 09:32:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_292]
> Mar 24 09:32:29   at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
>  ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at com.sun.proxy.$Proxy34.allocate(Unknown Source) 
> ~[?:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:297)
>  ~[hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:274)
>  [hadoop-yarn-client-2.10.2.jar:?]
> Mar 24 09:32:29 Caused by: java.lang.InterruptedException
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_292]
> Mar 24 09:32:29   at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1177) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   at org.apache.hadoop.ipc.Client.call(Client.java:1456) 
> ~[hadoop-common-2.10.2.jar:?]
> Mar 24 09:32:29   ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30116) Don't Show Env Vars in Web UI

2022-11-21 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-30116:


 Summary: Don't Show Env Vars in Web UI
 Key: FLINK-30116
 URL: https://issues.apache.org/jira/browse/FLINK-30116
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Konstantin Knauf
 Fix For: 1.16.1


As discussed and agreed upon in [1], we'd like to revert [2] and not show any 
environment variables in the Web UI for security reasons. 

[1] https://lists.apache.org/thread/rjgk15bqttvblp60zry4n5pw4xjw7q9k 
[2] https://issues.apache.org/jira/browse/FLINK-28311



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29850) Flink Table Store quick start guide does not work

2022-11-02 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17627911#comment-17627911
 ] 

Konstantin Knauf commented on FLINK-29850:
--

I can reproduce this.

> Flink Table Store quick start guide does not work
> -
>
> Key: FLINK-29850
> URL: https://issues.apache.org/jira/browse/FLINK-29850
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Reporter: Alex Sorokoumov
>Priority: Major
>
> Following instructions in 
> https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/try-table-store/quick-start/
>  leads to empty results in {{word_count}} table.
> Flink version 1.15.2, Flink Table Store version 0.2.1.
> {noformat}
> Flink SQL> show catalogs;
> +-+
> |catalog name |
> +-+
> | default_catalog |
> +-+
> 1 row in set
> Flink SQL> CREATE CATALOG my_catalog WITH (
> >   'type'='table-store',
> >   'warehouse'='file:/tmp/table_store'
> > );
> >
> [INFO] Execute statement succeed.
> Flink SQL> USE CATALOG my_catalog;
> > USE CATALOG my_catalog;
> >
> Flink SQL> USE CATALOG my_catalog;
> >
> [INFO] Execute statement succeed.
> Flink SQL> CREATE TABLE word_count (
> > word STRING PRIMARY KEY NOT ENFORCED,
> > cnt BIGINT
> > );
> >
> [INFO] Execute statement succeed.
> Flink SQL> CREATE TEMPORARY TABLE word_table (
> > word STRING
> > ) WITH (
> > 'connector' = 'datagen',
> > 'fields.word.length' = '1'
> > );
> [INFO] Execute statement succeed.
> Flink SQL> SET 'execution.checkpointing.interval' = '10 s';
> >
> [INFO] Session property has been set.
> Flink SQL> INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP 
> BY word;
> >
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: 0c5f22c2ab3e83e1a1f9274818ff675b
> Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
> >
> [INFO] Session property has been set.
> Flink SQL> RESET 'execution.checkpointing.interval';
> >
> [INFO] Session property has been reset.
> Flink SQL> SET 'execution.runtime-mode' = 'batch';
> >
> [INFO] Session property has been set.
> Flink SQL> SELECT * FROM word_count;
> >
> Empty set
> {noformat}
> Flink logs:
> {noformat}
> flink  | Starting standalonesession as a console application on host 
> flink.
> broker | [2022-11-02 14:07:17,045] INFO [Controller id=1] Processing 
> automatic preferred replica leader election (kafka.controller.KafkaController)
> broker | [2022-11-02 14:07:17,046] TRACE [Controller id=1] Checking 
> need to trigger auto leader balancing (kafka.controller.KafkaController)
> broker | [2022-11-02 14:07:17,050] DEBUG [Controller id=1] Topics not 
> in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
> broker | [2022-11-02 14:07:17,051] TRACE [Controller id=1] Leader 
> imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
> flink  | 2022-11-02 14:07:17,745 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
> 
> flink  | 2022-11-02 14:07:17,752 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  
> Preconfiguration:
> flink  | 2022-11-02 14:07:17,753 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> flink  |
> flink  |
> flink  | RESOURCE_PARAMS extraction logs:
> flink  | jvm_params: -Xmx1073741824 -Xms1073741824 
> -XX:MaxMetaspaceSize=268435456
> flink  | dynamic_configs: -D 
> jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=1073741824b -D 
> jobmanager.memory.jvm-overhead.max=201326592b
> flink  | logs: INFO  [] - Loading configuration property: 
> jobmanager.rpc.address, flink
> flink  | INFO  [] - Loading configuration property: 
> jobmanager.rpc.port, 6123
> flink  | INFO  [] - Loading configuration property: 
> jobmanager.bind-host, 0.0.0.0
> flink  | INFO  [] - Loading configuration property: 
> jobmanager.memory.process.size, 1600m
> flink  | INFO  [] - Loading configuration property: 
> taskmanager.bind-host, 0.0.0.0
> flink  | INFO  [] - Loading configuration property: 
> taskmanager.memory.process.size, 1728m
> flink  | INFO  [] - Loading configuration property: 
> taskmanager.numberOfTaskSlots, 1
> flink  | INFO  [] - Loading configuration property: 
> parallelism.default, 1
> flink  | INFO  [] - Loading configuration property: 
> 

[jira] [Commented] (FLINK-16073) Translate "State & Fault Tolerance" pages into Chinese

2022-10-26 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624685#comment-17624685
 ] 

Konstantin Knauf commented on FLINK-16073:
--

[~fuping.wang] Done

> Translate "State & Fault Tolerance" pages into Chinese
> --
>
> Key: FLINK-16073
> URL: https://issues.apache.org/jira/browse/FLINK-16073
> Project: Flink
>  Issue Type: New Feature
>  Components: chinese-translation, Documentation
>Affects Versions: 1.11.0
>Reporter: Yu Li
>Assignee: fuping.wang
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.17.0
>
>
> Translate all "State & Fault Tolerance" related pages into Chinese, including 
> pages under `docs/dev/stream/state/` and `docs/ops/state`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-16073) Translate "State & Fault Tolerance" pages into Chinese

2022-10-26 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-16073:


Assignee: fuping.wang

> Translate "State & Fault Tolerance" pages into Chinese
> --
>
> Key: FLINK-16073
> URL: https://issues.apache.org/jira/browse/FLINK-16073
> Project: Flink
>  Issue Type: New Feature
>  Components: chinese-translation, Documentation
>Affects Versions: 1.11.0
>Reporter: Yu Li
>Assignee: fuping.wang
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.17.0
>
>
> Translate all "State & Fault Tolerance" related pages into Chinese, including 
> pages under `docs/dev/stream/state/` and `docs/ops/state`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27748) AdaptiveScheduler should support operator fixed parallelism

2022-09-02 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17599355#comment-17599355
 ] 

Konstantin Knauf commented on FLINK-27748:
--

[~long jiang] Better late than never. You can also set the maxParallelism per 
operator, and that will be respected by reactive mode.
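
For example, a minimal sketch of capping a single operator (the job itself is 
made up; {{setMaxParallelism}} is presumably the per-operator DataStream API 
call meant here):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerOperatorMaxParallelismExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0L, 1_000_000L)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                // Upper bound for this operator only, e.g. to match the number
                // of Kafka partitions; reactive mode will not scale it beyond 4.
                .setMaxParallelism(4)
                .print();

        env.execute("per-operator max parallelism sketch");
    }
}
{code}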

> AdaptiveScheduler should support operator fixed parallelism
> ---
>
> Key: FLINK-27748
> URL: https://issues.apache.org/jira/browse/FLINK-27748
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: john
>Priority: Minor
>
> In the job topology, if the user specifies the parallelism of an operator, 
> AdaptiveScheduler should treat that user-specified value as the operator's 
> maximum parallelism during scheduling, with the minimum parallelism equal to 
> the number of slots available in the cluster. 
> This is especially useful in certain scenarios,
> for example when the parallelism of an operator that consumes Kafka is set 
> equal to the number of partitions, or when you want to control the write 
> rate of the operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-08-15 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579681#comment-17579681
 ] 

Konstantin Knauf commented on FLINK-19589:
--

In terms of the scope of this ticket, I think, it would be good to solve this 

a) for all filesystems (at least Hadoop & Presto S3, Azure, HDFS?)
b) so that different configurations can be applied to each source/sink operator 
(not only per job). The configuration in the flink-conf.yaml would act as 
a default for connectors and would be used by the runtime itself (HA, 
Checkpointing). 

The implementation can happen in multiple iterations, but should follow a 
common strategy otherwise I fear we'll build multiple island solutions that 
don't really go together well. 

[~jmahonin] Do you think your approach could be extended to cover all 
filesystems? What alternatives are there?
[~jmahonin] Does your approach support different configurations within the same 
Job?
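
Purely as a hypothetical illustration of point (b), a sketch of what a global 
default plus a per-connector override could look like (neither the forwarding 
of these keys from flink-conf.yaml nor the per-connector key is claimed to 
exist today):

{code:yaml}
# Global default: used by the runtime (HA, checkpointing) and as a fallback for
# all connectors. fs.s3a.acl.default is the existing Hadoop S3A option mentioned
# in the ticket; whether/how it is picked up from flink-conf.yaml is part of the
# open design question.
fs.s3a.acl.default: private

# HYPOTHETICAL per-connector override, invented for this sketch only:
# fs.connector.my-s3-sink.s3a.tags.default: "jobname:my-job,owner:data-team"
{code}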

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, 
> in many cases, users would like to configure more fine-grained.
> Either we add a properties map to our connectors, similar to the Kafka or 
> Kinesis properties.
> Or something like:
> Management of two properties related to S3 object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_*:* Some related existing properties can be set using the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink) if 
> we decide to go this way. However, it does not allow a sink and a checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possible as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28045) [umbrella] Deprecate SourceFunction API

2022-06-17 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1761#comment-1761
 ] 

Konstantin Knauf commented on FLINK-28045:
--

Are all of these subtasks considered blockers for the deprecation or is there a 
distinction between "Must Have" and "Nice-to-Have"?

> [umbrella] Deprecate SourceFunction API
> ---
>
> Key: FLINK-28045
> URL: https://issues.apache.org/jira/browse/FLINK-28045
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Alexander Fedulov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-28051) Introduce stable Source API alternative to ExternallyInducedSource

2022-06-17 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1759#comment-1759
 ] 

Konstantin Knauf commented on FLINK-28051:
--

Is it confirmed an alternative is needed? I am personally only aware of Pravega 
as a user and not sure they still use it. 

> Introduce stable Source API alternative to ExternallyInducedSource
> --
>
> Key: FLINK-28051
> URL: https://issues.apache.org/jira/browse/FLINK-28051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Tests
>Reporter: Alexander Fedulov
>Priority: Major
>
> It needs to be evaluated if ExternallyInducedSourceReader can be promoted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27748) AdaptiveScheduler should support operator fixed parallelism

2022-05-30 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17543896#comment-17543896
 ] 

Konstantin Knauf commented on FLINK-27748:
--

IIRC the adaptive scheduler (or at least reactive mode) already takes the max 
parallelism for an operator into account. Could you use maxParallelism to 
specify an upper bound?

cc [~chesnay] 

> AdaptiveScheduler should support operator fixed parallelism
> ---
>
> Key: FLINK-27748
> URL: https://issues.apache.org/jira/browse/FLINK-27748
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: john
>Priority: Minor
>
> In the job topology, if the user specifies the parallelism of an operator, 
> AdaptiveScheduler should treat that user-specified value as the operator's 
> maximum parallelism during scheduling, with the minimum parallelism equal to 
> the number of slots available in the cluster. 
> This is especially useful in certain scenarios,
> for example when the parallelism of an operator that consumes Kafka is set 
> equal to the number of partitions, or when you want to control the write 
> rate of the operator.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-26281) Test Elasticsearch connector End2End

2022-03-22 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510356#comment-17510356
 ] 

Konstantin Knauf edited comment on FLINK-26281 at 3/22/22, 9:20 AM:


I've done a couple of tests with ES7, none with ES6: DataStream & Table API, 
NONE/AT_LEAST_ONCE, Elastic Search Index, Elastic Search Data Stream [1]. 

Overall, no blockers. Here are some suggestions for improvements to the 
documentation: 
 * explain how end-to-end exactly-once can be achieved with the ES connector 
(AT_LEAST_ONCE + upserts based on a deterministic id). In my understanding it 
is not recommended anymore in Elastic to set the id manually, so this might be 
worth highlighting. 
 * remove the `type` from the IndexRequest, because it is deprecated
 * Add documentation for usage of es datastreams (e.g. requires a @timestamp 
field, only OpType.CREATE, what it means for end-to-end delivery guarantees)



I don't think I'll have the time to look into ES6. So, [~alexanderpreuss], 
I'll leave it up to you to either move this to DONE or to unassign myself. 

[1] 
[https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html]


was (Author: knaufk):
I've done a couple of tests with ES7, none with ES6: DataStream & Table API, 
NONE/AT_LEAST_ONCE, Elastic Search Index, Elastic Search Data Stream [1]. 

Overall, no blockers. Here are some suggestions for improvements to the 
documentation: 

* explain how end-to-end exactly-once can be achieved with the ES connector 
(AT_LEAST_ONCE + upserts based on a deterministic id). In my understanding it 
is not recommended anymore in Elastic to set the id manually, so this might be 
worth highlighting. 
* remove the `type` from the IndexRequest, because it is deprected
* Add documentation for usage of es datastreams (e.g. requires a @timestamp 
field, only OpType.CREATE, what it means for end-to-end delivery guarantees)

[1] 
https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html

> Test Elasticsearch connector End2End
> 
>
> Key: FLINK-26281
> URL: https://issues.apache.org/jira/browse/FLINK-26281
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.15.0
>Reporter: Alexander Preuss
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: pull-request-available, release-testing
> Fix For: 1.15.0
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-24323
> Documentation for [datastream 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/]
> Documentation for [table 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/]
> As 1.15 deprecated the SinkFunction-based Elasticsearch connector and 
> introduces the new connector based on the Sink interface we should test it 
> behaves correctly and as the user expects.
>  
> Some suggestions what to test:
>  * Test delivery guarantees (none, at-least-once) (exactly-once should not 
> run)
>  * Write a simple job that is inserting/upserting data into Elasticsearch
>  * Write a simple job that is inserting/upserting data into Elasticsearch and 
> use a non-default parallelism
>  * Write a simple job in both datastream api and table api
>  * Test restarting jobs and scaling up/down
>  * Test failure of a simple job that is inserting data with exactly-once 
> delivery guarantee by terminating and restarting Elasticsearch
>  * Test against Elasticsearch 6.X and 7.X with the respective connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26281) Test Elasticsearch connector End2End

2022-03-22 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510356#comment-17510356
 ] 

Konstantin Knauf commented on FLINK-26281:
--

I've done a couple of tests with ES7, none with ES6: DataStream & Table API, 
NONE/AT_LEAST_ONCE, Elastic Search Index, Elastic Search Data Stream [1]. 

Overall, no blockers. Here are some suggestions for improvements to the 
documentation: 

* explain how end-to-end exactly-once can be achieved with the ES connector 
(AT_LEAST_ONCE + upserts based on a deterministic id). In my understanding it 
is not recommended anymore in Elastic to set the id manually, so this might be 
worth highlighting. 
* remove the `type` from the IndexRequest, because it is deprecated
* Add documentation for usage of es datastreams (e.g. requires a @timestamp 
field, only OpType.CREATE, what it means for end-to-end delivery guarantees)

[1] 
https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html
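
For illustration, a minimal sketch of the AT_LEAST_ONCE + deterministic-id 
pattern with the new 1.15 sink (index name, id derivation and the input data 
are made up):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class DeterministicIdUpsertExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);

        env.fromElements("alice", "bob", "alice")
                .sinkTo(
                        new Elasticsearch7SinkBuilder<String>()
                                .setHosts(new HttpHost("localhost", 9200, "http"))
                                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                                // Deterministic id: replaying records after a failure
                                // overwrites the same documents instead of duplicating them.
                                .setEmitter((element, context, indexer) ->
                                        indexer.add(createUpsert(element)))
                                .build());

        env.execute("deterministic-id upsert sketch");
    }

    private static IndexRequest createUpsert(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("user", element);
        return Requests.indexRequest()
                .index("users")
                .id(element) // deterministic id derived from the record
                .source(json);
    }
}
{code}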

> Test Elasticsearch connector End2End
> 
>
> Key: FLINK-26281
> URL: https://issues.apache.org/jira/browse/FLINK-26281
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.15.0
>Reporter: Alexander Preuss
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: pull-request-available, release-testing
> Fix For: 1.15.0
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-24323
> Documentation for [datastream 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/]
> Documentation for [table 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/]
> As 1.15 deprecated the SinkFunction-based Elasticsearch connector and 
> introduces the new connector based on the Sink interface we should test it 
> behaves correctly and as the user expects.
>  
> Some suggestions what to test:
>  * Test delivery guarantees (none, at-least-once) (exactly-once should not 
> run)
>  * Write a simple job that is inserting/upserting data into Elasticsearch
>  * Write a simple job that is inserting/upserting data into Elasticsearch and 
> use a non-default parallelism
>  * Write a simple job in both datastream api and table api
>  * Test restarting jobs and scaling up/down
>  * Test failure of a simple job that is inserting data with exactly-once 
> delivery guarantee by terminating and restarting Elasticsearch
>  * Test against Elasticsearch 6.X and 7.X with the respective connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26759) Legacy source support waiting for recordWriter to be available

2022-03-22 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510328#comment-17510328
 ] 

Konstantin Knauf commented on FLINK-26759:
--

I am very much with [~martijnvisser] on this. One of the core issues, I think, 
has been the slow adoption of the new interfaces in the Flink codebase itself. 
This is where we should invest additional resources if we have any. We cannot 
keep supporting legacy interfaces forever, so users will have to make the switch 
eventually anyway. 

For which sources have you seen issues when upgrading?

> Legacy source support waiting for recordWriter to be available
> --
>
> Key: FLINK-26759
> URL: https://issues.apache.org/jira/browse/FLINK-26759
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common, Runtime / Checkpointing
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
>
> In order for Unaligned Checkpoint not to be blocked, StreamTask#processInput 
> will check recordWriter.isAvailable(). If not available, the data will not be 
> processed until recordWriter is available.
> The new Source API is compatible with the above logic, but legacy sources are 
> not. When using unaligned checkpoints, if the backpressure on a legacy source 
> is high, its checkpoint duration will be very long.
>  
> Since legacy sources are often used in production, can we add logic to wait 
> for the recordWriter to be available for legacy sources as well?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-15550:
-
Fix Version/s: 1.14.5
   (was: 1.14.3)

> testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
> 
>
> Key: FLINK-15550
> URL: https://issues.apache.org/jira/browse/FLINK-15550
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.11.0, 1.12.5, 1.13.6, 1.14.3
>Reporter: Yun Tang
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.12.8, 1.13.6, 1.14.5
>
>
> Instance: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241=ms.vss-test-web.build-test-results-tab=12434=108939=debug
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at 
> org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
> {code}
> {code:java}
> expected: but was:
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26049) The tolerable-failed-checkpoints logic is invalid when checkpoint trigger failed

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26049:
-
Fix Version/s: 1.14.5
   (was: 1.14.4)

> The tolerable-failed-checkpoints logic is invalid when checkpoint trigger 
> failed
> 
>
> Key: FLINK-26049
> URL: https://issues.apache.org/jira/browse/FLINK-26049
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.5, 1.14.3
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.5
>
> Attachments: image-2022-02-09-18-08-17-868.png, 
> image-2022-02-09-18-08-34-992.png, image-2022-02-09-18-08-42-920.png, 
> image-2022-02-18-11-28-53-337.png, image-2022-02-18-11-33-28-232.png, 
> image-2022-02-18-11-44-52-745.png, image-2022-02-22-10-27-43-731.png, 
> image-2022-02-22-10-31-05-012.png
>
>
> After triggerCheckpoint, if checkpoint failed, flink will execute the 
> tolerable-failed-checkpoints logic. But if triggerCheckpoint failed, flink 
> won't execute the tolerable-failed-checkpoints logic.
> h1. How to reproduce this issue?
> In our online environment, an HDFS SRE deleted the Flink base directory by 
> mistake, and the Flink job did not have permission to recreate the checkpoint 
> directory, so triggering checkpoints failed.
> Several things did not meet expectations:
>  * The JM just logs _"Failed to trigger checkpoint for job 
> 6f09d4a15dad42b24d52c987f5471f18 since Trigger checkpoint failure"_ and does 
> not show the root cause or exception.
>  * The user set tolerable-failed-checkpoints=0, but if triggerCheckpoint 
> failed, Flink won't execute the tolerable-failed-checkpoints logic. 
>  * When triggerCheckpoint failed, numberOfFailedCheckpoints is always 0.
>  * When triggerCheckpoint failed, we can't find the checkpoint info in the 
> checkpoint history page.
>  
> !image-2022-02-09-18-08-17-868.png!
>  
> !image-2022-02-09-18-08-34-992.png!
> !image-2022-02-09-18-08-42-920.png!
>  
> h3. *All metrics looked normal, so we only found out the next day that 
> checkpoints had been failing for a whole day. That is not acceptable to 
> Flink users.*
> I have some ideas:
>  # Should tolerable-failed-checkpoints logic be executed when 
> triggerCheckpoint fails?
>  # When triggerCheckpoint fails, should we increase numberOfFailedCheckpoints?
>  # When triggerCheckpoint fails, should we show the checkpoint info in the 
> checkpoint history page?
>  # The JM just shows "Failed to trigger checkpoint"; should we show the 
> detailed exception to make it easier to find the root cause?
>  
> Masters, could we do these changes? Please correct me if I'm wrong.
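
For reference, a minimal sketch of how the tolerance threshold discussed above 
is configured (the job body is a placeholder); the gap described in this ticket 
is that failed checkpoint *triggers* do not count against it:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailuresExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        // Fail the job as soon as a single checkpoint fails, instead of
        // silently tolerating failures; equivalent to
        // execution.checkpointing.tolerable-failed-checkpoints: 0 in flink-conf.yaml.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        env.fromSequence(0L, Long.MAX_VALUE).print();

        env.execute("tolerable-failed-checkpoints sketch");
    }
}
{code}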



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26223) Making ZK-related logs available in tests

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26223:
-
Fix Version/s: 1.14.5
   (was: 1.14.4)

> Making ZK-related logs available in tests
> -
>
> Key: FLINK-26223
> URL: https://issues.apache.org/jira/browse/FLINK-26223
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.13.6, 1.14.3
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> Recently, we had a few incidents where it appears that ZooKeeper wasn't 
> behaving as expected. It might help to have the ZooKeeper logs available 
> in these cases.
> We have multiple options:
>  * Introduce an extension to change the ZK log level for specific tests
>  * Lower the ZK log level again and make the logs being written to the 
> standard log files
>  * Lower the ZK log level again and move the ZK logs into a dedicated file to 
> avoid spoiling the Flink logs



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26285) ZooKeeperStateHandleStore does not handle not existing nodes properly in getAllAndLock

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26285:
-
Fix Version/s: 1.14.5
   (was: 1.14.4)

> ZooKeeperStateHandleStore does not handle not existing nodes properly in 
> getAllAndLock
> --
>
> Key: FLINK-26285
> URL: https://issues.apache.org/jira/browse/FLINK-26285
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.13.6, 1.14.3
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> [c3a6b514595ea3c1bf52126f6f1715b26c871ae9|https://github.com/apache/flink/commit/c3a6b514595ea3c1bf52126f6f1715b26c871ae9]
>  introduces new exceptions that are not properly handled in 
> [ZooKeeperStateHandleStore:378|https://github.com/apache/flink/blob/0cf7c3dedd3575cdfed57727e9712c28c013d7ca/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java#L378]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-23944) PulsarSourceITCase.testTaskManagerFailure is instable

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-23944:
-
Fix Version/s: 1.14.5
   (was: 1.14.4)

> PulsarSourceITCase.testTaskManagerFailure is instable
> -
>
> Key: FLINK-23944
> URL: https://issues.apache.org/jira/browse/FLINK-23944
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Dian Fu
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.14.5
>
>
> [https://dev.azure.com/dianfu/Flink/_build/results?buildId=430=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d]
> It's from my personal azure pipeline, however, I'm pretty sure that I have 
> not touched any code related to this. 
> {code:java}
> Aug 24 10:44:13 [ERROR] testTaskManagerFailure{TestEnvironment, ExternalContext, ClusterControllable}[1] Time elapsed: 258.397 s <<< FAILURE!
> Aug 24 10:44:13 java.lang.AssertionError:
> Aug 24 10:44:13 Expected: Records consumed by Flink should be identical to test data and preserve the order in split
> Aug 24 10:44:13 but: Mismatched record at position 7: Expected '0W6SzacX7MNL4xLL3BZ8C3ljho4iCydbvxIl' but was 'wVi5JaJpNvgkDEOBRC775qHgw0LyRW2HBxwLmfONeEmr'
> Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> Aug 24 10:44:13 at org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testTaskManagerFailure(SourceTestSuiteBase.java:271)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26524) Elasticsearch (v5.3.3) sink end-to-end test

2022-03-11 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26524:
-
Fix Version/s: 1.14.5
   (was: 1.14.4)

> Elasticsearch (v5.3.3) sink end-to-end test
> ---
>
> Key: FLINK-26524
> URL: https://issues.apache.org/jira/browse/FLINK-26524
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.14.3
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.14.5
>
>
> e2e test {{Elasticsearch (v5.3.3) sink end-to-end test}} failed in [this 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32627=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=16598]
>  on {{release-1.14}} probably because of the following stacktrace showing up 
> in the logs:
> {code}
> Mar 07 15:40:41 2022-03-07 15:40:40,336 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
> trigger checkpoint 1 for job 3a2fd4c6fb03d5b20929a6f2b7131d82. (0 consecutive 
> failed attempts so far)
> Mar 07 15:40:41 org.apache.flink.runtime.checkpoint.CheckpointException: 
> Checkpoint was declined (task is closing)
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:988)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_322]
> Mar 07 15:40:41   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_322]
> Mar 07 15:40:41   at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
> Mar 07 15:40:41 Caused by: org.apache.flink.util.SerializedThrowable: Task 
> name with subtask : Source: Sequence Source (Deprecated) -> Flat Map -> Sink: 
> Unnamed (1/1)#0 Failure reason: Checkpoint was declined (task is closing)
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1389) 
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1382) 
> ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1348)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:956)
>  ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
> Mar 07 15:40:41   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) ~[?:1.8.0_322]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26281) Test Elasticsearch connector End2End

2022-03-04 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501330#comment-17501330
 ] 

Konstantin Knauf commented on FLINK-26281:
--

[~alexanderpreuss] Is it important that the failure for exactly-once delivery 
guarantees is introduced via terminating ElasticSearch or could it also be 
introduced by killing a Taskmanager?

> Test Elasticsearch connector End2End
> 
>
> Key: FLINK-26281
> URL: https://issues.apache.org/jira/browse/FLINK-26281
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.15.0
>Reporter: Alexander Preuss
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-24323
> Documentation for [datastream 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/]
> Documentation for [table 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/]
> As 1.15 deprecated the SinkFunction-based Elasticsearch connector and 
> introduces the new connector based on the Sink interface we should test it 
> behaves correctly and as the user expects.
>  
> Some suggestions what to test:
>  * Test delivery guarantees (none, at-least-once) (exactly-once should not 
> run)
>  * Write a simple job that is inserting/upserting data into Elasticsearch
>  * Write a simple job that is inserting/upserting data into Elasticsearch and 
> use a non-default parallelism
>  * Write a simple job in both datastream api and table api
>  * Test restarting jobs and scaling up/down
>  * Test failure of a simple job that is inserting data with exactly-once 
> delivery guarantee by terminating and restarting Elasticsearch
>  * Test against Elasticsearch 6.X and 7.X with the respective connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25682) Check in script to verify cheksums and signatures for Apache Flink Releases

2022-03-02 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25682:
-
Description: 
For each flink, flink-shaded and flink-ml release, multiple community members 
usually check the correctness of signatures and checksums. I propose to check 
in a script for this purpose.

What the script should do:
 * Verify Signatures for Java & Python Binaries and Maven Artifacts
 * Verify Checksums (MD5 & SHA1/512) for Java & Python Binaries

Where should this script be located:
 * flink: [https://github.com/apache/flink/tree/master/tools/releasing]
 * flink-shaded: 
[https://github.com/apache/flink-shaded/tree/master/tools/releasing]
 * flink-ml: [https://github.com/apache/flink-ml/tree/master/tools/releasing]

  was:
For each flink, flink-shaded and flink-ml release, multiple community memebers 
usually check the correctness of signatures and checksums. I propose to check 
in a script for this purpose. 

What the script should do:

* Verify Signatures for Java & Python Binaries and Maven Artifacts
* Verify Checksums (MD5 & SHA1/512) for Java & Python Binaries and Maven 
Artifacts

Where should this script be located: 
* flink: https://github.com/apache/flink/tree/master/tools/releasing
* flink-shaded: 
https://github.com/apache/flink-shaded/tree/master/tools/releasing
* flink-ml: https://github.com/apache/flink-ml/tree/master/tools/releasing


> Check in script to verify cheksums and signatures for Apache Flink Releases
> ---
>
> Key: FLINK-25682
> URL: https://issues.apache.org/jira/browse/FLINK-25682
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> For each flink, flink-shaded and flink-ml release, multiple community 
> members usually check the correctness of signatures and checksums. I propose 
> to check in a script for this purpose.
> What the script should do:
>  * Verify Signatures for Java & Python Binaries and Maven Artifacts
>  * Verify Checksums (MD5 & SHA1/512) for Java & Python Binaries
> Where should this script be located:
>  * flink: [https://github.com/apache/flink/tree/master/tools/releasing]
>  * flink-shaded: 
> [https://github.com/apache/flink-shaded/tree/master/tools/releasing]
>  * flink-ml: [https://github.com/apache/flink-ml/tree/master/tools/releasing]
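
A minimal sketch of what such a script could look like (file names are 
illustrative; it assumes the relevant public keys from the project's KEYS file 
have already been imported into the local GPG keyring and that the .sha512 
files are in sha512sum format):

{code:bash}
#!/usr/bin/env bash
# Sketch only: verify signatures and checksums for all release artifacts
# in the current directory. Assumes each artifact ships with a detached
# .asc signature and a .sha512 checksum file.
set -euo pipefail

shopt -s nullglob
for artifact in *.tgz *.tar.gz *.whl *.jar; do
  echo "Verifying ${artifact}"

  # GPG signature check against the previously imported KEYS
  gpg --verify "${artifact}.asc" "${artifact}"

  # SHA512 checksum check ("<hash>  <file>" format)
  sha512sum -c "${artifact}.sha512"
done

echo "All signatures and checksums verified."
{code}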



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26273) Test checkpoints restore modes & formats

2022-03-02 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497438#comment-17497438
 ] 

Konstantin Knauf edited comment on FLINK-26273 at 3/2/22, 12:45 PM:


[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
[https://github.com/apache/flink/pull/18909]

*Commit:* 0ff6f2cc78f

1. *Taking Canonical/Native Savepoints/Retained Checkpoint*

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times:
 * stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
 * stopped with canonical savepoint (Savepoint ID: 
savepoint-0d0bb8-3cfceefe4dec)
 * cancelled (JobID: c40b0839cfa6a454919597819e8e84f6)

Checkpoint Directory
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}
Savepoint Directory
{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}
2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full Checkpoint Data Size" for the first 
checkpoint only as a sign that this is the case). Cancelled both jobs.

3. *Two Jobs can be Started from Retained Checkpoint without Claiming and take 
a full checkpoint*

Like Step 2a just using the retained checkpoint from Step 1 instead of native 
savepoint.

4. *Job can claim retained checkpoint and continues to checkpoint 
incrementally, retained checkpoint is cleaned up*

Started TopSpeedWindowing with Claiming from the Retained Checkpoint of Step 1. 
Confirmed that the first Checkpoint is incremental and confirmed that the 
original checkpoint directory is empty after a few checkpoints.
{code:bash}
/tmp/flink-checkpoints/c40b0839cfa6a454919597819e8e84f6
├── shared
└── taskowned
{code}
4. *Job can claim moved native savepoint and continues to checkpoint 
incrementally, retained checkpoint is cleaned up*

Copied Native Savepoint from Step 1 to a different directory. Everything else 
like in 3. The directory of the moved Savepoint does not exist after a few 
checkpoints and the first checkpoint is incremental.

5. *Native Savepoint can be removed after first successful checkpoint and 
recovery still works*

Started TopSpeedWindowing from Native Savepoint. After one checkpoint, removed 
savepoint, killed Taskmanager, restarted Taskmanager and Job recovered and 
continued checkpointing.


was (Author: knaufk):
[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. *Taking Canonical/Native Savepoints/Retained Checkpoint*

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

Checkpoint Directory
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

Savepoint Directory


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full Checkpoint Data Size" for 

[jira] [Resolved] (FLINK-26273) Test checkpoints restore modes & formats

2022-03-02 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf resolved FLINK-26273.
--
Resolution: Done

> Test checkpoints restore modes & formats
> 
>
> Key: FLINK-26273
> URL: https://issues.apache.org/jira/browse/FLINK-26273
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Dawid Wysakowicz
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> We should test manually changes introduced in [FLINK-25276] & [FLINK-25154]
> Proposal: 
> Take canonical savepoint/native savepoint/externalised checkpoint (with 
> RocksDB), and perform claim (1)/no claim (2) recoveries, and verify that in:
> # after a couple of checkpoints claimed files have been cleaned up
> # that after a single successful checkpoint, you can remove the start up 
> files and failover the job without any errors.
> # take a native, incremental RocksDB savepoint, move to a different 
> directory, restore from it
> documentation:
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#restore-mode
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#savepoint-format
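
For reference, a rough sketch of the CLI commands involved (option names should 
be double-checked against the linked 1.15 docs; paths and job IDs are 
placeholders):

{code:bash}
# Stop a job with a savepoint in the native (state-backend specific) format.
bin/flink stop --type native --savepointPath /tmp/flink-savepoints <jobId>

# Restore in CLAIM mode: Flink takes ownership of the snapshot files and may
# delete them once they are no longer needed.
bin/flink run -s /tmp/flink-savepoints/savepoint-xxxx -restoreMode claim myJob.jar

# NO_CLAIM (the default) never reuses the files of the initial snapshot, so the
# first successful checkpoint after the restore is a full one.
bin/flink run -s /tmp/flink-savepoints/savepoint-xxxx -restoreMode no_claim myJob.jar
{code}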



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26446) Update Feature Radar in Apache Flink Roadmap

2022-03-02 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-26446:


 Summary: Update Feature Radar in Apache Flink Roadmap 
 Key: FLINK-26446
 URL: https://issues.apache.org/jira/browse/FLINK-26446
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Konstantin Knauf
 Fix For: 1.15.0


Deployment/Coordination:
 * Java 8 -> Deprecation
 * Add Deployment Modes
 ** Application Mode -> Stable
 ** Session Mode -> Stable
 ** Per-Job Mode -> Deprecated
 * Adaptive Scheduler -> Ready & Evolving



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24376) Operator name in OperatorCoordinator should not use chained name

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24376:
-
Fix Version/s: (was: 1.14.4)

> Operator name in OperatorCoordinator should not use chained name
> 
>
> Key: FLINK-24376
> URL: https://issues.apache.org/jira/browse/FLINK-24376
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0, 1.12.5, 1.13.2
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0, 1.14.5
>
>
> Currently the operator name passed to 
> {{CoordinatedOperatorFactory#getCoordinatorProvider}} is a chained operator 
> name (e.g. Source -> Map) instead of the name of the coordinating operator, 
> which might be misleading. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24735) SQL client crashes with `Cannot add expression of different type to set`

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24735:
-
Fix Version/s: (was: 1.14.4)

> SQL client crashes with `Cannot add expression of different type to set`
> 
>
> Key: FLINK-24735
> URL: https://issues.apache.org/jira/browse/FLINK-24735
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.14.0
>Reporter: Martijn Visser
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.15.0, 1.14.5
>
>
> Reproductions steps:
> 1. Download airports.csv from https://www.kaggle.com/usdot/flight-delays
> 2. Start Flink SQL client and create table
> {code:sql}
> CREATE TABLE `airports` (
>   `IATA_CODE` CHAR(3),
>   `AIRPORT` STRING,
>   `CITY` STRING,
>   `STATE` CHAR(2),
>   `COUNTRY` CHAR(3),
>   `LATITUDE` DOUBLE NULL,
>   `LONGITUDE` DOUBLE NULL,
>   PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 
> 'file:///flink-sql-cookbook/other-builtin-functions/04_override_table_options/airports.csv',
>   'format' = 'csv'
> );
> {code}
> 3. Run the following SQL statement:
> {code:sql}
> SELECT * FROM `airports` /*+ OPTIONS('csv.ignore-parse-errors'='true') */ 
> WHERE COALESCE(`IATA_CODE`, `AIRPORT`) IS NULL;
> {code}
> Stacktrace:
> {code:bash}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>   at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: java.lang.AssertionError: Cannot add expression of different type 
> to set:
> set type is RecordType(CHAR(3) CHARACTER SET "UTF-16LE" NOT NULL IATA_CODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AIRPORT, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" CITY, CHAR(2) CHARACTER SET "UTF-16LE" STATE, 
> CHAR(3) CHARACTER SET "UTF-16LE" COUNTRY, DOUBLE LATITUDE, DOUBLE LONGITUDE) 
> NOT NULL
> expression type is RecordType(CHAR(3) CHARACTER SET "UTF-16LE" IATA_CODE, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AIRPORT, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" CITY, CHAR(2) CHARACTER SET "UTF-16LE" STATE, 
> CHAR(3) CHARACTER SET "UTF-16LE" COUNTRY, DOUBLE LATITUDE, DOUBLE LONGITUDE) 
> NOT NULL
> set is rel#426:LogicalProject.NONE.any.None: 
> 0.[NONE].[NONE](input=HepRelVertex#425,inputs=0..6)
> expression is LogicalProject(IATA_CODE=[null:CHAR(3) CHARACTER SET 
> "UTF-16LE"], AIRPORT=[$1], CITY=[$2], STATE=[$3], COUNTRY=[$4], 
> LATITUDE=[$5], LONGITUDE=[$6])
>   LogicalFilter(condition=[IS NULL(CAST($0):VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE")])
> LogicalTableScan(table=[[default_catalog, default_database, airports]], 
> hints=[[[OPTIONS inheritPath:[] options:{csv.ignore-parse-errors=true}]]])
>   at 
> org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:381)
>   at 
> org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:58)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
>   at 
> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:310)
>   at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>   at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>   at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>   at scala.collection.Iterator.foreach(Iterator.scala:937)
>   at scala.collection.Iterator.foreach$(Iterator.scala:937)
>   at 

[jira] [Updated] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25227:
-
Fix Version/s: (was: 1.14.4)

> Comparing the equality of the same (boxed) numeric values returns false
> ---
>
> Key: FLINK-25227
> URL: https://issues.apache.org/jira/browse/FLINK-25227
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0, 1.12.5, 1.13.3
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.7, 1.14.5
>
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
> bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   val data = Seq(
> Row.of(
>   java.lang.Integer.valueOf(1000),
>   java.lang.Integer.valueOf(2000),
>   java.lang.Integer.valueOf(1000),
>   java.lang.Integer.valueOf(2000))
>   )
>   tEnv.executeSql(
> s"""
>|create table T (
>|  a int,
>|  b int,
>|  c int,
>|  d int
>|) with (
>|  'connector' = 'values',
>|  'bounded' = 'true',
>|  'data-id' = '${TestValuesTableFactory.registerData(data)}'
>|)
>|""".stripMargin)
>   tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
> }
> {code}
> The result is false, which is obviously incorrect.
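> For reference, the observed result matches plain Java boxed-integer semantics: 
> {{==}} on two distinct {{java.lang.Integer}} objects outside the small-value 
> cache (-128..127) compares references, not values. A minimal standalone sketch 
> (not taken from the ticket):
> {code:java}
> public class BoxedEqualitySketch {
>     public static void main(String[] args) {
>         Integer a = 1000; // boxed outside the Integer cache -> a new object
>         Integer b = 1000; // another distinct object wrapping the same value
>         System.out.println(a == b);      // false: reference comparison
>         System.out.println(a.equals(b)); // true: value comparison
>     }
> }
> {code}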
> This is caused by the generated java code:
> {code:java}
> public class StreamExecCalc$8 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
> implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> private final Object[] references;
> org.apache.flink.table.data.BoxedWrapperRowData out =
> new org.apache.flink.table.data.BoxedWrapperRowData(1);
> private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
> new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> public StreamExecCalc$8(
> Object[] references,
> org.apache.flink.streaming.runtime.tasks.StreamTask task,
> org.apache.flink.streaming.api.graph.StreamConfig config,
> org.apache.flink.streaming.api.operators.Output output,
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService)
> throws Exception {
> this.references = references;
> this.setup(task, config, output);
> if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> 
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> .setProcessingTimeService(processingTimeService);
> }
> }
> @Override
> public void open() throws Exception {
> super.open();
> }
> @Override
> public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element)
> throws Exception {
> org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) element.getValue();
> int field$0;
> boolean isNull$0;
> int field$1;
> boolean isNull$1;
> int field$3;
> boolean isNull$3;
> int field$4;
> boolean isNull$4;
> boolean isNull$6;
> boolean result$7;
> isNull$3 = in1.isNullAt(2);
> field$3 = -1;
> if (!isNull$3) {
> field$3 = in1.getInt(2);
> }
> isNull$0 = in1.isNullAt(0);
> field$0 = -1;
> if (!isNull$0) {
> field$0 = in1.getInt(0);
> }
> isNull$1 = in1.isNullAt(1);
> field$1 = -1;
> if (!isNull$1) {
> field$1 = in1.getInt(1);
> }
> isNull$4 = in1.isNullAt(3);
> field$4 = -1;
> if (!isNull$4) {
> field$4 = in1.getInt(3);
> }
> out.setRowKind(in1.getRowKind());
> java.lang.Integer result$2 = field$0;
> boolean nullTerm$2 = false;
> if (!nullTerm$2) {
> java.lang.Integer cur$2 = field$0;
> if (isNull$0) {
> nullTerm$2 = true;
> } else {
> int compareResult = result$2.compareTo(cur$2);
> if ((true && compareResult < 0) || (compareResult > 0 && 
> !true)) {
> result$2 = cur$2;
> }
> }
> }
> if (!nullTerm$2) {
> java.lang.Integer cur$2 = field$1;
> if (isNull$1) {
> nullTerm$2 = true;
> } else {
> int compareResult = 

[jira] [Updated] (FLINK-23955) submit flink sql job error when flink HA on yarn is configured

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-23955:
-
Fix Version/s: (was: 1.14.4)

> submit flink sql job error when flink HA on yarn is configured
> --
>
> Key: FLINK-23955
> URL: https://issues.apache.org/jira/browse/FLINK-23955
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Jun Zhang
>Priority: Major
> Fix For: 1.15.0, 1.14.5
>
>
> 1. When I configured Flink HA, like this:
> {code:java}
> high-availability: zookeeper
> high-availability.storageDir: hdfs://xxx/flink/ha/
> high-availability.zookeeper.quorum: x:2181
> high-availability.zookeeper.path.root: /flink
> {code}
> 2. I started a Flink session cluster.
> 3. I submitted a Flink SQL job via the SQL Client and set the following:
> {code:java}
> set execution.target = yarn-per-job;
> {code}
> then I got the error:
>  
>  
> {code:java}
> 2021-08-25 10:40:39,500 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>  [] - Found Web Interface master3:38052 of application 
> 'application_1629858010528_0002'.
> 2021-08-25 10:40:42,447 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error.,  org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2cc726b9ae70c95f128f6c1e55cf874c)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:878)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:892)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:712)
> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)End
>  of exception on server side>]
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> ~[?:1.8.0_291]
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 
> ~[?:1.8.0_291]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> 

[jira] [Updated] (FLINK-25444) ClosureCleaner rejects ExecutionConfig as not be Serializable due to TextElement

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25444:
-
Fix Version/s: (was: 1.14.4)

> ClosureCleaner rejects ExecutionConfig as not be Serializable due to 
> TextElement
> 
>
> Key: FLINK-25444
> URL: https://issues.apache.org/jira/browse/FLINK-25444
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
>Reporter: Wen Zhou
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.15.0, 1.14.5
>
>
> This appears to be a bug introduced by the latest Flink commit to the file 
> [flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java|https://github.com/apache/flink/commit/9e0e0929b86c372c9243daad9d654af9e7718708#diff-7a439abdf207cf6da8aa6c147b38c1346820fe786afbf652bc614fc377cdf238]
> Diffing the file, we can see that TextElement is used where ClosureCleanerLevel 
> is used as a member of the Serializable ExecutionConfig.
> [TextElement in ClosureCleanerLevel|https://i.stack.imgur.com/ky3d8.png]
> The simplest way to verify the problem is to run the following code in Flink 
> 1.13.5 and 1.14.x; the exception is reproduced in 1.14.x, and the only diff 
> between 1.13.5 and 1.14.x is the latest commit:
> {code:java}
> @Test
> public void testExecutionConfigSerializable() throws Exception {
>     ExecutionConfig config = new ExecutionConfig();
>     ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
> }
> {code}
> The problem can be found here 
> [https://stackoverflow.com/questions/70443743/flink-blockelement-exception-when-updating-to-version-1-14-2/70468925?noredirect=1#comment124597510_70468925]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25771:
-
Fix Version/s: (was: 1.14.4)

> CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
> -
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP 
> with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: 
> All host(s) tried for query failed (tried: /172.17.0.1:59220 
> (com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jan 23 01:02:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jan 23 01:02:52   at org.junit.runners.Suite.runChild(Suite.java:128)
> Jan 23 01:02:52   at 

[jira] [Updated] (FLINK-26018) Unnecessary late events when using the new KafkaSource

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26018:
-
Fix Version/s: (was: 1.14.4)

> Unnecessary late events when using the new KafkaSource
> --
>
> Key: FLINK-26018
> URL: https://issues.apache.org/jira/browse/FLINK-26018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.3
>Reporter: Jun Qin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.5
>
> Attachments: message in kafka.txt, 
> taskmanager_10.28.0.131_33249-b3370c_log
>
>
> There is an issue with the new KafkaSource connector in Flink 1.14: when one 
> task consumes messages from multiple topic partitions (statically created, 
> timestamps are in order), it may start with one partition and advance 
> watermarks before the data from the other partitions arrives. In this case, the 
> early messages in the other partitions may unnecessarily be considered as late 
> ones.
> I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
> partition to the {{WatermarkMultiplexer}} when it receives data from that 
> partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
> fetches any data. 
> Attached two files: the messages in Kafka and the corresponding TM logs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-21302) Fix NPE when use row_number() in over agg

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-21302:
-
Fix Version/s: (was: 1.14.4)

> Fix NPE when use row_number() in over agg
> -
>
> Key: FLINK-21302
> URL: https://issues.apache.org/jira/browse/FLINK-21302
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0, 1.14.5
>
>
> `NullPointerException` would be thrown if the SQL contains row_number() in an 
> Over Aggregate.
> {code:scala}
> @Test
> def testRowNumberOnOver(): Unit = {
>   val t = failingDataSource(TestData.tupleData5)
> .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
>   tEnv.registerTable("MyTable", t)
>   val sqlQuery = "SELECT a, ROW_NUMBER() OVER (PARTITION BY a ORDER BY 
> proctime()) FROM MyTable"
>   val sink = new TestingAppendSink
>   tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
>   env.execute()
> }{code}
>  
> The following exception would be thrown:
> {code:java}
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:240) 
> at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:240) at 
> scala.collection.SeqLike$class.size(SeqLike.scala:106) at 
> scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:234) at 
> scala.collection.mutable.Builder$class.sizeHint(Builder.scala:69) at 
> scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:22) at 
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:230) 
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at 
> scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at 
> org.apache.flink.table.planner.codegen.agg.DeclarativeAggCodeGen.(DeclarativeAggCodeGen.scala:82)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:222)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:214)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:214)
>  at 
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:325)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.java:262)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:154)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:128)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:65)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:128)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:167)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:136)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:128)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
> 

[jira] [Updated] (FLINK-24491) ExecutionGraphInfo may not be archived when the dispatcher terminates

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24491:
-
Fix Version/s: (was: 1.14.4)

> ExecutionGraphInfo may not be archived when the dispatcher terminates
> -
>
> Key: FLINK-24491
> URL: https://issues.apache.org/jira/browse/FLINK-24491
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.2, 1.15.0
>Reporter: Zhilong Hong
>Priority: Major
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> When a job finishes, its JobManagerRunnerResult will be processed in the 
> callback of {{Dispatcher#runJob}}. In the callback, ExecutionGraphInfo will 
> be archived by HistoryServerArchivist asynchronously. However, the 
> CompletableFuture of the archiving is ignored. The job may be removed before 
> the archiving is finished. For the batch job running in the 
> per-job/application mode, the dispatcher will terminate itself once the job 
> is finished. In this case, ExecutionGraphInfo may not be archived when the 
> dispatcher terminates.
> If the ExecutionGraphInfo is lost, users are not able to know whether the 
> batch job is finished normally or not. They have to refer to the logs for the 
> result.
> The session mode is not affected, since the dispatcher won't terminate itself 
> once the job is finished. The HistoryServerArchivist gets enough time to 
> archive the ExecutionGraphInfo.
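> A generic sketch of the race described above, using hypothetical names rather 
> than the Dispatcher/HistoryServerArchivist code: if the future returned by an 
> asynchronous archiving step is ignored, shutdown can outrun the archiving; 
> chaining termination onto that future removes the race.
> {code:java}
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> 
> public class ArchiveRaceSketch {
>     // Stand-in for an asynchronous archiving call (hypothetical, not Flink API).
>     static CompletableFuture<Void> archiveAsync() {
>         return CompletableFuture.runAsync(() -> {
>             try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException ignored) { }
>             System.out.println("archived");
>         });
>     }
> 
>     public static void main(String[] args) throws Exception {
>         // Fire-and-forget: the returned future is ignored, so "terminated" can be
>         // printed (and a real process could exit) before "archived" ever appears.
>         archiveAsync();
>         System.out.println("terminated (archiving may still be in flight)");
> 
>         // Chained: termination only happens after the archiving has completed.
>         archiveAsync()
>             .whenComplete((ignored, error) -> System.out.println("terminated after archiving"))
>             .get(5, TimeUnit.SECONDS);
>     }
> }
> {code}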



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24483) Document what is Public API and what compatibility guarantees Flink is providing

2022-02-25 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24483:
-
Fix Version/s: (was: 1.14.4)

> Document what is Public API and what compatibility guarantees Flink is 
> providing
> 
>
> Key: FLINK-24483
> URL: https://issues.apache.org/jira/browse/FLINK-24483
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Documentation, Table SQL / API
>Affects Versions: 1.14.0, 1.12.5, 1.13.2
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> We should document:
> * What constitutes the Public API and what the 
> Public/PublicEvolving/Experimental/Internal annotations mean (sketched below).
> * Which compatibility guarantees we provide: forward (backward?) 
> functional/compile/binary compatibility for {{@Public}} interfaces?
> A good starting point: 
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=44302796#content/view/62686683
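> For orientation, a minimal sketch of how these annotations look on (hypothetical) 
> classes; the annotations themselves come from {{org.apache.flink.annotation}}:
> {code:java}
> import org.apache.flink.annotation.Internal;
> import org.apache.flink.annotation.Public;
> import org.apache.flink.annotation.PublicEvolving;
> 
> @Public          // intended to be stable for users across minor releases
> class StableUserFacingApi { }
> 
> @PublicEvolving  // public, but may still change between releases
> class EvolvingUserFacingApi { }
> 
> @Internal        // implementation detail, no stability guarantees
> class InternalHelper { }
> {code}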



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26354) "-restoreMode" should be "--restoreMode" and should have a shorthand

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497959#comment-17497959
 ] 

Konstantin Knauf commented on FLINK-26354:
--

From my perspective, consistency with existing options trumps the other 
arguments here. So far, we (almost) always have a short option and a long 
option. So that's what we should do here and in 
https://issues.apache.org/jira/browse/FLINK-26353 as well.

> "-restoreMode" should be "--restoreMode" and should have a shorthand
> 
>
> Key: FLINK-26354
> URL: https://issues.apache.org/jira/browse/FLINK-26354
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.15.0
>Reporter: Konstantin Knauf
>Priority: Minor
>
> {code:java}
> -restoreMode  Defines how should we restore
> from the given savepoint.
> Supported options: [claim -
> claim ownership of the 
> savepoint
> and delete once it is 
> subsumed,
> no_claim (default) - do not
> claim ownership, the first
> checkpoint will not reuse any
> files from the restored one,
> legacy - the old behaviour, do
> not assume ownership of the
> savepoint files, but can reuse
> some shared files.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26354) "-restoreMode" should be "--restoreMode" and should have a shorthand

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497509#comment-17497509
 ] 

Konstantin Knauf commented on FLINK-26354:
--

In the linked discussion I don't understand why the short option was dropped in 
the end. If "-r" is not an option, we could also go for "-rm" or so as the 
short option. 

> "-restoreMode" should be "--restoreMode" and should have a shorthand
> 
>
> Key: FLINK-26354
> URL: https://issues.apache.org/jira/browse/FLINK-26354
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.15.0
>Reporter: Konstantin Knauf
>Priority: Minor
>
> {code:java}
> -restoreMode  Defines how should we restore
> from the given savepoint.
> Supported options: [claim -
> claim ownership of the 
> savepoint
> and delete once it is 
> subsumed,
> no_claim (default) - do not
> claim ownership, the first
> checkpoint will not reuse any
> files from the restored one,
> legacy - the old behaviour, do
> not assume ownership of the
> savepoint files, but can reuse
> some shared files.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26273) Test checkpoints restore modes & formats

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497438#comment-17497438
 ] 

Konstantin Knauf edited comment on FLINK-26273 at 2/24/22, 2:38 PM:


[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. *Taking Canonical/Native Savepoints/Retained Checkpoint*

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

Checkpoint Directory
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

Savepoint Directory


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full Checkpoint Data Size" for the first 
checkpoint only as a sign that this is the case). Cancelled both jobs. 

3. *Two Jobs can be Started from Retained Checkpoint without Claiming and take 
a full checkpoint*

Like Step 2, just using the retained checkpoint from Step 1 instead of the 
native savepoint.

4. *Job can claim a retained checkpoint and continues to checkpoint 
incrementally; the retained checkpoint is cleaned up*

Started TopSpeedWindowing with Claiming from the Retained Checkpoint of Step 1. 
Confirmed that the first Checkpoint is incremental and confirmed that the 
original checkpoint directory is empty after a few checkpoints. 

{code:bash}
/tmp/flink-checkpoints/c40b0839cfa6a454919597819e8e84f6
├── shared
└── taskowned
{code}

4. *Job can claim a moved native savepoint and continues to checkpoint 
incrementally; the moved savepoint is cleaned up*

Copied Native Savepoint from Step 1 to a different directory. Everything else 
like in 3. The directory of the moved Savepoint does not exist after a few 
checkpoints and the first checkpoint is incremental.

5. *Native Savepoint can be removed after first successful checkpoint and 
recovery still works*

Started TopSpeedWindowing from Native Savepoint. After one checkpoint, removed 
savepoint, killed Taskmanager, restarted Taskmanager and Job recovered and 
continued checkpointing. 
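
For reproducing the steps above, the CLI calls look roughly as follows (a sketch, 
not necessarily the exact commands used; job IDs and paths are illustrative, 
{{--type}} selects the savepoint format and {{-restoreMode}} the ownership 
behaviour):

{code:bash}
# Step 1: stop with a native / canonical savepoint, or cancel with a retained checkpoint
./bin/flink stop --type native --savepointPath /tmp/flink-savepoints <jobId>
./bin/flink stop --type canonical --savepointPath /tmp/flink-savepoints <jobId>
./bin/flink cancel <jobId>

# Steps 2-3: restart from a savepoint or retained checkpoint without claiming it (default NO_CLAIM)
./bin/flink run -s /tmp/flink-savepoints/savepoint-b6dea9-ec57dcec988e \
  ./examples/streaming/TopSpeedWindowing.jar

# Steps 4-5: restart and claim the snapshot, so Flink may clean it up once subsumed
./bin/flink run -s /tmp/flink-checkpoints/c40b0839cfa6a454919597819e8e84f6/chk-13 \
  -restoreMode claim ./examples/streaming/TopSpeedWindowing.jar
{code}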








was (Author: knaufk):
[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. Taking Canonical/Native Savepoints/Retained Checkpoint

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

*Checkpoint Directory*
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

*Savepoint Directory*


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full 

[jira] [Commented] (FLINK-26273) Test checkpoints restore modes & formats

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497438#comment-17497438
 ] 

Konstantin Knauf commented on FLINK-26273:
--

[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. Taking Canonical/Native Savepoints/Retained Checkpoint

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

*Checkpoint Directory*
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

*Savepoint Directory*


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full Checkpoint Data Size" for the first 
checkpoint only as sign that this is the case.). Cancelled both jobs. 

3. *Two Jobs can be Started from Retained Checkpoint without Claiming and take 
a full checkpoint*

Like Step 2a just using the retained checkpoint from Step 1 instead of native 
savepoint.

4. *Job can be claim retained checkpoint and continuous to checkpoint 
incrementally, retained checkpoint is cleaned up*

Started TopSpeedWindowing with Claiming from the Retained Checkpoint of Step 1. 
Confirmed that the first Checkpoint is incremental and confirmed that the 
original checkpoint directory is empty after a few checkpoints. 

{code:bash}
/tmp/flink-checkpoints/c40b0839cfa6a454919597819e8e84f6
├── shared
└── taskowned
{code}

4. *Job can be claim moved, native savepoint and continuous to checkpoint 
incrementally, retained checkpoint is cleaned up*

Copied Native Savepoint from Step 1 to a different directory. Everything else 
like in 3. The directory of the moved Savepoint does not exist after a few 
checkpoints and the first checkpoint is incremental.

5. *Native Savepoint can be removed after first successful checkpoint and 
recovery still works"

Started TopSpeedWindowing from Native Savepoint. After one checkpoint, removed 
savepoint, killed Taskmanager, restarted Taskmanager and Job recovered and 
continued checkpointing. 







> Test checkpoints restore modes & formats
> 
>
> Key: FLINK-26273
> URL: https://issues.apache.org/jira/browse/FLINK-26273
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Dawid Wysakowicz
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> We should test manually changes introduced in [FLINK-25276] & [FLINK-25154]
> Proposal: 
> Take canonical savepoint/native savepoint/externalised checkpoint (with 
> RocksDB), and perform claim (1)/no claim (2) recoveries, and verify that in:
> # after a couple of checkpoints claimed files have been cleaned up
> # that after a single successful checkpoint, you can remove the start up 
> files and failover the job without any errors.
> # take a native, incremental RocksDB savepoint, move to a different 
> directory, restore from it
> documentation:
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#restore-mode
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#savepoint-format



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26273) Test checkpoints restore modes & formats

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497438#comment-17497438
 ] 

Konstantin Knauf edited comment on FLINK-26273 at 2/24/22, 2:37 PM:


[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. Taking Canonical/Native Savepoints/Retained Checkpoint

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

*Checkpoint Directory*
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

*Savepoint Directory*


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full Checkpoint Data Size" for the first 
checkpoint only as sign that this is the case.). Cancelled both jobs. 

3. *Two Jobs can be Started from Retained Checkpoint without Claiming and take 
a full checkpoint*

Like Step 2a just using the retained checkpoint from Step 1 instead of native 
savepoint.

4. *Job can be claim retained checkpoint and continuous to checkpoint 
incrementally, retained checkpoint is cleaned up*

Started TopSpeedWindowing with Claiming from the Retained Checkpoint of Step 1. 
Confirmed that the first Checkpoint is incremental and confirmed that the 
original checkpoint directory is empty after a few checkpoints. 

{code:bash}
/tmp/flink-checkpoints/c40b0839cfa6a454919597819e8e84f6
├── shared
└── taskowned
{code}

4. *Job can be claim moved, native savepoint and continuous to checkpoint 
incrementally, retained checkpoint is cleaned up*

Copied Native Savepoint from Step 1 to a different directory. Everything else 
like in 3. The directory of the moved Savepoint does not exist after a few 
checkpoints and the first checkpoint is incremental.

5. *Native Savepoint can be removed after first successful checkpoint and 
recovery still works*

Started TopSpeedWindowing from Native Savepoint. After one checkpoint, removed 
savepoint, killed Taskmanager, restarted Taskmanager and Job recovered and 
continued checkpointing. 








was (Author: knaufk):
[~dwysakowicz] Looks very good. Here's what I did. Did I miss anything from 
your perspective? I opened two tickets (linked to this ticket) with CLI related 
issues. I also opened a hotfix PR with some documentation improvements: 
https://github.com/apache/flink/pull/18909

*Commit:* 0ff6f2cc78f

1. Taking Canonical/Native Savepoints/Retained Checkpoint

Ran TopSpeedWindowing in Standalone Application Mode with RocksDB (incremental) 
and checkpoints retained on cancellation three times: 
* stopped with native savepoint (Savepoint ID: savepoint-b6dea9-ec57dcec988e)
* stopped with canonical savepoint (Savepoint ID: savepoint-0d0bb8-3cfceefe4dec)
* cancelled (JobID: c40b0839cfa6a454919597819e8e84f6) 

*Checkpoint Directory*
{noformat}
/tmp/flink-checkpoints
├── 0d0bb8faccf2eb8124d086a5355428a8
│   ├── shared
│   └── taskowned
├── b6dea9642f5159f83c32eca3fc40082a
│   ├── shared
│   └── taskowned
└── c40b0839cfa6a454919597819e8e84f6
├── chk-13
│   └── _metadata
├── shared
│   └── 1d438c44-c7a6-49c0-8053-1e5689a6df5c
└── taskowned
{noformat}

*Savepoint Directory*


{noformat}
/tmp/flink-savepoints
├── savepoint-0d0bb8-3cfceefe4dec
│   └── _metadata
└── savepoint-b6dea9-ec57dcec988e
├── dd200786-54e3-4af3-a6f4-2943ff73bc14
└── _metadata
{noformat}

2. *Two Jobs can be Started from Native Savepoint without Claiming and take a 
full checkpoint*

Started 2 TopSpeedWindowing Jobs (aca6b1fc37c489d608b8ab9d562cd569 & 
634d99afcf280d7e6eefd7d9f2b0ec37) without claiming from Native Savepoint and 
confirmed that a full snapshot was taken for both of them (I took the fact that 
the "Checkpointed Data Size"="Full 

[jira] [Commented] (FLINK-26254) KafkaSink might violate order of sequence numbers and risk exactly-once processing

2022-02-24 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497409#comment-17497409
 ] 

Konstantin Knauf commented on FLINK-26254:
--

Important: this only happened with RedPanda, not with Kafka.

> KafkaSink might violate order of sequence numbers and risk exactly-once 
> processing
> --
>
> Key: FLINK-26254
> URL: https://issues.apache.org/jira/browse/FLINK-26254
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.14.3
>Reporter: Fabian Paul
>Assignee: Alexander Preuss
>Priority: Critical
>
> When running the KafkaSink in exactly-once mode with a very low checkpoint 
> interval, users are seeing `OutOfOrderSequenceException`.
> It could be caused by the fact that the connector has a pool of 
> KafkaProducers and the sequence numbers are not shared/reset if a new 
> KafkaProducer tries to write to a partition while the previous KafkaProducer 
> is still occupied with committing.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26354) "-restoreMode" should be "--restoreMode" and should have a shorthand

2022-02-24 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-26354:


 Summary: "-restoreMode" should be "--restoreMode" and should have 
a shorthand
 Key: FLINK-26354
 URL: https://issues.apache.org/jira/browse/FLINK-26354
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.15.0
Reporter: Konstantin Knauf



{code:java}
-restoreMode  Defines how should we restore
from the given savepoint.
Supported options: [claim -
claim ownership of the savepoint
and delete once it is subsumed,
no_claim (default) - do not
claim ownership, the first
checkpoint will not reuse any
files from the restored one,
legacy - the old behaviour, do
not assume ownership of the
savepoint files, but can reuse
some shared files.
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26353) "flink stop --help" does not list "--type" option

2022-02-24 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-26353:


 Summary: "flink stop --help" does not list "--type" option 
 Key: FLINK-26353
 URL: https://issues.apache.org/jira/browse/FLINK-26353
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.15.0
Reporter: Konstantin Knauf


{code:bash}
./bin/flink stop --help 

Action "stop" stops a running program with a savepoint (streaming jobs only).

  Syntax: stop [OPTIONS] 
  "stop" action options:
     -d,--drain                           Send MAX_WATERMARK before taking the
                                          savepoint and stopping the pipelne.
     -p,--savepointPath    Path to the savepoint (for example
                                          hdfs:///flink/savepoint-1537). If no
                                          directory is specified, the configured
                                          default will be used
                                          ("state.savepoints.dir").
  Options for Generic CLI mode:
     -D    Allows specifying multiple generic configuration
                           options. The available options can be found at
                           https://nightlies.apache.org/flink/flink-docs-stable/
                           ops/config.html
     -e,--executor    DEPRECATED: Please use the -t option instead which is
                           also available with the "Application Mode".
                           The name of the executor to be used for executing the
                           given job, which is equivalent to the
                           "execution.target" config option. The currently
                           available executors are: "remote", "local",
                           "kubernetes-session", "yarn-per-job" (deprecated),
                           "yarn-session".
     -t,--target      The deployment target for the given application,
                           which is equivalent to the "execution.target" config
                           option. For the "run" action the currently available
                           targets are: "remote", "local", "kubernetes-session",
                           "yarn-per-job" (deprecated), "yarn-session". For the
                           "run-application" action the currently available
                           targets are: "kubernetes-application".

  Options for yarn-cluster mode:
     -m,--jobmanager             Set to yarn-cluster to use YARN execution
                                      mode.
     -yid,--yarnapplicationId    Attach to running YARN session
     -z,--zookeeperNamespace     Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -D              Allows specifying multiple generic
                                     configuration options. The available
                                     options can be found at
                                     https://nightlies.apache.org/flink/flink-do
                                     cs-stable/ops/config.html
     -m,--jobmanager            Address of the JobManager to which to
                                     connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration. Attention: This
                                     option is respected only if the
                                     high-availability configuration is NONE.
     -z,--zookeeperNamespace    Namespace to create the Zookeeper sub-paths
                                     for high availability mode
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-21884) Reduce TaskManager failure detection time

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf closed FLINK-21884.

Resolution: Fixed

Solved in https://issues.apache.org/jira/browse/FLINK-25277. 

> Reduce TaskManager failure detection time
> -
>
> Key: FLINK-21884
> URL: https://issues.apache.org/jira/browse/FLINK-21884
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: reactive
> Fix For: 1.15.0
>
> Attachments: image-2021-03-19-20-10-40-324.png
>
>
> In Flink 1.13 (and older versions), TaskManager failures stall the processing 
> for a significant amount of time, even though the system gets indications for 
> the failure almost immediately through network connection losses.
> This is due to a high (default) heartbeat timeout of 50 seconds [1] to 
> accommodate GC pauses, transient network disruptions, or generally slow 
> environments (otherwise, we would unregister a healthy TaskManager).
> Such a high timeout can lead to disruptions in the processing (no processing 
> for certain periods, high latencies, buildup of consumer lag etc.). In 
> Reactive Mode (FLINK-10407), the issue surfaces on scale-down events, where 
> the loss of a TaskManager is immediately visible in the logs, but the job is 
> stuck in "FAILING" for quite a while until the TaskManger is really 
> deregistered. (Note that this issue is not that critical in a autoscaling 
> setup, because Flink can control the scale-down events and trigger them 
> proactively)
> On the attached metrics dashboard, one can see that the job has significant 
> throughput drops / consumer lags during scale down (and also CPU usage spikes 
> on processing the queued events, leading to incorrect scale up events again).
>  !image-2021-03-19-20-10-40-324.png|thumbnail!
> One idea to solve this problem is to:
> - Score TaskManagers based on certain signals (# exceptions reported, 
> exception types (connection losses, akka failures), failure frequencies,  
> ...) and blacklist them accordingly.
> - Introduce a best-effort TaskManager unregistration mechanism: When a 
> TaskManager receives a sigterm, it sends a final message to the JobManager 
> saying "goodbye", and the JobManager can immediately remove the TM from its 
> bookkeeping.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
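> The configuration option referenced in [1], together with its companion 
> interval, with the defaults at the time (shown here for orientation only):
> {code:yaml}
> # flink-conf.yaml
> heartbeat.interval: 10000   # ms between heartbeat requests
> heartbeat.timeout: 50000    # ms until an unresponsive TaskManager is considered lost
> {code}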



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24607:
-
Component/s: Connectors / Common

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added "Closing enumerator" log in {{MySqlSourceEnumerator#close()}}, and 
> "Starting enumerator" in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that SourceCoordinator is restored and closed 264 times, 
> while the split enumerator is started 264 times but only closed 195 times. It 
> seems that {{SourceCoordinator}} fails to close the enumerator when the job 
> fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a suspicious 
> point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method checks the 
> {{started}} and {{enumerator}} variables asynchronously. Is there a concurrency 
> problem here that may lead to a dirty read and to missing the close of the 
> {{enumerator}}? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom flink version to production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors
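> A deliberately racy, generic sketch of the suspected pattern, with hypothetical 
> names rather than the actual SourceCoordinator code: a plain field written on 
> one thread and read on another without synchronization may not be visible yet, 
> so the closing side can observe a stale {{started}}/{{enumerator}} and skip the 
> cleanup.
> {code:java}
> public class StaleFlagSketch {
>     // Plain (non-volatile) fields, written by the main thread, read by the closer thread.
>     static boolean started = false;
>     static AutoCloseable resource = null;
> 
>     public static void main(String[] args) throws InterruptedException {
>         // Closer thread: checks the flag without any synchronization with the writer.
>         Thread closer = new Thread(() -> {
>             if (started && resource != null) {
>                 try { resource.close(); } catch (Exception e) { }
>             } else {
>                 System.out.println("skipped close (writes not yet visible)");
>             }
>         });
>         closer.start();
> 
>         // Main thread: these writes race with the closer's reads, so the closer
>         // may run before they happen or before they become visible to it.
>         resource = () -> System.out.println("closed");
>         started = true;
> 
>         closer.join();
>     }
> }
> {code}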



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24607:
-
Component/s: (was: Runtime / Coordination)

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added "Closing enumerator" log in {{MySqlSourceEnumerator#close()}}, and 
> "Starting enumerator" in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that SourceCoordinator is restored and closed 264 times, 
> split enumerator is started 264 but only closed 195 times. It seems that 
> {{SourceCoordinator}} misses to close enumerator when job failover 
> frequently. 
> I also went throught the code of {{SourceCoordinator}} and found some 
> suspicious point:
> The {{started}} flag and  {{enumerator}} is assigned in the main thread, 
> however {{SourceCoordinator#close()}} is executed async by 
> {{DeferrableCoordinator#closeAsync}}.  That means the close method will check 
> the {{started}} and {{enumerator}} variable async. Is there any concurrency 
> problem here which mean lead to dirty read and miss to close the 
> {{enumerator}}? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom flink version to production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-23843:
-
Component/s: Connectors / Common
 (was: Runtime / Coordination)

> Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should 
> cause Global Failure instead of Process Kill
> ---
>
> Key: FLINK-23843
> URL: https://issues.apache.org/jira/browse/FLINK-23843
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.13.2, 1.15.0
>Reporter: Stephan Ewen
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Currently, when the method 
> "SplitEnumeratorContext.runInCoordinatorThread()" throws an exception, the 
> effect is a process kill of the JobManager process.
> The chain how the process kill happens is:
> * An exception bubbling up in the executor, killing the executor thread
> * The executor starts a replacement thread, which is forbidden by the thread 
> factory (as a safety net) and causes a process kill.
> We should prevent such exceptions from bubbling up in the coordinator 
> executor.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26281) Test Elasticsearch connector End2End

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-26281:


Assignee: Konstantin Knauf

> Test Elasticsearch connector End2End
> 
>
> Key: FLINK-26281
> URL: https://issues.apache.org/jira/browse/FLINK-26281
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.15.0
>Reporter: Alexander Preuss
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-24323
> Documentation for [datastream 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/]
> Documentation for [table 
> api|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/]
> As 1.15 deprecates the SinkFunction-based Elasticsearch connector and 
> introduces a new connector based on the Sink interface, we should test that it 
> behaves correctly and as users expect.
>  
> Some suggestions on what to test:
>  * Test delivery guarantees (none, at-least-once); a job configured with 
> exactly-once should not run
>  * Write a simple job that is inserting/upserting data into Elasticsearch 
> (see the sketch after this list)
>  * Write a simple job that is inserting/upserting data into Elasticsearch and 
> use a non-default parallelism
>  * Write a simple job in both the DataStream API and the Table API
>  * Test restarting jobs and scaling up/down
>  * Test failure of a simple job that is inserting data with exactly-once 
> delivery guarantee by terminating and restarting Elasticsearch
>  * Test against Elasticsearch 6.X and 7.X with the respective connectors
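> As a starting point for the "simple job" items above, a minimal DataStream 
> sketch against the new Sink-based connector, assuming an Elasticsearch 7 
> instance reachable at localhost:9200; the host, index name, and checkpoint 
> interval are placeholders to adjust to the test setup.
> {code:java}
> import java.util.Collections;
> 
> import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.http.HttpHost;
> import org.elasticsearch.client.Requests;
> 
> public class Elasticsearch7SmokeTestJob {
> 
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpointing is needed for at-least-once delivery on failure.
>         env.enableCheckpointing(5_000L);
> 
>         env.fromElements("alpha", "beta", "gamma")
>                 .sinkTo(new Elasticsearch7SinkBuilder<String>()
>                         .setHosts(new HttpHost("localhost", 9200, "http"))
>                         // Upsert-style writes keyed by the element itself.
>                         .setEmitter((element, context, indexer) ->
>                                 indexer.add(Requests.indexRequest()
>                                         .index("flink-release-test")
>                                         .id(element)
>                                         .source(Collections.singletonMap("data", element))))
>                         .setBulkFlushMaxActions(1)
>                         .build());
> 
>         env.execute("Elasticsearch 7 smoke test");
>     }
> }
> {code}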



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26273) Test checkpoints restore modes & formats

2022-02-22 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-26273:


Assignee: Konstantin Knauf

> Test checkpoints restore modes & formats
> 
>
> Key: FLINK-26273
> URL: https://issues.apache.org/jira/browse/FLINK-26273
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Dawid Wysakowicz
>Assignee: Konstantin Knauf
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.15.0
>
>
> We should manually test the changes introduced in [FLINK-25276] & [FLINK-25154]
> Proposal: 
> Take a canonical savepoint / native savepoint / externalized checkpoint (with 
> RocksDB), perform claim (1) and no-claim (2) recoveries, and verify that:
> # after a couple of checkpoints, the claimed files have been cleaned up
> # after a single successful checkpoint, you can remove the start-up files and 
> fail over the job without any errors
> # a native, incremental RocksDB savepoint can be moved to a different 
> directory and restored from it
> (a configuration sketch for driving the restore follows the documentation 
> links below)
> documentation:
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#restore-mode
> # 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#savepoint-format
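> One way to drive the restore part of the proposal programmatically is to put 
> the snapshot path and restore mode into the job configuration. A rough sketch 
> follows; the configuration keys and values are written from memory of the 1.15 
> documentation and should be double-checked against it, the path is a 
> placeholder, and the pipeline must of course match the job that produced the 
> snapshot.
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class RestoreModeSketch {
> 
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // Placeholder path to the savepoint / externalized checkpoint under test.
>         conf.setString("execution.savepoint.path", "file:///tmp/snapshots/savepoint-xxxx");
>         // CLAIM or NO_CLAIM, matching steps (1) and (2) of the proposal.
>         conf.setString("execution.savepoint-restore-mode", "CLAIM");
> 
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.fromSequence(0, 1_000_000)
>                 .keyBy(i -> i % 10)
>                 .reduce(Long::sum)
>                 .print();
>         env.execute("restore-mode manual test");
>     }
> }
> {code}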



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26254) KafkaSink might violate order of sequence numbers and risk exactly-once processing

2022-02-18 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17494641#comment-17494641
 ] 

Konstantin Knauf commented on FLINK-26254:
--

This also happens with a higher checkpoint interval. It was easily reproducible 
with a 10s interval.

> KafkaSink might violate order of sequence numbers and risk exactly-once 
> processing
> --
>
> Key: FLINK-26254
> URL: https://issues.apache.org/jira/browse/FLINK-26254
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.14.3
>Reporter: Fabian Paul
>Priority: Critical
>
> When running the KafkaSink in exactly-once mode with a very low checkpoint 
> interval, users are seeing `OutOfOrderSequenceException`.
> It could be caused by the fact that the connector keeps a pool of 
> KafkaProducers and the sequence numbers are not shared/reset when a new 
> KafkaProducer writes to a partition while the previous KafkaProducer 
> is still busy committing.
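> For reference, a minimal sketch of the kind of setup under which this can be 
> observed, assuming a local broker; the bootstrap servers, topic name, 
> transactional id prefix, and the aggressive checkpoint interval are 
> placeholders for the reproduction environment.
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class ExactlyOnceKafkaSinkJob {
> 
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Very low checkpoint interval: each checkpoint rotates to another
>         // producer from the pool while the previous one may still be committing.
>         env.enableCheckpointing(500L);
> 
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic("exactly-once-test")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .setTransactionalIdPrefix("flink-26254-repro")
>                 .build();
> 
>         env.fromSequence(0, Long.MAX_VALUE)
>                 .map(String::valueOf)
>                 .sinkTo(sink);
> 
>         env.execute("KafkaSink exactly-once reproduction sketch");
>     }
> }
> {code}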



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26018) Unnecessary late events when using the new KafkaSource

2022-02-17 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26018:
-
Fix Version/s: (was: 1.15.0)

> Unnecessary late events when using the new KafkaSource
> --
>
> Key: FLINK-26018
> URL: https://issues.apache.org/jira/browse/FLINK-26018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.3
>Reporter: Jun Qin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.4
>
> Attachments: message in kafka.txt, 
> taskmanager_10.28.0.131_33249-b3370c_log
>
>
> There is an issue with the new KafkaSource connector in Flink 1.14: when one 
> task consumes messages from multiple topic partitions (statically created, 
> timestamps in order), it may start with one partition and advance watermarks 
> before data from the other partitions arrives. In this case, early messages 
> in the other partitions may unnecessarily be considered late.
> I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
> partition to the {{WatermarkMultiplexer}} when it receives data from that 
> partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
> fetches any data. 
> Attached are two files: the messages in Kafka and the corresponding TM logs.
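> A minimal sketch of the kind of pipeline where this shows up: a single reader 
> subtask owning all partitions of a statically created topic, with event-time 
> watermarks. The bootstrap servers, topic name, group id, and out-of-orderness 
> bound are placeholders.
> {code:java}
> import java.time.Duration;
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class MultiPartitionWatermarkJob {
> 
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Parallelism 1 forces one reader to own all partitions, the
>         // situation described above.
>         env.setParallelism(1);
> 
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setTopics("multi-partition-topic")
>                 .setGroupId("flink-26018-check")
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
> 
>         env.fromSource(
>                         source,
>                         // If a partition is only registered for watermarking once its
>                         // first record arrives, earlier records from the other
>                         // partitions can already be considered late downstream.
>                         WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
>                         "kafka-source")
>                 .print();
> 
>         env.execute("KafkaSource multi-partition watermark check");
>     }
> }
> {code}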



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26018) Unnecessary late events when using the new KafkaSource

2022-02-17 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26018:
-
Fix Version/s: 1.15.0

> Unnecessary late events when using the new KafkaSource
> --
>
> Key: FLINK-26018
> URL: https://issues.apache.org/jira/browse/FLINK-26018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.3
>Reporter: Jun Qin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
> Attachments: message in kafka.txt, 
> taskmanager_10.28.0.131_33249-b3370c_log
>
>
> There is an issue with the new KafkaSource connector in Flink 1.14: when one 
> task consumes messages from multiple topic partitions (statically created, 
> timestamps in order), it may start with one partition and advance watermarks 
> before data from the other partitions arrives. In this case, early messages 
> in the other partitions may unnecessarily be considered late.
> I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
> partition to the {{WatermarkMultiplexer}} when it receives data from that 
> partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
> fetches any data. 
> Attached are two files: the messages in Kafka and the corresponding TM logs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26018) Unnecessary late events when using the new KafkaSource

2022-02-17 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26018:
-
Fix Version/s: 1.15.0
   1.14.4

> Unnecessary late events when using the new KafkaSource
> --
>
> Key: FLINK-26018
> URL: https://issues.apache.org/jira/browse/FLINK-26018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.3
>Reporter: Jun Qin
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
> Attachments: message in kafka.txt, 
> taskmanager_10.28.0.131_33249-b3370c_log
>
>
> There is an issue with the new KafkaSource connector in Flink 1.14: when one 
> task consumes messages from multiple topic partitions (statically created, 
> timestamps in order), it may start with one partition and advance watermarks 
> before data from the other partitions arrives. In this case, early messages 
> in the other partitions may unnecessarily be considered late.
> I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
> partition to the {{WatermarkMultiplexer}} when it receives data from that 
> partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
> fetches any data. 
> Attached are two files: the messages in Kafka and the corresponding TM logs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25883) The value of DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S is too large

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25883:
-
Fix Version/s: 1.13.7
   (was: 1.13.6)

> The value of DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S is too large 
> --
>
> Key: FLINK-25883
> URL: https://issues.apache.org/jira/browse/FLINK-25883
> Project: Flink
>  Issue Type: Bug
> Environment: Windows, Python 3.8
>Reporter: Mikhail
>Assignee: Dian Fu
>Priority: Minor
> Fix For: 1.15.0, 1.12.8, 1.14.4, 1.13.7
>
>
> In [this 
> line|https://github.com/apache/flink/blob/fb38c99a38c63ba8801e765887f955522072615a/flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py#L30],
>  the value of DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S is set to 
> 315360. This is more than the default value of threading.TIMEOUT_MAX in 
> Python on Windows, which is 4294967. As a result, an "OverflowError: timeout 
> value is too large" error is produced.
> Full traceback:
> {code:java}
>  File 
> "G:\PycharmProjects\PyFlink\venv_from_scratch\lib\site-packages\apache_beam\runners\worker\data_plane.py",
>  line 218, in run
>   while not self._finished.wait(next_call - time.time()):
>  File "C:\Python38\lib\threading.py", line 558, in wait
>   signaled = self._cond.wait(timeout)
>  File "C:\Python38\lib\threading.py", line 306, in wait
>   gotit = waiter.acquire(True, timeout)
> OverflowError: timeout value is too large{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25785) Update com.h2database:h2 to 2.0.210

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25785:
-
Fix Version/s: 1.13.7
   (was: 1.13.6)

> Update com.h2database:h2 to 2.0.210
> ---
>
> Key: FLINK-25785
> URL: https://issues.apache.org/jira/browse/FLINK-25785
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
>
> Two security vulnerabilities in H2 Console (CVE-2022-23221 and a possible DNS 
> rebinding attack) are fixed in 2.0.210. Flink is currently on 2.0.206 since 
> https://issues.apache.org/jira/browse/FLINK-25576
> Note: Flink uses this dependency only for testing, so it is not directly 
> impacted by the CVE. We just want to be good citizens and keep our 
> dependencies up to date.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26039) Incorrect value getter in map unnest table function

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-26039:
-
Fix Version/s: 1.13.7
   (was: 1.13.6)

> Incorrect value getter in map unnest table function
> ---
>
> Key: FLINK-26039
> URL: https://issues.apache.org/jira/browse/FLINK-26039
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Han
>Assignee: Han
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
>
> Suppose we have a map field that needs to be expanded.
>  
> {code:java}
> CREATE TABLE t (
>     id INT,
>     map_field MAP
> ) WITH (
>     -- ...
> );
> SELECT id, k, v FROM t, unnest(map_field) as A(k, v);{code}
>  
>  
> We will get the following runtime exception:
> {code:java}
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.table.data.binary.BinaryStringData cannot be cast to 
> java.lang.Integer
>     at 
> org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
>     at 
> org.apache.flink.table.data.utils.JoinedRowData.getInt(JoinedRowData.java:149)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at 
> org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
>     at 
> org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
>     at StreamExecCorrelate$10$TableFunctionCollector$4.collect(Unknown Source)
>     at 
> org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
>     at 
> StreamExecCorrelate$10$TableFunctionResultConverterCollector$8.collect(Unknown
>  Source)
>     at 
> org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:197)
>     at 
> org.apache.flink.table.runtime.functions.SqlUnnestUtils$MapUnnestTableFunction.eval(SqlUnnestUtils.java:169)
>     at StreamExecCorrelate$10.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-24607:
-
Fix Version/s: 1.13.7
   (was: 1.13.6)

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added "Closing enumerator" log in {{MySqlSourceEnumerator#close()}}, and 
> "Starting enumerator" in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that SourceCoordinator is restored and closed 264 times, 
> split enumerator is started 264 but only closed 195 times. It seems that 
> {{SourceCoordinator}} misses to close enumerator when job failover 
> frequently. 
> I also went throught the code of {{SourceCoordinator}} and found some 
> suspicious point:
> The {{started}} flag and  {{enumerator}} is assigned in the main thread, 
> however {{SourceCoordinator#close()}} is executed async by 
> {{DeferrableCoordinator#closeAsync}}.  That means the close method will check 
> the {{started}} and {{enumerator}} variable async. Is there any concurrency 
> problem here which mean lead to dirty read and miss to close the 
> {{enumerator}}? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom flink version to production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors
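> To complement the report above, a minimal sketch of one way to avoid such a 
> visibility issue: publish and read the coordinator state through a single 
> executor thread so that close always observes what start installed. This is 
> illustrative only and not Flink's actual fix.
> {code:java}
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> 
> // Illustrative only: all state transitions run on one executor thread, so
> // closeAsync() always sees the enumerator that start() installed.
> public class SerializedCoordinator {
> 
>     private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
> 
>     private boolean started;
>     private AutoCloseable enumerator;
> 
>     public void start(AutoCloseable enumerator) {
>         coordinatorExecutor.execute(() -> {
>             this.enumerator = enumerator;
>             this.started = true;
>         });
>     }
> 
>     public CompletableFuture<Void> closeAsync() {
>         return CompletableFuture.runAsync(() -> {
>             if (started && enumerator != null) {
>                 try {
>                     enumerator.close();
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>             }
>             coordinatorExecutor.shutdown();
>         }, coordinatorExecutor);
>     }
> }
> {code}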



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25771:
-
Fix Version/s: 1.13.7
   (was: 1.13.6)

> CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
> -
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.15.0, 1.13.5, 1.14.3
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP 
> with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: 
> All host(s) tried for query failed (tried: /172.17.0.1:59220 
> (com.datastax.driver.core.exceptions.OperationTimedOutException: 
> [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jan 23 01:02:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jan 23 01:02:52   at org.junit.runners.Suite.runChild(Suite.java:128)
> Jan 23 01:02:52   at 

[jira] [Resolved] (FLINK-25532) Provide Flink SQL CLI as Docker image

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf resolved FLINK-25532.
--
Resolution: Fixed

Fixed via Documentation.

> Provide Flink SQL CLI as Docker image
> -
>
> Key: FLINK-25532
> URL: https://issues.apache.org/jira/browse/FLINK-25532
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Martijn Visser
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
>
> Flink is currently available as Docker images. However, the Flink SQL CLI 
> isn't available as a Docker image. We should provide this as well. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25532) Provide Flink SQL CLI as Docker image

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-25532:
-
Fix Version/s: 1.15.0

> Provide Flink SQL CLI as Docker image
> -
>
> Key: FLINK-25532
> URL: https://issues.apache.org/jira/browse/FLINK-25532
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Martijn Visser
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Flink is currently available as Docker images. However, the Flink SQL CLI 
> isn't available as a Docker image. We should provide this as well. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-25532) Provide Flink SQL CLI as Docker image

2022-02-16 Thread Konstantin Knauf (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf reassigned FLINK-25532:


Assignee: Konstantin Knauf

> Provide Flink SQL CLI as Docker image
> -
>
> Key: FLINK-25532
> URL: https://issues.apache.org/jira/browse/FLINK-25532
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Martijn Visser
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
>
> Flink is currently available via as Docker images. However, the Flink SQL CLI 
> isn't available as a Docker image. We should also provide this. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25532) Provide Flink SQL CLI as Docker image

2022-02-11 Thread Konstantin Knauf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490816#comment-17490816
 ] 

Konstantin Knauf commented on FLINK-25532:
--

I overlooked the passthrough command in the entrypoint, so I think it is OK as 
is. I will create a PR to improve the documentation on this, though.

> Provide Flink SQL CLI as Docker image
> -
>
> Key: FLINK-25532
> URL: https://issues.apache.org/jira/browse/FLINK-25532
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Martijn Visser
>Priority: Major
>
> Flink is currently available as Docker images. However, the Flink SQL CLI 
> isn't available as a Docker image. We should provide this as well. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

