[jira] [Commented] (FLINK-16686) [State TTL] Make user class loader available in native RocksDB compaction thread

2024-05-17 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847467#comment-17847467
 ] 

Thomas Weise commented on FLINK-16686:
--

Flink 1.17:
{code:java}
Exception in thread "Thread-14" java.lang.IllegalArgumentException: classLoader cannot be null.
	at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:550)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:391)
	at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
 {code}
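
The trigger can be reproduced in isolation: a thread whose context class loader is null makes Kryo's class loader check fail. A minimal standalone sketch (a simplified stand-in; the real thread is attached by RocksDB's JNI layer, and {{userCodeClassLoader}} below is hypothetical):
{code:java}
public class TcclSketch {
    public static void main(String[] args) throws Exception {
        Thread nativeLike = new Thread(() -> {
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            // KryoSerializer eventually calls Kryo#setClassLoader(cl), which
            // throws IllegalArgumentException when cl is null (see trace above).
            System.out.println("context class loader: " + cl);
        });
        nativeLike.setContextClassLoader(null); // simulate a native-attached thread
        nativeLike.start();
        nativeLike.join();
    }
}
{code}
A fix along the lines discussed here would wrap the compaction filter callback so that it installs the task's user code class loader before deserializing, e.g. {{Thread.currentThread().setContextClassLoader(userCodeClassLoader)}}.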

> [State TTL] Make user class loader available in native RocksDB compaction 
> thread
> 
>
> Key: FLINK-16686
> URL: https://issues.apache.org/jira/browse/FLINK-16686
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.11.3, 1.13.0, 1.12.3, 1.17.0
>Reporter: Andrey Zagrebin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The issue was initially reported 
> [here|https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found].
> The problem is that the Java code of the Flink compaction filter is called from 
> RocksDB native C++ code, in the context of the native compaction thread. 
> RocksDB has utilities to create a Java thread context for the Flink Java 
> callback. Presumably, the Java thread context class loader is not set at all, 
> and querying it produces a NullPointerException.
> The reported setup enabled list state with TTL. The compaction filter has to 
> deserialise elements to check expiration. The deserialiser relies on Kryo, 
> which queries the thread context class loader; this is expected to be the user 
> class loader of the task but turns out to be null.
> We should investigate how to pass the user class loader to the compaction 
> thread of the list state with TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-16686) [State TTL] Make user class loader available in native RocksDB compaction thread

2024-05-17 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-16686:
-
Affects Version/s: 1.17.0

> [State TTL] Make user class loader available in native RocksDB compaction 
> thread
> 
>
> Key: FLINK-16686
> URL: https://issues.apache.org/jira/browse/FLINK-16686
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0, 1.11.3, 1.13.0, 1.12.3, 1.17.0
>Reporter: Andrey Zagrebin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The issue was initially reported 
> [here|https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found].
> The problem is that the Java code of the Flink compaction filter is called from 
> RocksDB native C++ code, in the context of the native compaction thread. 
> RocksDB has utilities to create a Java thread context for the Flink Java 
> callback. Presumably, the Java thread context class loader is not set at all, 
> and querying it produces a NullPointerException.
> The reported setup enabled list state with TTL. The compaction filter has to 
> deserialise elements to check expiration. The deserialiser relies on Kryo, 
> which queries the thread context class loader; this is expected to be the user 
> class loader of the task but turns out to be null.
> We should investigate how to pass the user class loader to the compaction 
> thread of the list state with TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28048) Introduce Source API alternative to FiniteTestSource

2024-04-20 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-28048.
--
Fix Version/s: 1.20.0
   Resolution: Implemented

> Introduce Source API alternative to FiniteTestSource
> 
>
> Key: FLINK-28048
> URL: https://issues.apache.org/jira/browse/FLINK-28048
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Tests
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This also has to verify that the Iceberg connector tests mentioned in 
> FLINK-28054 get covered by the solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-25565) Write and Read Parquet INT64 Timestamp

2023-12-08 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-25565:
-
Fix Version/s: 1.18.1

> Write and Read Parquet INT64 Timestamp
> --
>
> Key: FLINK-25565
> URL: https://issues.apache.org/jira/browse/FLINK-25565
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0, 1.15.0
>Reporter: Bo Cui
>Assignee: Bo Cui
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0, 1.18.1
>
>
> Flink cannot read parquet files that contain INT64 Timestamp generated by 
> Spark



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25565) Write and Read Parquet INT64 Timestamp

2023-12-07 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794287#comment-17794287
 ] 

Thomas Weise commented on FLINK-25565:
--

[~martijnvisser] the default behavior did not change. It is necessary to opt 
into the INT64 mapping; the PR added the config option (write.int64.timestamp) 
to the same page. I agree it would be helpful if the mapping table itself 
mentioned it as well, though. 

> Write and Read Parquet INT64 Timestamp
> --
>
> Key: FLINK-25565
> URL: https://issues.apache.org/jira/browse/FLINK-25565
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0, 1.15.0
>Reporter: Bo Cui
>Assignee: Bo Cui
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> Flink cannot read parquet files that contain INT64 Timestamp generated by 
> Spark



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25565) Write and Read Parquet INT64 Timestamp

2023-12-07 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794237#comment-17794237
 ] 

Thomas Weise commented on FLINK-25565:
--

It turns out that without this option, tables with timestamp columns written by 
Spark and probably other tools cannot be consumed. Thanks [~Bo Cui] and sorry 
for the long wait.

> Write and Read Parquet INT64 Timestamp
> --
>
> Key: FLINK-25565
> URL: https://issues.apache.org/jira/browse/FLINK-25565
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0, 1.15.0
>Reporter: Bo Cui
>Assignee: Bo Cui
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> Flink cannot read parquet files that contain INT64 Timestamp generated by 
> Spark



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-25565) Write and Read Parquet INT64 Timestamp

2023-12-06 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-25565.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

> Write and Read Parquet INT64 Timestamp
> --
>
> Key: FLINK-25565
> URL: https://issues.apache.org/jira/browse/FLINK-25565
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.12.0, 1.15.0
>Reporter: Bo Cui
>Assignee: Bo Cui
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> Flink cannot read parquet files that contain INT64 Timestamp generated by 
> Spark



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check

2023-12-04 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-27529.
--
Resolution: Fixed

> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -
>
> Key: FLINK-27529
> URL: https://issues.apache.org/jira/browse/FLINK-27529
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.14.4, 1.15.0, 1.15.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> Currently HybridSourceSplitEnumerator checks readerSourceIndex values of 
> Integer type with the == operator. 
> By definition, a hybrid source can concatenate more than two child sources, so 
> the current code works only because of the Integer cache (which covers values 
> <= 127); with more sources it will fail. In short, we can't use == to compare 
> Integer indexes unless we limit hybrid sources to at most 128 child sources.
> e.g.
> {code:java}
> Integer i1 = 128;
> Integer i2 = 128;
> System.out.println(i1 == i2);
> int i3 = 128;
> int i4 = 128;
> System.out.println((Integer) i3 == (Integer) i4);
> {code}
> It prints false, false.
> The HybridSource Integer index comparison is shown below:
> {code:java}
> @Override
> public Map<Integer, ReaderInfo> registeredReaders() {
> 
>     Integer lastIndex = null;
>     for (Integer sourceIndex : readerSourceIndex.values()) {
>         if (lastIndex != null && lastIndex != sourceIndex) {
>             return filterRegisteredReaders(readers);
>         }
>         lastIndex = sourceIndex;
>     }
>     return readers;
> }
> 
> private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>     Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>     for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>         if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>             readersForSource.put(e.getKey(), e.getValue());
>         }
>     }
>     return readersForSource;
> }
> {code}
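
A value-based comparison avoids the boxing pitfall. A minimal sketch of the direction of the fix (not necessarily the exact merged patch):
{code:java}
// Compare boxed indexes by value instead of by identity:
if (lastIndex != null && !lastIndex.equals(sourceIndex)) {
    return filterRegisteredReaders(readers);
}
// ... and in filterRegisteredReaders:
if (java.util.Objects.equals(readerSourceIndex.get(e.getKey()), sourceIndex)) {
    readersForSource.put(e.getKey(), e.getValue());
}
{code}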



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check

2023-12-04 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27529:
-
Fix Version/s: 1.19.0

> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -
>
> Key: FLINK-27529
> URL: https://issues.apache.org/jira/browse/FLINK-27529
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.14.4, 1.15.0, 1.15.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> Currently HybridSourceSplitEnumerator checks readerSourceIndex values of 
> Integer type with the == operator. 
> By definition, a hybrid source can concatenate more than two child sources, so 
> the current code works only because of the Integer cache (which covers values 
> <= 127); with more sources it will fail. In short, we can't use == to compare 
> Integer indexes unless we limit hybrid sources to at most 128 child sources.
> e.g.
> {code:java}
> Integer i1 = 128;
> Integer i2 = 128;
> System.out.println(i1 == i2);
> int i3 = 128;
> int i4 = 128;
> System.out.println((Integer) i3 == (Integer) i4);
> {code}
> It prints false, false.
> The HybridSource Integer index comparison is shown below:
> {code:java}
> @Override
> public Map<Integer, ReaderInfo> registeredReaders() {
> 
>     Integer lastIndex = null;
>     for (Integer sourceIndex : readerSourceIndex.values()) {
>         if (lastIndex != null && lastIndex != sourceIndex) {
>             return filterRegisteredReaders(readers);
>         }
>         lastIndex = sourceIndex;
>     }
>     return readers;
> }
> 
> private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>     Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>     for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>         if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>             readersForSource.put(e.getKey(), e.getValue());
>         }
>     }
>     return readersForSource;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss

2023-11-13 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785549#comment-17785549
 ] 

Thomas Weise commented on FLINK-33402:
--

[~varun1729dd] thanks for investigating this. It would be helpful to understand 
better why we see the race condition, as under the mailbox model this should 
not happen. Let's continue the discussion on the PR.

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> 
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>Reporter: Varun Narayanan Chakravarthy
>Assignee: Varun Narayanan Chakravarthy
>Priority: Critical
>  Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using the Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources. All these locations are chained 
> together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code. The 
> Hybrid Source switches to the next source before the current source is 
> complete. The same applies to the Hybrid Source readers. I have also shared 
> the patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished reading split(s) [000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished reading from splits [000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
> This is the split with ID 000229. From the logs, we can see that this split is 
> added after the no-more-splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split to 

[jira] [Assigned] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss

2023-11-13 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-33402:


Assignee: Varun Narayanan Chakravarthy

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> 
>
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>Reporter: Varun Narayanan Chakravarthy
>Assignee: Varun Narayanan Chakravarthy
>Priority: Critical
>  Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using the Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources. All these locations are chained 
> together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code. The 
> Hybrid Source switches to the next source before the current source is 
> complete. The same applies to the Hybrid Source readers. I have also shared 
> the patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished reading split(s) [000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished reading from splits [000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
> This is the split with ID 000229. From the logs, we can see that this split is 
> added after the no-more-splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=000229 position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: 

[jira] [Resolved] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-26 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-32884.
--
Resolution: Fixed

> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, `SUBMIT_ARGS` uses the `remote -m http://<host>:<port>` format. For 
> local execution it works fine (`SUBMIT_ARGS=remote -m http://localhost:8081/`), 
> but it does not support placing the JobManager behind a proxy or 
> using an Ingress for routing to a specific Flink cluster based on the URL 
> path. In the current scenario, it expects to reach the JobManager's PyFlink 
> job endpoint at `http://<host>:<port>/v1/jobs`. Mapping to a non-root 
> location, e.g. 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs`, 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (with support for the 
> https scheme as well).
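
As an illustration (hypothetical host, path prefix, and script name), submission would then look like: {{SUBMIT_ARGS="remote -m https://gateway.example.com/flink-clusters/my-cluster" python my_job.py}}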



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-09-21 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise closed FLINK-32884.

Resolution: Fixed

> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, `SUBMIT_ARGS` uses the `remote -m http://<host>:<port>` format. For 
> local execution it works fine (`SUBMIT_ARGS=remote -m http://localhost:8081/`), 
> but it does not support placing the JobManager behind a proxy or 
> using an Ingress for routing to a specific Flink cluster based on the URL 
> path. In the current scenario, it expects to reach the JobManager's PyFlink 
> job endpoint at `http://<host>:<port>/v1/jobs`. Mapping to a non-root 
> location, e.g. 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs`, 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (with support for the 
> https scheme as well).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-08-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-32885:


Assignee: Elkhan Dadashov

> Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used 
> by RestClusterClient for PyFlink remote execution
> -
>
> Key: FLINK-32885
> URL: https://issues.apache.org/jira/browse/FLINK-32885
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Table SQL / Gateway
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>
> UrlPrefixDecorator is introduced in the `flink-sql-gateway` module, which has a 
> dependency on the `flink-clients` module. RestClusterClient will also need to 
> use UrlPrefixDecorator to support PyFlink remote execution. Related classes 
> will be refactored to achieve this.
> I intend to change these classes in a backward-compatible way:
> flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
> flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java
> flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java
> flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-08-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-32884:


Assignee: Elkhan Dadashov

> PyFlink remote execution should support URLs with paths and https scheme
> 
>
> Key: FLINK-32884
> URL: https://issues.apache.org/jira/browse/FLINK-32884
> Project: Flink
>  Issue Type: New Feature
>  Components: Client / Job Submission, Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Elkhan Dadashov
>Assignee: Elkhan Dadashov
>Priority: Major
>
> Currently, `SUBMIT_ARGS` uses the `remote -m http://<host>:<port>` format. For 
> local execution it works fine (`SUBMIT_ARGS=remote -m 
> [http://localhost:8081|http://localhost:8081/]`), but it does not support 
> placing the JobManager behind a proxy or using an Ingress for routing to 
> a specific Flink cluster based on the URL path. In the current scenario, it 
> expects to reach the JobManager's PyFlink job endpoint at 
> `http://<host>:<port>/v1/jobs`. Mapping to a non-root location, e.g. 
> `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs`, 
> is not supported.
> This will use changes from 
> [FLINK-32885|https://issues.apache.org/jira/browse/FLINK-32885].
> Since RestClusterClient talks to the JobManager via its REST endpoint, the 
> right format for `SUBMIT_ARGS` is a URL with a path (with support for the 
> https scheme as well).
> I intend to change these classes in a backward-compatible way:
> flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
> flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
> flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
> flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
> flink-core/src/main/java/org/apache/flink/util/NetUtils.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32035) SQL Client should support HTTPS with built-in JDK certificates

2023-07-07 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-32035.
--
Fix Version/s: 1.18.0
   Resolution: Implemented

> SQL Client should support HTTPS with built-in JDK certificates
> --
>
> Key: FLINK-32035
> URL: https://issues.apache.org/jira/browse/FLINK-32035
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Related to FLINK-32030.
> Internally, the SQL Client uses Flink’s _RestClient_ [1]. This client decides 
> whether to enable SSL not on the basis of the URL scheme 
> (https://...), but based on the Flink configuration, namely the global 
> _security.ssl.rest.enabled_ parameter [2] (which is also used for the REST 
> server-side configuration). When this parameter is set to true, it 
> automatically requires user-supplied _security.ssl.rest.truststore_ and 
> _security.ssl.rest.keystore_ to be configured; there is no default option to 
> use certificates from the JDK. Once URL support for SQL Client gateway mode 
> (FLINK-32030) is added, the SQL Client should automatically use the 
> certificates built into the JDK unless user-supplied trust- and keystores 
> are configured. 
> [1] 
> [https://github.com/apache/flink/blob/5dddc0dba2be20806e67769314eecadf56b87a53/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java#L359]
> [2] 
> [https://github.com/apache/flink/blob/5d9e63a16f079399c6b51547284bb96db0326bdb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java#L103]
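
For reference, the server-side options mentioned above look roughly like this in flink-conf.yaml (a sketch with placeholder paths; with this change, the trust-/keystore entries become optional for the SQL Client and the JDK default certificates are used instead):
{code}
security.ssl.rest.enabled: true
security.ssl.rest.truststore: /path/to/truststore.jks
security.ssl.rest.truststore-password: changeit
security.ssl.rest.keystore: /path/to/keystore.jks
security.ssl.rest.keystore-password: changeit
{code}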



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32373) Support passing headers with SQL Client gateway requests

2023-07-06 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-32373.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

> Support passing headers with SQL Client gateway requests
> 
>
> Key: FLINK-32373
> URL: https://issues.apache.org/jira/browse/FLINK-32373
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> FLINK-32030 and FLINK-32035 enable communication from the SQL Client to the 
> SQL Gateway placed behind a proxy, such as a K8S ingress. Given that 
> authentication is typically needed in these cases, it can be achieved by 
> adding the ability to supply custom headers to the underlying RestClient.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32137) Flame graph is hard to use with many task managers

2023-06-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-32137:


Assignee: Vladimir Matveev

> Flame graph is hard to use with many task managers
> --
>
> Key: FLINK-32137
> URL: https://issues.apache.org/jira/browse/FLINK-32137
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.1
>Reporter: Vladimir Matveev
>Assignee: Vladimir Matveev
>Priority: Major
> Attachments: image (1).png, image-2023-05-23-11-01-30-391.png
>
>
> In case there are many task managers executing the same operator, the flame 
> graph becomes very hard to use. As you can see in the attached picture, it 
> considers instances of the same lambda function as different classes, and 
> their number seems to be equal to the number of task managers (i.e. each JVM 
> gets its own "class" name, which is expected for lambdas I guess). This 
> lambda function is deep within Flink's own call stack, so this kind of graph 
> is inevitable regardless of the job's own logic, and there is nothing we can 
> do at the job logic level to fix it.
> This behavior makes evaluating the flame graph very hard: all of the 
> useful information gets "compressed" inside each "column" of the graph, while 
> the per-JVM split itself carries no useful information, since it is just 
> an artifact of class name generation in the JVM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32109) Operator doesn't recognize JobManager stuck on volumeMount startup errors

2023-05-16 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723208#comment-17723208
 ] 

Thomas Weise commented on FLINK-32109:
--

[~gyfora] if the issue can be corrected externally and eventually the 
deployment can transition into the running state without extra intelligence in 
the operator, then it is probably best to just wait? What would be useful, 
though, is to bubble up the event to the flinkdeployment level. As with genuine 
errors that we already interpret, this would require special logic to recognize 
the specific condition. That can be added on a best-effort basis, and I think 
that is OK since it is mostly for convenience (saving the client from digging 
into the lower-level resources).

> Operator doesn't recognize JobManager stuck on volumeMount startup errors
> -
>
> Key: FLINK-32109
> URL: https://issues.apache.org/jira/browse/FLINK-32109
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0
>Reporter: Gyula Fora
>Priority: Major
>
> Currently the flink deployment observer logic only reacts to Deployment 
> conditions such as failure to create the JM pod, image pull errors etc.
> Pod startup errors such as volumeMount failures are not recognized as errors, 
> and the operator keeps waiting indefinitely.
> This is a tricky problem because volumeMount errors can be transient and are 
> only reported as Events for the pod, so I am not completely sure if we can do 
> anything about this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32030) SQL Client gateway mode should accept URLs

2023-05-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-32030:
-
Fix Version/s: 1.18.0

> SQL Client gateway mode should accept URLs
> --
>
> Key: FLINK-32030
> URL: https://issues.apache.org/jira/browse/FLINK-32030
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, the _--endpoint_ parameter has to be specified in the 
> _InetSocketAddress_  format, i.e. _hostname:port._ While this works fine for 
> basic use cases, it does not support the placement of the gateway behind a 
> proxy or using an Ingress for routing to a specific Flink cluster based on 
> the URL path. I.e. it expects 
> _[some.hostname.com:9001|http://some.hostname.com:9001/]_ to directly serve 
> requests on _[some.hostname.com:9001/v1|http://some.hostname.com:9001/v1]_. 
> Mapping to a non-root location, e.g. 
> _[some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1|http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1]_, 
> is not supported.
>  
> Since the client talks to the gateway via its REST endpoint, the right format 
> for the _--endpoint_ parameter is a _URL_, not an _InetSocketAddress_. 
> The same _--endpoint_ parameter can be reused if the changes are implemented 
> in a backwards-compatible way.
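
As an illustration of the two formats (the path prefix is hypothetical):
{code}
# Today: InetSocketAddress format only
./bin/sql-client.sh gateway --endpoint some.hostname.com:9001
# With this change: a full URL, including scheme and path prefix
./bin/sql-client.sh gateway --endpoint https://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1
{code}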



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32030) SQL Client gateway mode should accept URLs

2023-05-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-32030.
--
Resolution: Fixed

> SQL Client gateway mode should accept URLs
> --
>
> Key: FLINK-32030
> URL: https://issues.apache.org/jira/browse/FLINK-32030
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the _--endpoint_ parameter has to be specified in the 
> _InetSocketAddress_  format, i.e. _hostname:port._ While this works fine for 
> basic use cases, it does not support the placement of the gateway behind a 
> proxy or using an Ingress for routing to a specific Flink cluster based on 
> the URL path. I.e. it expects 
> _[some.hostname.com:9001|http://some.hostname.com:9001/]_ to directly serve 
> requests on _[some.hostname.com:9001/v1|http://some.hostname.com:9001/v1]_. 
> Mapping to a non-root location, e.g. 
> _[some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1|http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1]_, 
> is not supported.
>  
> Since the client talks to the gateway via its REST endpoint, the right format 
> for the _--endpoint_ parameter is a _URL_, not an _InetSocketAddress_. 
> The same _--endpoint_ parameter can be reused if the changes are implemented 
> in a backwards-compatible way.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31997) Update to Fabric8 6.5.1+ in flink-kubernetes

2023-05-07 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720364#comment-17720364
 ] 

Thomas Weise commented on FLINK-31997:
--

[~gyfora] would it be possible to also shade the fabric8 dependency that goes 
into flink-kubernetes.jar? Otherwise there is the possibility of running into 
classpath conflicts when the application also has a fabric8 dependency, as I 
have seen in at least one case.
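
Such shading is typically done with a maven-shade-plugin relocation; a minimal sketch (the target package name follows the shaded name visible in stack traces elsewhere in this digest):
{code:xml}
<!-- Sketch: relocate fabric8 classes bundled into flink-kubernetes.jar -->
<relocation>
    <pattern>io.fabric8</pattern>
    <shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern>
</relocation>
{code}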

> Update to Fabric8 6.5.1+ in flink-kubernetes
> 
>
> Key: FLINK-31997
> URL: https://issues.apache.org/jira/browse/FLINK-31997
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> We should update the fabric8 version in flink-kubernetes to at least 6.5.1. 
> Flink currently uses a very old fabric8 version. The fabric8 library 
> dependencies have since been revised and greatly improved to make them more 
> modular and to allow eliminating security vulnerabilities more easily, like: 
> https://issues.apache.org/jira/browse/FLINK-31815
> The newer versions, especially 6.5.1+, also add stability improvements and 
> fixes for watches and other parts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31974) JobManager crashes after KubernetesClientException exception with FatalExitExceptionHandler

2023-05-04 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719355#comment-17719355
 ] 

Thomas Weise commented on FLINK-31974:
--

There are many cases where errors are transient. This specific case is actually 
quite obvious: the resource availability on a large cluster changes constantly. 
A pod may not be schedulable now but may be a few seconds later. Other k8s 
related issues can also be transient; for example, a failed request due to rate 
limiting will likely succeed soon after, and we would actually make things worse 
by not following a backoff/retry strategy and simply letting the job fail. I'm 
also leaning towards a retry-by-default strategy and identifying the cases that 
should be fatal errors.
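
A minimal sketch of the retry-by-default idea (hypothetical helper, not Flink's actual resource manager code): transient failures are retried with exponential backoff, and only failures classified as fatal propagate.
{code:java}
import java.time.Duration;

public class RetryWithBackoff {

    interface Action {
        void run() throws Exception;
    }

    static void runWithRetry(Action action, int maxAttempts) throws Exception {
        Duration backoff = Duration.ofSeconds(1);
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return;
            } catch (Exception e) {
                if (isFatal(e) || attempt >= maxAttempts) {
                    throw e; // clearly fatal, or retries exhausted
                }
                Thread.sleep(backoff.toMillis()); // transient: quota, rate limits, ...
                backoff = backoff.multipliedBy(2); // exponential backoff
            }
        }
    }

    // Hypothetical classification: retry by default, fail only on known-fatal errors.
    static boolean isFatal(Exception e) {
        return false;
    }
}
{code}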

> JobManager crashes after KubernetesClientException exception with 
> FatalExitExceptionHandler
> ---
>
> Key: FLINK-31974
> URL: https://issues.apache.org/jira/browse/FLINK-31974
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.17.0
>Reporter: Sergio Sainz
>Assignee: Weijie Guo
>Priority: Major
>
> When the resource quota limit is reached, the JobManager throws:
> {code}
> org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.96.0.1/api/v1/namespaces/my-namespace/pods. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "my-namespace-flink-cluster-taskmanager-1-2" is forbidden: exceeded quota: my-namespace-resource-quota, requested: limits.cpu=3, used: limits.cpu=12100m, limited: limits.cpu=13.
> {code}
>  
> In *1.16.1, this is handled gracefully*:
> {code}
> 2023-04-28 22:07:24,631 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=4}, current pending count: 0
> java.util.concurrent.CompletionException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.96.0.1/api/v1/namespaces/my-namespace/pods. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "my-namespace-flink-cluster-taskmanager-1-138" is forbidden: exceeded quota: my-namespace-resource-quota, requested: limits.cpu=3, used: limits.cpu=12100m, limited: limits.cpu=13.
>         at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>         at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>         at java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown Source) ~[?:?]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>         at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.96.0.1/api/v1/namespaces/my-namespace/pods. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "my-namespace-flink-cluster-taskmanager-1-138" is forbidden: exceeded quota: my-namespace-resource-quota, requested: limits.cpu=3, used: limits.cpu=12100m, limited: limits.cpu=13.
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:684) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:664) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:613) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:558) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:521) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:308) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:644) ~[flink-dist-1.16.1.jar:1.16.1]
>         at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:83) ~[flink-dist-1.16.1.jar:1.16.1]
>         

[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-03-23 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17704157#comment-17704157
 ] 

Thomas Weise commented on FLINK-30859:
--

The examples would lead to a chicken and egg problem: We would first need to 
release core, then the connector based on that core release, then the examples 
based on the connector release. That seems to suggest that the Kafka example 
should also live in the Kafka connector repo?

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Priority: Major
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31305) KafkaWriter doesn't wait for errors for in-flight records before completing flush

2023-03-02 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-31305:
-
Issue Type: Bug  (was: Improvement)

> KafkaWriter doesn't wait for errors for in-flight records before completing 
> flush
> -
>
> Key: FLINK-31305
> URL: https://issues.apache.org/jira/browse/FLINK-31305
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.17.0
>
>
> The KafkaWriter's flush needs to wait for all in-flight records to be sent 
> successfully. This can be achieved by tracking in-flight requests and 
> completing them through the callback registered in the producer#send() logic.
> Otherwise there is potential for data loss, since the checkpoint would not 
> accurately reflect that all records have been sent successfully, which is 
> required to preserve at-least-once semantics.
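
A minimal sketch of that tracking idea (hypothetical class, not the actual KafkaWriter): count in-flight sends, let flush() wait for every producer callback, and surface asynchronous send errors.
{code:java}
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TrackingWriter {
    private final Producer<byte[], byte[]> producer;
    private final AtomicLong inFlight = new AtomicLong();
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    public TrackingWriter(Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void write(ProducerRecord<byte[], byte[]> record) {
        inFlight.incrementAndGet();
        producer.send(record, (metadata, error) -> {
            if (error != null) {
                asyncError.compareAndSet(null, error); // keep the first failure
            }
            inFlight.decrementAndGet();
        });
    }

    public void flush() throws Exception {
        producer.flush();
        while (inFlight.get() > 0) {
            Thread.sleep(1); // wait for outstanding send callbacks
        }
        Exception error = asyncError.get();
        if (error != null) {
            throw error; // fail the checkpoint instead of losing records
        }
    }
}
{code}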



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31305) KafkaWriter doesn't wait for errors for in-flight records before completing flush

2023-03-02 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-31305:


Assignee: Mason Chen

> KafkaWriter doesn't wait for errors for in-flight records before completing 
> flush
> -
>
> Key: FLINK-31305
> URL: https://issues.apache.org/jira/browse/FLINK-31305
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.17.0
>
>
> The KafkaWriter's flush needs to wait for all in-flight records to be sent 
> successfully. This can be achieved by tracking in-flight requests and 
> completing them through the callback registered in the producer#send() logic.
> Otherwise there is potential for data loss, since the checkpoint would not 
> accurately reflect that all records have been sent successfully, which is 
> required to preserve at-least-once semantics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22793) HybridSource Table Implementation

2023-02-24 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693339#comment-17693339
 ] 

Thomas Weise commented on FLINK-22793:
--

[~nicholasjiang] thanks for the confirmation!

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Ran Tao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-22793) HybridSource Table Implementation

2023-02-24 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-22793:


Assignee: Ran Tao  (was: Nicholas Jiang)

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Ran Tao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22793) HybridSource Table Implementation

2023-02-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686606#comment-17686606
 ] 

Thomas Weise commented on FLINK-22793:
--

[~lemonjing] given the long time that has passed and all the FLIP work you have 
been driving, I think it is time to reassign this ticket to you. Let's wait a 
couple more days and see if [~nicholasjiang] can confirm.

> HybridSource Table Implementation
> -
>
> Key: FLINK-22793
> URL: https://issues.apache.org/jira/browse/FLINK-22793
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Nicholas Jiang
>Assignee: Nicholas Jiang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30858) Kubernetes operator does not update reconciled generation

2023-01-31 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682798#comment-17682798
 ] 

Thomas Weise commented on FLINK-30858:
--

https://lists.apache.org/thread/8y1zp4ogssy8ltsl42ppzvbo64dlzc3v

> Kubernetes operator does not update reconciled generation
> -
>
> Key: FLINK-30858
> URL: https://issues.apache.org/jira/browse/FLINK-30858
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
>Reporter: Thomas Weise
>Priority: Major
>
> Kubernetes manages the generation field as part of the spec metadata. It will 
> be increased when changes are made to the resource. The counterpart in status 
> is "observed generation", provided by a controller. By comparing the two, the 
> client can determine that the controller has processed the spec and in 
> conjunction with other status information conclude that a change has been 
> reconciled.
> The Flink operator currently tracks the generation as part of the reconciled 
> and stable specs, but these cannot be used as the "observed generation" to 
> perform the check. The value isn't updated in cases where the operator 
> determines that there are no changes to the spec that require deployment. This 
> can be reproduced through PUT/replace with the same spec or a change in 
> upgrade mode.
> The operator should provide the observed spec, which in conjunction with 
> deployment state can then be used by clients to determine that the spec has 
> been reconciled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30858) Kubernetes operator does not update reconciled generation

2023-01-31 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-30858:


 Summary: Kubernetes operator does not update reconciled generation
 Key: FLINK-30858
 URL: https://issues.apache.org/jira/browse/FLINK-30858
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.1
Reporter: Thomas Weise


Kubernetes manages the generation field as part of the spec metadata. It will 
be increased when changes are made to the resource. The counterpart in status 
is "observed generation", provided by a controller. By comparing the two, the 
client can determine that the controller has processed the spec and in 
conjunction with other status information conclude that a change has been 
reconciled.

The Flink operator currently tracks the generation as part of the reconciled 
and stable specs, but these cannot be used as the "observed generation" to 
perform the check. The value isn't updated in cases where the operator 
determines that there are no changes to the spec that require deployment. This 
can be reproduced through PUT/replace with the same spec or a change in 
upgrade mode.

The operator should provide the observed spec, which in conjunction with 
deployment state can then be used by clients to determine that the spec has 
been reconciled.
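
The underlying Kubernetes convention can be illustrated with a short sketch (generic resource; not the operator's current status layout):
{code}
metadata:
  generation: 5          # bumped by the API server on every spec change
status:
  observedGeneration: 5  # set by the controller once it has processed the spec;
                         # equal values mean the spec has been observed
{code}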



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode

2022-12-04 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-29109.
--
Resolution: Fixed

> Checkpoint path conflict with stateless upgrade mode
> 
>
> Key: FLINK-29109
> URL: https://issues.apache.org/jira/browse/FLINK-29109
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0, kubernetes-operator-1.2.0
>
>
> A stateful job with stateless upgrade mode (yes, there are such use cases) 
> fails with a checkpoint path conflict due to the constant jobId and FLINK-19358 
> (applies to Flink < 1.16.x). Since with stateless upgrade mode the checkpoint 
> id resets on restart, the job will write to previously used locations 
> and fail. The workaround is to rotate the jobId on every redeploy when the 
> upgrade mode is stateless. While this can be worked around externally, it is 
> best done in the operator itself, because reconciliation resolves whether a 
> restart is actually required, while rotating the jobId externally may trigger 
> unnecessary restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode

2022-12-04 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-29109:
-
Fix Version/s: kubernetes-operator-1.3.0

> Checkpoint path conflict with stateless upgrade mode
> 
>
> Key: FLINK-29109
> URL: https://issues.apache.org/jira/browse/FLINK-29109
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0, kubernetes-operator-1.3.0
>
>
> A stateful job with stateless upgrade mode (yes, there are such use cases)
> fails with a checkpoint path conflict due to the constant jobId and
> FLINK-19358 (applies to Flink < 1.16.x). Since with stateless upgrade mode
> the checkpoint id resets on restart, the job is going to write to previously
> used locations and fail. The workaround is to rotate the jobId on every
> redeploy when the upgrade mode is stateless. While this can be done
> externally, it is best done in the operator itself, because reconciliation
> resolves when a restart is actually required, while rotating the jobId
> externally may trigger unnecessary restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode

2022-12-03 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642849#comment-17642849
 ] 

Thomas Weise commented on FLINK-29109:
--

[~gyfora] thanks for catching this. Because the jobId assigned by Flink is
deterministic (HighAvailabilityOptions.HA_CLUSTER_ID), we will also need to
apply the random jobId for stateless upgrade mode for Flink versions >= 1.16
to avoid checkpoint path collisions.

https://github.com/apache/flink/blob/e70fe68dea764606180ca3728184c00fc63ea0ff/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L227
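
For illustration, a minimal sketch of the rotation described above (not the operator's actual implementation):

{code:java}
import org.apache.flink.api.common.JobID;

// Minimal sketch: rotate the jobId on every redeploy when the upgrade mode is
// stateless, so a restarted job never reuses previous checkpoint locations.
class JobIdRotation {
    static JobID jobIdForDeploy(boolean statelessUpgradeMode, JobID previousJobId) {
        // a fresh random JobID per deployment avoids the path collision that
        // occurs once the checkpoint id resets on restart
        return statelessUpgradeMode ? new JobID() : previousJobId;
    }
}
{code}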

> Checkpoint path conflict with stateless upgrade mode
> 
>
> Key: FLINK-29109
> URL: https://issues.apache.org/jira/browse/FLINK-29109
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> A stateful job with stateless upgrade mode (yes, there are such use cases)
> fails with a checkpoint path conflict due to the constant jobId and
> FLINK-19358 (applies to Flink < 1.16.x). Since with stateless upgrade mode
> the checkpoint id resets on restart, the job is going to write to previously
> used locations and fail. The workaround is to rotate the jobId on every
> redeploy when the upgrade mode is stateless. While this can be done
> externally, it is best done in the operator itself, because reconciliation
> resolves when a restart is actually required, while rotating the jobId
> externally may trigger unnecessary restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30266) Recovery reconciliation loop fails if no checkpoint has been created yet

2022-12-01 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642133#comment-17642133
 ] 

Thomas Weise commented on FLINK-30266:
--

I believe this was discussed before, and the reason we decided not to allow
this was that we cannot safely determine why the HA metadata is missing: is it
because there was never any successful checkpoint, or because it was removed
by mistake? As long as we can ensure that we don't accidentally reset a job
with prior state to empty state, I would also prefer the solution that does
not involve manual intervention.

> Recovery reconciliation loop fails if no checkpoint has been created yet
> 
>
> Key: FLINK-30266
> URL: https://issues.apache.org/jira/browse/FLINK-30266
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Maximilian Michels
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>
> When the upgradeMode is LAST-STATE, the operator fails to reconcile a failed 
> application unless at least one checkpoint has already been created. The 
> expected behavior would be that the job starts with empty state.
> {noformat}
> 2022-12-01 10:58:35,596 o.a.f.k.o.l.AuditUtils [INFO ] [app] >>> 
> Status | Error   | UPGRADING   | 
> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA
>  metadata not available to restore from last state. It is possible that the 
> job has finished or terminally failed, or the configmaps have been deleted. 
> Manual restore 
> required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]}
>  {noformat}
> {noformat}
> 2022-12-01 10:44:49,480 i.j.o.p.e.ReconciliationDispatcher [ERROR] [app] 
> Error during event processing ExecutionScope{ resource id: 
> ResourceID{name='app', namespace='namespace'}, version: 216933301} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
> java.lang.RuntimeException: This indicates a bug...
>   at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
>   at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
>   at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
>   at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
>   at 
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>   at 
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
>   at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
>   at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
>   at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
>   at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
>   at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.RuntimeException: This indicates a bug...
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:61)
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:212)
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:144)
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:167)
>   at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:64)
>   at 
> 

[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2022-11-30 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641610#comment-17641610
 ] 

Thomas Weise commented on FLINK-29634:
--

[~pnowojski] please note that this ticket aims to add periodic triggering to
the flink-kubernetes-operator, not to core Flink. Periodic triggering of
savepoints (the existing feature) fits well into the charter of
flink-kubernetes-operator. Based on FLINK-27101 it seems straightforward to
extend it to let the operator also trigger full snapshots that can be used for
recovery. It may even help to bridge the changed semantics of intermediate
savepoints from FLINK-29856.

 

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer

2022-11-28 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640298#comment-17640298
 ] 

Thomas Weise commented on FLINK-29099:
--

[~sethsaperstein] thanks for the thorough investigation and fix!

> Deadlock for Single Subtask in Kinesis Consumer
> ---
>
> Key: FLINK-29099
> URL: https://issues.apache.org/jira/browse/FLINK-29099
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3
>Reporter: seth saperstein
>Assignee: seth saperstein
>Priority: Minor
>  Labels: connector, consumer, kinesis, pull-request-available
> Fix For: 1.17.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Deadlock is reached as the result of:
>  * max lookahead reached for local watermark
>  * idle state for subtask
> The lookahead prevents the RecordEmitter from emitting a new record. The idle 
> state prevents the global watermark from being updated.
> To exit this deadlock state, we need to complete the [TODO 
> here|https://github.com/apache/flink/blob/221d70d9930f72147422ea24b399f006ebbfb8d7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1268]
>  which updates the global watermark while the subtask is marked idle, which 
> will then allow us to emit a record again as the lookahead is no longer 
> reached.
>  
> *Context:*
> We reached this scenario at Lyft as a result of prolonged CPU throttling on 
> all FlinkKinesisConsumer threads for multiple minutes.
> Walking through the series of events for a single subtask:
>  * prolonged CPU throttling occurs and no logs are seen from any 
> FlinkKinesisConsumer thread for up to 15 minutes
>  * after CPU throttling the subtask is marked idle
>  * the subtask has reached the lookahead for its local watermark relative to 
> the global watermark
>  * WatermarkSyncCallback reports the subtask as idle and does not update
> the global watermark
>  * emitQueue fills to max
>  * RecordEmitter cannot emit records due to the max lookahead
>  * Deadlock on subtask
> At this point, we had not realized what had happened and processing of all 
> other shards/subtasks had continued for multiple days. When we finally 
> restarted the application, we saw the following behavior:
>  * global watermark recalculated after all subtasks consumed data based on 
> the last kinesis record sequence number
>  * global watermark moved back in time multiple days, to when the subtask was 
> first marked idle
>  * the single subtask processed data while all others remained idle due to 
> the lookahead
> This would have continued until the subtask had caught up to the others and
> thus the global watermark was within reach of the lookahead for the other
> subtasks.
>  
> *Repro:*
> Too difficult to repro the exact scenario.
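
For illustration, a minimal sketch of the idle-aware global watermark computation the TODO above calls for (names and structure are assumptions, not the actual KinesisDataFetcher code):

{code:java}
import java.util.Map;

// Minimal sketch: the global watermark is the minimum across subtasks, but
// idle subtasks are excluded, so an idle subtask can no longer freeze the
// global watermark while the RecordEmitter is blocked by the lookahead limit.
class IdleAwareGlobalWatermark {
    static long compute(Map<Integer, Long> localWatermarks, Map<Integer, Boolean> idleFlags) {
        return localWatermarks.entrySet().stream()
                .filter(e -> !idleFlags.getOrDefault(e.getKey(), false)) // skip idle subtasks
                .mapToLong(Map.Entry::getValue)
                .min()
                // if every subtask is idle, nothing holds the watermark back
                .orElse(Long.MAX_VALUE);
    }
}
{code}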



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer

2022-11-28 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-29099.
--
Fix Version/s: 1.17.0
   Resolution: Fixed

> Deadlock for Single Subtask in Kinesis Consumer
> ---
>
> Key: FLINK-29099
> URL: https://issues.apache.org/jira/browse/FLINK-29099
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3
>Reporter: seth saperstein
>Priority: Minor
>  Labels: connector, consumer, kinesis, pull-request-available
> Fix For: 1.17.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Deadlock is reached as the result of:
>  * max lookahead reached for local watermark
>  * idle state for subtask
> The lookahead prevents the RecordEmitter from emitting a new record. The idle 
> state prevents the global watermark from being updated.
> To exit this deadlock state, we need to complete the [TODO 
> here|https://github.com/apache/flink/blob/221d70d9930f72147422ea24b399f006ebbfb8d7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1268]
>  which updates the global watermark while the subtask is marked idle, which 
> will then allow us to emit a record again as the lookahead is no longer 
> reached.
>  
> *Context:*
> We reached this scenario at Lyft as a result of prolonged CPU throttling on 
> all FlinkKinesisConsumer threads for multiple minutes.
> Walking through the series of events for a single subtask:
>  * prolonged CPU throttling occurs and no logs are seen from any 
> FlinkKinesisConsumer thread for up to 15 minutes
>  * after CPU throttling the subtask is marked idle
>  * the subtask has reached the lookahead for its local watermark relative to 
> the global watermark
>  * WatermarkSyncCallback reports the subtask as idle and does not update
> the global watermark
>  * emitQueue fills to max
>  * RecordEmitter cannot emit records due to the max lookahead
>  * Deadlock on subtask
> At this point, we had not realized what had happened and processing of all 
> other shards/subtasks had continued for multiple days. When we finally 
> restarted the application, we saw the following behavior:
>  * global watermark recalculated after all subtasks consumed data based on 
> the last kinesis record sequence number
>  * global watermark moved back in time multiple days, to when the subtask was 
> first marked idle
>  * the single subtask processed data while all others remained idle due to 
> the lookahead
> This would have continued until the subtask had caught up to the others and
> thus the global watermark was within reach of the lookahead for the other
> subtasks.
>  
> *Repro:*
> Too difficult to repro the exact scenario.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29099) Deadlock for Single Subtask in Kinesis Consumer

2022-11-28 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-29099:


Assignee: seth saperstein

> Deadlock for Single Subtask in Kinesis Consumer
> ---
>
> Key: FLINK-29099
> URL: https://issues.apache.org/jira/browse/FLINK-29099
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.9.3, 1.10.3, 1.11.6, 1.12.7, 1.13.6, 1.14.5, 1.15.3
>Reporter: seth saperstein
>Assignee: seth saperstein
>Priority: Minor
>  Labels: connector, consumer, kinesis, pull-request-available
> Fix For: 1.17.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Deadlock is reached as the result of:
>  * max lookahead reached for local watermark
>  * idle state for subtask
> The lookahead prevents the RecordEmitter from emitting a new record. The idle 
> state prevents the global watermark from being updated.
> To exit this deadlock state, we need to complete the [TODO 
> here|https://github.com/apache/flink/blob/221d70d9930f72147422ea24b399f006ebbfb8d7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1268]
>  which updates the global watermark while the subtask is marked idle, which 
> will then allow us to emit a record again as the lookahead is no longer 
> reached.
>  
> *Context:*
> We reached this scenario at Lyft as a result of prolonged CPU throttling on 
> all FlinkKinesisConsumer threads for multiple minutes.
> Walking through the series of events for a single subtask:
>  * prolonged CPU throttling occurs and no logs are seen from any 
> FlinkKinesisConsumer thread for up to 15 minutes
>  * after CPU throttling the subtask is marked idle
>  * the subtask has reached the lookahead for its local watermark relative to 
> the global watermark
>  * WatermarkSyncCallback reports the subtask as idle and does not update
> the global watermark
>  * emitQueue fills to max
>  * RecordEmitter cannot emit records due to the max lookahead
>  * Deadlock on subtask
> At this point, we had not realized what had happened and processing of all 
> other shards/subtasks had continued for multiple days. When we finally 
> restarted the application, we saw the following behavior:
>  * global watermark recalculated after all subtasks consumed data based on 
> the last kinesis record sequence number
>  * global watermark moved back in time multiple days, to when the subtask was 
> first marked idle
>  * the single subtask processed data while all others remained idle due to 
> the lookahead
> This would have continued until the subtask had caught up to the others and
> thus the global watermark was within reach of the lookahead for the other
> subtasks.
>  
> *Repro:*
> Too difficult to repro the exact scenario.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-12675) Event time synchronization in Kafka consumer

2022-11-25 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-12675.
--
Resolution: Won't Fix

> Event time synchronization in Kafka consumer
> 
>
> Key: FLINK-12675
> URL: https://issues.apache.org/jira/browse/FLINK-12675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Thomas Weise
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Attachments: 0001-Kafka-event-time-alignment.patch
>
>
> Integrate the source watermark tracking into the Kafka consumer and implement 
> the sync mechanism (different consumer model, compared to Kinesis).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-10886) Event time synchronization across sources

2022-11-25 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-10886.
--
Resolution: Done

> Event time synchronization across sources
> -
>
> Key: FLINK-10886
> URL: https://issues.apache.org/jira/browse/FLINK-10886
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Jamie Grier
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> When reading from a source with many parallel partitions, especially when
> reading lots of historical data (or recovering from downtime and there is a
> backlog to read), it's quite common for an event-time skew to develop across
> those partitions.
>
> When doing event-time windowing -- or in fact any event-time driven
> processing -- the event-time skew across partitions results directly in
> increased buffering in Flink and of course the corresponding
> state/checkpoint size growth.
>
> As the event-time skew and state size grow larger, this can have a major
> effect on application performance and in some cases result in a "death
> spiral" where performance gets worse and worse as the state size keeps
> growing.
>
> So, one solution to this problem, outside of core changes in Flink itself,
> seems to be to coordinate sources across partitions so that they make
> progress through event time at roughly the same rate. In fact, if there is
> large skew, the idea would be to slow or even stop reading from some
> partitions with newer data while first reading the partitions with older
> data. Anyway, to do this we need to share state somehow amongst sub-tasks.
>
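
For illustration, a minimal sketch of the throttling idea described above (hypothetical, not Flink code): a reader pauses a partition whose local event time runs too far ahead of the shared global minimum.

{code:java}
// Minimal sketch, not Flink code: pause reading a partition whose local event
// time is too far ahead of the global minimum shared amongst sub-tasks.
class SkewThrottle {
    private final long maxAheadMillis;

    SkewThrottle(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    boolean shouldPause(long localEventTimeMillis, long globalMinEventTimeMillis) {
        return localEventTimeMillis - globalMinEventTimeMillis > maxAheadMillis;
    }
}
{code}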



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-12675) Event time synchronization in Kafka consumer

2022-11-25 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638662#comment-17638662
 ] 

Thomas Weise commented on FLINK-12675:
--

Partition/split level alignment is supported with the new KafkaSource: 
https://issues.apache.org/jira/browse/FLINK-28853
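
For reference, a minimal sketch of enabling that alignment through the WatermarkStrategy API (the group name and durations are placeholder values, not recommendations):

{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Minimal sketch: watermark alignment for sources such as the new KafkaSource.
// "alignment-group", the max drift, and the update interval are placeholders.
class AlignmentExample {
    static WatermarkStrategy<String> alignedStrategy() {
        return WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withWatermarkAlignment(
                        "alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
    }
}
{code}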

> Event time synchronization in Kafka consumer
> 
>
> Key: FLINK-12675
> URL: https://issues.apache.org/jira/browse/FLINK-12675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Thomas Weise
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Attachments: 0001-Kafka-event-time-alignment.patch
>
>
> Integrate the source watermark tracking into the Kafka consumer and implement 
> the sync mechanism (different consumer model, compared to Kinesis).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover configmaps

2022-11-15 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-30004.
--
Fix Version/s: kubernetes-operator-1.3.0
   Resolution: Fixed

> Cannot resume deployment after suspend with savepoint due to leftover 
> configmaps
> 
>
> Key: FLINK-30004
> URL: https://issues.apache.org/jira/browse/FLINK-30004
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>
> Due to the possibility of incomplete cleanup of HA data in Flink 1.14, the
> deployment can get into a limbo state that requires manual intervention after
> suspend with savepoint. If the config maps are not cleaned up, the resumed
> job will be considered finished and the operator recognizes the JM deployment
> as missing. Due to the check for HA data, which has by then been cleaned up,
> the job fails to start and a manual redeployment with an initial savepoint is
> necessary.
> This can be avoided by removing any leftover HA config maps after the job has
> successfully stopped with savepoint (upgrade mode savepoint).
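
For illustration, a minimal sketch of the cleanup described above using the fabric8 client (the label selector is an assumption, not the operator's exact implementation):

{code:java}
import io.fabric8.kubernetes.client.KubernetesClient;

// Minimal sketch: after a successful stop-with-savepoint, delete leftover
// Flink HA config maps so the resumed job is not mistaken for a finished one.
// The label used to select the HA config maps is an assumption.
class HaConfigMapCleanup {
    static void deleteLeftoverHaConfigMaps(
            KubernetesClient client, String namespace, String clusterId) {
        client.configMaps()
                .inNamespace(namespace)
                .withLabel("app", clusterId)
                .delete();
    }
}
{code}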



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover configmaps

2022-11-15 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-30004:
-
Summary: Cannot resume deployment after suspend with savepoint due to 
leftover configmaps  (was: Cannot resume deployment after suspend with 
savepoint due to leftover confgmaps)

> Cannot resume deployment after suspend with savepoint due to leftover 
> configmaps
> 
>
> Key: FLINK-30004
> URL: https://issues.apache.org/jira/browse/FLINK-30004
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Due to the possibility of incomplete cleanup of HA data in Flink 1.14, the
> deployment can get into a limbo state that requires manual intervention after
> suspend with savepoint. If the config maps are not cleaned up, the resumed
> job will be considered finished and the operator recognizes the JM deployment
> as missing. Due to the check for HA data, which has by then been cleaned up,
> the job fails to start and a manual redeployment with an initial savepoint is
> necessary.
> This can be avoided by removing any leftover HA config maps after the job has
> successfully stopped with savepoint (upgrade mode savepoint).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30004) Cannot resume deployment after suspend with savepoint due to leftover confgmaps

2022-11-12 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-30004:


 Summary: Cannot resume deployment after suspend with savepoint due 
to leftover confgmaps
 Key: FLINK-30004
 URL: https://issues.apache.org/jira/browse/FLINK-30004
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.2.0
Reporter: Thomas Weise
Assignee: Thomas Weise


Due to the possibility of incomplete cleanup of HA data in Flink 1.14, the
deployment can get into a limbo state that requires manual intervention after
suspend with savepoint. If the config maps are not cleaned up, the resumed job
will be considered finished and the operator recognizes the JM deployment as
missing. Due to the check for HA data, which has by then been cleaned up, the
job fails to start and a manual redeployment with an initial savepoint is
necessary.

This can be avoided by removing any leftover HA config maps after the job has
successfully stopped with savepoint (upgrade mode savepoint).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29871) Upgrade operator Flink version and examples to 1.16

2022-11-04 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629128#comment-17629128
 ] 

Thomas Weise commented on FLINK-29871:
--

+1

> Upgrade operator Flink version and examples to 1.16
> ---
>
> Key: FLINK-29871
> URL: https://issues.apache.org/jira/browse/FLINK-29871
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> We should update our Flink dependency and the default example version to 1.16



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29871) Upgrade operator Flink version and examples to 1.16

2022-11-04 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629085#comment-17629085
 ] 

Thomas Weise commented on FLINK-29871:
--

X.Y.0 releases are not vetted and are prone to surprises. Stable environments
typically move to an X.Y.1 or later Flink version. Can we make this so that we
support 1.16.x but make it the default for the operator only once 1.16.1 comes
out?

> Upgrade operator Flink version and examples to 1.16
> ---
>
> Key: FLINK-29871
> URL: https://issues.apache.org/jira/browse/FLINK-29871
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> We should update our Flink dependency and the default example version to 1.16



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2022-10-14 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617676#comment-17617676
 ] 

Thomas Weise commented on FLINK-29634:
--

[~Jiale] that PR is the right place to look at for how periodic savepoint
triggering was added. Please note, though, that a good portion of it is
related to savepoint history. The operator does not maintain checkpoint
history, as checkpoints (so far) are triggered by Flink internally. Although
it may ultimately be good to also keep track of checkpoints that were
triggered by the operator within the CR status, perhaps it is best if we start
with just the periodic triggering support? WDYT [~gyfora]?

Please note that in order to work on this, the operator first needs to
recognize Flink version 1.17 (currently it supports up to 1.16, see
FlinkVersion). Then this feature needs to be built so that it is only
effective for 1.17+ while the operator keeps working with the other versions.
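
For illustration, a minimal sketch of such a version gate (the enum mirrors the operator's FlinkVersion naming; the gate itself is an assumption, not existing operator code):

{code:java}
// Minimal sketch: periodic checkpoint triggering must be a no-op for Flink
// versions that lack external checkpoint triggering (FLINK-27101, 1.17+).
// The enum below only mirrors the operator's FlinkVersion for illustration.
enum FlinkVersion { v1_13, v1_14, v1_15, v1_16, v1_17 }

class CheckpointTriggerGate {
    static boolean supportsExternalCheckpointTrigger(FlinkVersion version) {
        // relies on the declaration order of the enum constants
        return version.compareTo(FlinkVersion.v1_17) >= 0;
    }
}
{code}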

 

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29634) Support periodic checkpoint triggering

2022-10-13 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-29634:


Assignee: Jiale Tan

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29634) Support periodic checkpoint triggering

2022-10-13 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29634:


 Summary: Support periodic checkpoint triggering
 Key: FLINK-29634
 URL: https://issues.apache.org/jira/browse/FLINK-29634
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Thomas Weise


Similar to the support for periodic savepoints, the operator should support 
triggering periodic checkpoints to break the incremental checkpoint chain.

Support for external triggering will come with 1.17: 
https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29497) Provide an option to publish the flink-dist jar file artifact

2022-10-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17614743#comment-17614743
 ] 

Thomas Weise commented on FLINK-29497:
--

Hi [~chesnay], at the moment we are using this to replace flink-dist.jar in
our docker image. Going forward, we may also be interested in using it as a
one-stop shop for local development dependencies, since it would result in a
more consistent environment setup.

Your idea of splitting the current dist makes perfect sense. The dist jar is
useful outside the binary distribution, and the full binary distribution could
be assembled using the published dist jar.

I'm going to open a PR to add the deploy switch in case someone else is
interested in using it.

 

> Provide an option to publish the flink-dist jar file artifact
> -
>
> Key: FLINK-29497
> URL: https://issues.apache.org/jira/browse/FLINK-29497
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.16.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>
> Currently deployment is skipped for the flink-dist jar file. Instead of 
> hardcoding that in pom.xml, use a property that can control this behavior 
> from the maven command line.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-27479) HybridSource refreshes availability future

2022-10-09 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-27479.
--
Resolution: Fixed

> HybridSource refreshes availability future
> --
>
> Key: FLINK-27479
> URL: https://issues.apache.org/jira/browse/FLINK-27479
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: hybrid-source-with-fix.html, 
> hybrid-source-without-fix.html, kafka-source.html
>
>
> HybridSourceReader needs to refresh the availability future according to the 
> underlying reader. It currently maintains its own future and completes it 
> after the sub-reader's availability future is complete. However, the 
> implementation does not refresh the future again until the reader receives a 
> switch event. This can cause a tight loop with the Flink runtime repeatedly 
> invoking pollNext() and high CPU utilization.
>  
> To solve this, we can reuse the MultipleFuturesAvailabilityHelper to manage 
> the lifecycle of the availability future.
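
For illustration, a minimal sketch of the delegation idea (not the actual HybridSourceReader or MultipleFuturesAvailabilityHelper code):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Minimal sketch, not the actual HybridSourceReader code: returning a cached,
// already-completed future makes the runtime spin on pollNext(); delegating
// to the current sub-reader's future on every call keeps availability fresh.
class DelegatingAvailability {
    private final Supplier<CompletableFuture<Void>> currentSubReaderAvailability;

    DelegatingAvailability(Supplier<CompletableFuture<Void>> currentSubReaderAvailability) {
        this.currentSubReaderAvailability = currentSubReaderAvailability;
    }

    CompletableFuture<Void> isAvailable() {
        // always reflect the current sub-reader's availability
        return currentSubReaderAvailability.get();
    }
}
{code}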



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27479) HybridSource refreshes availability future

2022-10-09 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27479:
-
Fix Version/s: (was: 1.15.3)

> HybridSource refreshes availability future
> --
>
> Key: FLINK-27479
> URL: https://issues.apache.org/jira/browse/FLINK-27479
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: hybrid-source-with-fix.html, 
> hybrid-source-without-fix.html, kafka-source.html
>
>
> HybridSourceReader needs to refresh the availability future according to the 
> underlying reader. It currently maintains its own future and completes it 
> after the sub-reader's availability future is complete. However, the 
> implementation does not refresh the future again until the reader receives a 
> switch event. This can cause a tight loop with the Flink runtime repeatedly 
> invoking pollNext() and high CPU utilization.
>  
> To solve this, we can reuse the MultipleFuturesAvailabilityHelper to manage 
> the lifecycle of the availability future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29497) Provide an option to publish the flink-dist jar file artifact

2022-10-03 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29497:


 Summary: Provide an option to publish the flink-dist jar file 
artifact
 Key: FLINK-29497
 URL: https://issues.apache.org/jira/browse/FLINK-29497
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.16.0
Reporter: Thomas Weise
Assignee: Thomas Weise


Currently deployment is skipped for the flink-dist jar file. Instead of 
hardcoding that in pom.xml, use a property that can control this behavior from 
the maven command line.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29413) Make it possible to associate triggered and completed savepoints

2022-09-26 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17609561#comment-17609561
 ] 

Thomas Weise commented on FLINK-29413:
--

+1 for tracking the triggering nonce.

Would there be any other way to retrieve the savepoint type, given that Flink
does not retain a history beyond the current job execution?

 

> Make it possible to associate triggered and completed savepoints
> 
>
> Key: FLINK-29413
> URL: https://issues.apache.org/jira/browse/FLINK-29413
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
>
> Currently it is not clear how one would associate completed manual savepoints
> with savepointTriggerNonce-es when using the operator.
> This makes it difficult to track when a savepoint was completed vs. when it
> was abandoned.
> One idea would be to add the savepointTriggerNonce to the completed 
> checkpoint info for Manual savepoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29251) Send CREATED status and Cancel event via FlinkResourceListener

2022-09-14 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise closed FLINK-29251.

Resolution: Fixed

> Send CREATED status and Cancel event via FlinkResourceListener
> --
>
> Key: FLINK-29251
> URL: https://issues.apache.org/jira/browse/FLINK-29251
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Assignee: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> To complete the lifecycle history of a custom resource the operator should
> send:
>  * a CREATED status notification during the initial deployment of a CR
>  * a Cancel event when deleting a CR



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error

2022-08-31 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise closed FLINK-29100.

Resolution: Fixed

> Deployment with last-state upgrade mode stuck after initial error
> -
>
> Key: FLINK-29100
> URL: https://issues.apache.org/jira/browse/FLINK-29100
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> A deployment with last_state upgrade mode that never succeeds will be stuck
> in deploying state because no HA data exists. This can be reproduced by
> creating a deployment with an invalid image or an exception in the entry
> point. An update to the CR that corrects the issue won't be reconciled due to
> "o.a.f.k.o.r.d.ApplicationReconciler [INFO ]
> [default.basic-checkpoint-ha-example] Job is not running yet and HA metadata
> is not available, waiting for upgradeable state". This forces manual
> intervention to delete the CR.
> Instead, the operator should check whether this is the initial deployment
> and, if so, skip the HA metadata check.
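
For illustration, a minimal sketch of that check (names are assumptions, not the operator's API):

{code:java}
// Minimal sketch of the guard described above; names are assumptions.
class HaMetadataGuard {
    static boolean canProceedWithUpgrade(boolean isFirstDeployment, boolean haMetadataAvailable) {
        // an initial deployment has no state to lose, so requiring HA metadata
        // would only block recovery of a job that never started successfully
        return isFirstDeployment || haMetadataAvailable;
    }
}
{code}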



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error

2022-08-31 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-29100:
-
Fix Version/s: kubernetes-operator-1.2.0

> Deployment with last-state upgrade mode stuck after initial error
> -
>
> Key: FLINK-29100
> URL: https://issues.apache.org/jira/browse/FLINK-29100
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.1.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.2.0
>
>
> A deployment with last_state upgrade mode that never succeeds will be stuck
> in deploying state because no HA data exists. This can be reproduced by
> creating a deployment with an invalid image or an exception in the entry
> point. An update to the CR that corrects the issue won't be reconciled due to
> "o.a.f.k.o.r.d.ApplicationReconciler [INFO ]
> [default.basic-checkpoint-ha-example] Job is not running yet and HA metadata
> is not available, waiting for upgradeable state". This forces manual
> intervention to delete the CR.
> Instead, the operator should check whether this is the initial deployment
> and, if so, skip the HA metadata check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29159) Revisit/harden initial deployment logic

2022-08-31 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29159:


 Summary: Revisit/harden initial deployment logic
 Key: FLINK-29159
 URL: https://issues.apache.org/jira/browse/FLINK-29159
 Project: Flink
  Issue Type: Technical Debt
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Thomas Weise


The isFirstDeployment logic was found not to work as expected for a deployment
that had never successfully deployed (image pull error). We are probably also
lacking test coverage for the initialSavepointPath field.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29144) Enable multiple jar entries for jarURI

2022-08-30 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598040#comment-17598040
 ] 

Thomas Weise commented on FLINK-29144:
--

Please note that the operator is not in the business of interpreting that
config; that includes not listing directories to assemble a jar file list. Jar
files don't have to be "local" either.

> Enable multiple jar entries for jarURI
> --
>
> Key: FLINK-29144
> URL: https://issues.apache.org/jira/browse/FLINK-29144
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Arseniy Tashoyan
>Priority: Major
>
> The setting _job.jarURI_ accepts a string with the path to the jar-file:
> {code:yaml}
> job:
>   jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> {code}
> This could be improved to accept a list of jars:
> {code:yaml}
> job:
>   jarURIs:
>   - local:///opt/flink/examples/streaming/StateMachineExample.jar
>   - local:///opt/common/scala-logging.jar
> {code}
> This could also be improved to accept one or more directories with jars:
> {code:yaml}
> job:
>   jarDirs:
>   - local:///opt/app/lib
>   - local:///opt/common/lib
> {code}
> The order of entries in the list defines the order of jars in the classpath.
> Internally, Flink Kubernetes Operator uses the property _pipeline.jars_ - see 
> [FlinkConfigBuilder.java 
> |https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L259]:
> {code:java}
> effectiveConfig.set(PipelineOptions.JARS, 
> Collections.singletonList(uri.toString()));
> {code}
> The property _pipeline.jars_ allows passing more than one jar entry.
> This improvement would make it possible to avoid building a fat-jar; instead
> we could provide directories with normal jars.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29144) Enable multiple jar entries for jarURI

2022-08-30 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597903#comment-17597903
 ] 

Thomas Weise commented on FLINK-29144:
--

[~gyfora] since this isn't a common requirement, I would also prefer we allow 
using the configuration parameter PipelineOptions.JARS for this. That would 
mean the operator needs a slight change to not just overwrite that parameter. 
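
For illustration, a minimal sketch of that slight change (not the actual FlinkConfigBuilder code):

{code:java}
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

// Minimal sketch, not the actual FlinkConfigBuilder code: set pipeline.jars
// from job.jarURI only when the user has not configured it explicitly,
// instead of unconditionally overwriting it.
class PipelineJarsConfig {
    static void applyJarUri(Configuration effectiveConfig, String jarUri) {
        if (!effectiveConfig.contains(PipelineOptions.JARS)) {
            effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(jarUri));
        }
    }
}
{code}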

> Enable multiple jar entries for jarURI
> --
>
> Key: FLINK-29144
> URL: https://issues.apache.org/jira/browse/FLINK-29144
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Arseniy Tashoyan
>Priority: Major
>
> The setting _job.jarURI_ accepts a string with the path to the jar-file:
> {code:yaml}
> job:
>   jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> {code}
> This could be improved to accept a list of jars:
> {code:yaml}
> job:
>   jarURIs:
>   - local:///opt/flink/examples/streaming/StateMachineExample.jar
>   - local:///opt/common/scala-logging.jar
> {code}
> This could also be improved to accept one or more directories with jars:
> {code:yaml}
> job:
>   jarDirs:
>   - local:///opt/app/lib
>   - local:///opt/common/lib
> {code}
> The order of entries in the list defines the order of jars in the classpath.
> Internally, Flink Kubernetes Operator uses the property _pipeline.jars_ - see 
> [FlinkConfigBuilder.java 
> |https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L259]:
> {code:java}
> effectiveConfig.set(PipelineOptions.JARS, 
> Collections.singletonList(uri.toString()));
> {code}
> The property _pipeline.jars_ allows passing more than one jar entry.
> This improvement would make it possible to avoid building a fat-jar; instead
> we could provide directories with normal jars.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode

2022-08-25 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29109:


 Summary: Checkpoint path conflict with stateless upgrade mode
 Key: FLINK-29109
 URL: https://issues.apache.org/jira/browse/FLINK-29109
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Thomas Weise
Assignee: Thomas Weise


A stateful job with stateless upgrade mode (yes, there are such use cases)
fails with a checkpoint path conflict due to the constant jobId and FLINK-19358
(applies to Flink < 1.16.x). Since with stateless upgrade mode the checkpoint
id resets on restart, the job is going to write to previously used locations
and fail. The workaround is to rotate the jobId on every redeploy when the
upgrade mode is stateless. While this can be done externally, it is best done
in the operator itself, because reconciliation resolves when a restart is
actually required, while rotating the jobId externally may trigger unnecessary
restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29100) Deployment with last-state upgrade mode stuck after initial error

2022-08-24 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29100:


 Summary: Deployment with last-state upgrade mode stuck after 
initial error
 Key: FLINK-29100
 URL: https://issues.apache.org/jira/browse/FLINK-29100
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Thomas Weise
Assignee: Thomas Weise


A deployment with last_state upgrade mode that never succeeds will be stuck in
deploying state because no HA data exists. This can be reproduced by creating a
deployment with an invalid image or an exception in the entry point. An update
to the CR that corrects the issue won't be reconciled due to
"o.a.f.k.o.r.d.ApplicationReconciler [INFO ]
[default.basic-checkpoint-ha-example] Job is not running yet and HA metadata is
not available, waiting for upgradeable state". This forces manual intervention
to delete the CR.

Instead, the operator should check whether this is the initial deployment and,
if so, skip the HA metadata check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close

2022-08-23 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-28977.
--
Resolution: Fixed

> NullPointerException in HybridSourceSplitEnumerator.close
> -
>
> Key: FLINK-28977
> URL: https://issues.apache.org/jira/browse/FLINK-28977
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Michael
>Priority: Major
>  Labels: pull-request-available, pull-requests-available
> Fix For: 1.17.0
>
>
> A HybridSource pipeline has an intermittent error when reading from S3;
> usually the error goes away when the pipeline restarts after recovering from
> a checkpoint. But intermittently the following happens:
> 2022/08/02 22:26:51.435 INFO  o.a.f.runtime.jobmaster.JobMaster - Trying to 
> recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> 
> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator 
> fd9fbc680ee884c4eafd0b9c2d3d007f).
> at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.cleanAndFailJob(RecreateOnResetOperatorCoordinator.java:393)
> ...
> Caused by: java.lang.NullPointerException: null
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
> at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
> at 
> org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
> at java.lang.Thread.run(Thread.java:750)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close

2022-08-23 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-28977:
-
Fix Version/s: 1.17.0

> NullPointerException in HybridSourceSplitEnumerator.close
> -
>
> Key: FLINK-28977
> URL: https://issues.apache.org/jira/browse/FLINK-28977
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Michael
>Priority: Major
>  Labels: pull-request-available, pull-requests-available
> Fix For: 1.17.0
>
>
> A HybridSource pipeline has an intermittent error when reading from S3;
> usually the error goes away when the pipeline restarts after recovering from
> a checkpoint. But intermittently the following happens:
> 2022/08/02 22:26:51.435 INFO  o.a.f.runtime.jobmaster.JobMaster - Trying to 
> recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> 
> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator 
> fd9fbc680ee884c4eafd0b9c2d3d007f).
> at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.cleanAndFailJob(RecreateOnResetOperatorCoordinator.java:393)
> ...
> Caused by: java.lang.NullPointerException: null
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
> at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
> at 
> org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
> at java.lang.Thread.run(Thread.java:750)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-21126) Reconsider FLINK_PROPERTIES

2022-08-22 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-21126:
-
Fix Version/s: (was: 1.17.0)

> Reconsider FLINK_PROPERTIES
> ---
>
> Key: FLINK-21126
> URL: https://issues.apache.org/jira/browse/FLINK-21126
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts
>Reporter: Chesnay Schepler
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The docker scripts support a {{FLINK_PROPERTIES}} environment variable that 
> contains a multi-line string with config options, that is piped into the 
> configuration.
> This variable is somewhat redundant, because docker users can also specify a 
> list of dynamic properties when starting a container, and Kubernetes users 
> are advised to use config maps instead.
> FLIP-161 might also re-introduce new ways to configure Flink via environment 
> variables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28977) NullPointerException in HybridSourceSplitEnumerator.close

2022-08-15 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-28977:


Assignee: Michael

> NullPointerException in HybridSourceSplitEnumerator.close
> -
>
> Key: FLINK-28977
> URL: https://issues.apache.org/jira/browse/FLINK-28977
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Michael
>Priority: Major
>  Labels: pull-request-available, pull-requests-available
>
> A HybridSource pipeline has an intermittent error when reading from S3;
> usually the error goes away when the pipeline restarts after recovering from
> a checkpoint. But intermittently the following happens:
> 2022/08/02 22:26:51.435 INFO  o.a.f.runtime.jobmaster.JobMaster - Trying to 
> recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> 
> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator 
> fd9fbc680ee884c4eafd0b9c2d3d007f).
> at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.cleanAndFailJob(RecreateOnResetOperatorCoordinator.java:393)
> ...
> Caused by: java.lang.NullPointerException: null
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
> at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
> at 
> org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
> at java.lang.Thread.run(Thread.java:750)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-13 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-28817:
-
Fix Version/s: 1.15.2

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Qishang Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed
> out. This is an intermittent error; it is usually fixed when Flink recovers
> from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event cause 
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-12 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579192#comment-17579192
 ] 

Thomas Weise commented on FLINK-28817:
--

[~Benenson] thank you for the thorough investigation!

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Qishang Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-12 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-28817:


Assignee: Qishang Zhong

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Qishang Zhong
>Priority: Major
>  Labels: pull-request-available
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-12 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-28817:
-
Fix Version/s: 1.16.0

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Qishang Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-12 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-28817.
--
Resolution: Fixed

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Assignee: Qishang Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-12 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579190#comment-17579190
 ] 

Thomas Weise commented on FLINK-28817:
--

[~nicholasjiang] I believe it does. Can you please verify and close FLINK-26938 
if so?

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Priority: Major
>  Labels: pull-request-available
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-11 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578507#comment-17578507
 ] 

Thomas Weise edited comment on FLINK-28817 at 8/11/22 2:29 PM:
---

[~Benenson] thanks for investigating this issue. I think that has to do with 
the reader not having any restored splits (most likely because none were 
previously assigned) and therefore reporting -1 back to the enumerator. Let me 
check what the correct fix for this is.
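A minimal sketch of what such defensive handling could look like (method and field names are hypothetical, not the actual fix):
{code:java}
// Sketch only: treat sourceIndex == -1 as "reader restored without any
// assigned splits" and fall back to the enumerator's current source,
// instead of looking up a source for index -1.
class ReaderFinishedHandlingSketch {
    private int currentSourceIndex;

    void handleSourceReaderFinished(int readerSourceIndex) {
        int effectiveIndex =
                readerSourceIndex < 0 ? currentSourceIndex : readerSourceIndex;
        sendSwitchSourceEvent(effectiveIndex);
    }

    void sendSwitchSourceEvent(int sourceIndex) {
        System.out.println("switch reader to source index " + sourceIndex);
    }
}
{code}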


was (Author: thw):
[~Benenson] thanks for investigating this issue. I think that has to do with 
the reader not having any restored splits (most likely because non were 
assigned) and therefore reporting -1 back to the enumerator. Let me check what 
the correct fix for this is.

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Priority: Major
>  Labels: pull-request-available
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28817) NullPointerException in HybridSource when restoring from checkpoint

2022-08-11 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578507#comment-17578507
 ] 

Thomas Weise commented on FLINK-28817:
--

[~Benenson] thanks for investigating this issue. I think that has to do with 
the reader not having any restored splits (most likely because none were 
assigned) and therefore reporting -1 back to the enumerator. Let me check what 
the correct fix for this is.

> NullPointerException in HybridSource when restoring from checkpoint
> ---
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.14.4, 1.15.1
>Reporter: Michael
>Priority: Major
>  Labels: pull-request-available
> Attachments: Preconditions-checkNotNull-error.zip, 
> bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from S3
>  # HybridSourceSplitEnumerator.switchEnumerator failed with 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed 
> out. This is an intermittent error; it is usually resolved when Flink 
> recovers from the checkpoint and repeats the operation.
>  # Flink starts recovering from the checkpoint
>  # HybridSourceSplitEnumerator receives 
> SourceReaderFinishedEvent\{sourceIndex=-1}
>  # Processing this event causes:
> 2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
> exception in the SplitEnumerator for Source Source: hybrid-source while 
> handling operator event SourceEventWrapper[SourceReaderFinishedEvent
> {sourceIndex=-1}
> ] from subtask 6. Triggering job failover.
> java.lang.NullPointerException: Source for index=0 is not available from 
> sources: \{788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at 
> org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
> at 
> org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
> ...
> I'm running my own version of the HybridSource with additional logging, so 
> line numbers and some names may differ from the Flink GitHub sources.
> My observation: the problem is intermittent; sometimes it works fine, i.e. 
> SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
> that is the case when my SourceFactory.create() is executed BEFORE 
> HybridSourceSplitEnumerator handles the 
> SourceReaderFinishedEvent\{sourceIndex=-1}.
> If HybridSourceSplitEnumerator.handleSourceEvent is executed before my 
> SourceFactory.create(), then the SourceReaderFinishedEvent arrives with 
> sourceIndex=-1.
> The Preconditions-checkNotNull error log from the JobManager is attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28722) Hybrid Source should use .equals() for Integer comparison

2022-07-27 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572148#comment-17572148
 ] 

Thomas Weise commented on FLINK-28722:
--

[~mason6345] please take a look at the PR in the linked JIRA.

> Hybrid Source should use .equals() for Integer comparison
> -
>
> Key: FLINK-28722
> URL: https://issues.apache.org/jira/browse/FLINK-28722
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.1
>Reporter: Mason Chen
>Priority: Major
> Fix For: 1.16.0, 1.15.2
>
>
> HybridSource should use .equals() for Integer comparison in filtering out the 
> underlying sources. This causes the HybridSource to stop working when it hits 
> the 128th source (would not work for anything past 127 sources).
> https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358
>  
> A user reported this issue here: 
> https://lists.apache.org/thread/7h2rblsdt7rjf85q9mhfht77bghtbswh
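The underlying Java behavior is easy to reproduce in isolation (a standalone demo, not Flink code): boxed Integers in [-128, 127] come from a cache, so reference comparison with == happens to work until the value 128 is reached.
{code:java}
public class IntegerCacheDemo {
    public static void main(String[] args) {
        Integer a = 127, b = 127;
        Integer c = 128, d = 128;
        System.out.println(a == b);      // true: both come from the cache
        System.out.println(c == d);      // false: distinct boxed objects
        System.out.println(c.equals(d)); // true: value comparison is correct
    }
}
{code}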



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28722) Hybrid Source should use .equals() for Integer comparison

2022-07-27 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise closed FLINK-28722.

Resolution: Duplicate

> Hybrid Source should use .equals() for Integer comparison
> -
>
> Key: FLINK-28722
> URL: https://issues.apache.org/jira/browse/FLINK-28722
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.1
>Reporter: Mason Chen
>Priority: Major
> Fix For: 1.16.0, 1.15.2
>
>
> HybridSource should use .equals() for Integer comparison in filtering out the 
> underlying sources. This causes the HybridSource to stop working when it hits 
> the 128th source (would not work for anything past 127 sources).
> https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358
>  
> A user reported this issue here: 
> https://lists.apache.org/thread/7h2rblsdt7rjf85q9mhfht77bghtbswh



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27650) First environment variable of top level pod template is lost

2022-07-20 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27650:
-
Fix Version/s: (was: kubernetes-operator-1.1.0)

> First environment variable of top level pod template is lost
> 
>
> Key: FLINK-27650
> URL: https://issues.apache.org/jira/browse/FLINK-27650
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0
>Reporter: Simon Paradis
>Priority: Major
> Attachments: flink-27650.yaml
>
>
> I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* 
> to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template 
> feature to inject environment variables that control structured JSON logging.
> I noticed that the first defined environment variable is never injected into 
> the JobManager or TaskManager pods. The workaround is to define a dummy env 
> var.
> Here's the manifest template. It is processed by a tool that first expands 
> ${ENV_VAR} references with values provided by our CI pipeline. We should not 
> have to create the FLINK_COORDINATES_DUMMY env var.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28541) Add OwnerReferences to FlinkDeployment CR in jobmanager Deployment

2022-07-13 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566505#comment-17566505
 ] 

Thomas Weise commented on FLINK-28541:
--

See https://issues.apache.org/jira/browse/FLINK-26812

 

> Add OwnerReferences to FlinkDeployment CR in jobmanager Deployment
> --
>
> Key: FLINK-28541
> URL: https://issues.apache.org/jira/browse/FLINK-28541
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: ChangZhuo Chen (陳昌倬)
>Priority: Major
>
> `ownerReferences` is used by Argo CD 
> (https://argo-cd.readthedocs.io/en/stable/) to display the relations between 
> resources. Since there is no `ownerReferences` entry in the jobmanager 
> Deployment, Argo CD cannot know that this Deployment was created by the 
> FlinkDeployment CR, and thus cannot display the full set of resources 
> managed by the FlinkDeployment CR.
> Discussion thread: 
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1657639397473729
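For illustration, attaching such an owner reference with the fabric8 client could look roughly like this (a sketch with placeholder values; not the operator's actual code):
{code:java}
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;

class OwnerReferenceSketch {
    // Argo CD walks ownerReferences to relate the Deployment to the CR.
    static void addOwnerReference(
            Deployment jmDeployment, String crName, String crUid) {
        OwnerReference ref = new OwnerReferenceBuilder()
                .withApiVersion("flink.apache.org/v1beta1")
                .withKind("FlinkDeployment")
                .withName(crName)
                .withUid(crUid)
                .withController(true)
                .build();
        jmDeployment.getMetadata().getOwnerReferences().add(ref);
    }
}
{code}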



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28364) Python Job support for Kubernetes Operator

2022-07-06 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563304#comment-17563304
 ] 

Thomas Weise commented on FLINK-28364:
--

Thanks for the ping [~gyfora]. This is something that can be solved nicely with 
a custom application image and does not require any change to the operator. I 
had done something similar in the past: 
https://github.com/lyft/flinkk8soperator/tree/master/examples/beam-python

> Python Job support for Kubernetes Operator
> --
>
> Key: FLINK-28364
> URL: https://issues.apache.org/jira/browse/FLINK-28364
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: wu3396
>Priority: Major
>
> *Describe the solution*
> Add pyflink as a supported job type in the operator.
> *Describe alternatives*
> Like [here|https://github.com/spotify/flink-on-k8s-operator/pull/165]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27009) Support SQL job submission in flink kubernetes operator

2022-07-05 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562716#comment-17562716
 ] 

Thomas Weise commented on FLINK-27009:
--

I would prefer that any SQL processing logic remain separate from the 
operator. That also means a fat client that performs translation and requires 
additional dependencies should not execute in the operator. I think that can 
be accomplished by providing a separate entry point that executes in 
application mode; the user can then supply any other dependencies as needed as 
part of the image. Is that the plan?
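A minimal sketch of what such a separate entry point could look like (class name and the naive ';' statement splitting are illustrative assumptions, not a proposed implementation):
{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Runs a SQL script inside the user jar in application mode; all
// translation happens in the job, not in the operator.
public class SqlRunner {
    public static void main(String[] args) throws Exception {
        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}
{code}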

> Support SQL job submission in flink kubernetes operator
> ---
>
> Key: FLINK-27009
> URL: https://issues.apache.org/jira/browse/FLINK-27009
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Gyula Fora
>Priority: Major
>
> Currently, the Flink Kubernetes operator targets jar jobs using application 
> or session clusters. For SQL jobs, there is no out-of-the-box solution in the 
> operator.
> One simple, short-term solution is to wrap the SQL script into a jar job 
> using the Table API, with limitations.
> The long-term solution may work with 
> [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve 
> full support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-26891) Emit events for important Deployment / Job changes

2022-06-23 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558113#comment-17558113
 ] 

Thomas Weise edited comment on FLINK-26891 at 6/23/22 2:33 PM:
---

This will add events that will provide more visibility to the user via k8s 
events. Examples: spec change detected, application submitted etc.

 


was (Author: thw):
This will add events that will provide more visibility to the user. Examples: 
spec change detected, application submitted etc.

 

> Emit events for important Deployment / Job changes
> --
>
> Key: FLINK-26891
> URL: https://issues.apache.org/jira/browse/FLINK-26891
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Thomas Weise
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> We should try capturing the important deployment states, such as RUNNING, 
> FAILING, DEPLOYING



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26891) Emit events for important Deployment / Job changes

2022-06-23 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558113#comment-17558113
 ] 

Thomas Weise commented on FLINK-26891:
--

This will add events that will provide more visibility to the user. Examples: 
spec change detected, application submitted etc.

 

> Emit events for important Deployment / Job changes
> --
>
> Key: FLINK-26891
> URL: https://issues.apache.org/jira/browse/FLINK-26891
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Thomas Weise
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> We should try capturing the important deployment states, such as RUNNING, 
> FAILING, DEPLOYING



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-26891) Emit events for important Deployment / Job changes

2022-06-22 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-26891:


Assignee: Thomas Weise  (was: Matyas Orhidi)

> Emit events for important Deployment / Job changes
> --
>
> Key: FLINK-26891
> URL: https://issues.apache.org/jira/browse/FLINK-26891
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Thomas Weise
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> We should try capturing the important deployment states, such as RUNNING, 
> FAILING, DEPLOYING



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27101) Periodically break the chain of incremental checkpoint

2022-06-10 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-27101:


Assignee: Jiale Tan

> Periodically break the chain of incremental checkpoint
> --
>
> Key: FLINK-27101
> URL: https://issues.apache.org/jira/browse/FLINK-27101
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Steven Zhen Wu
>Assignee: Jiale Tan
>Priority: Major
>
> Incremental checkpoint is almost a must for large-state jobs. It greatly 
> reduces the bytes uploaded to DFS per checkpoint. However, there are a few 
> implications of incremental checkpointing that are problematic for production 
> operations. We will use S3 as the example DFS for the rest of the description.
> 1. Because there is no way to deterministically know how far back the 
> incremental checkpoint can refer to files uploaded to S3, it is very 
> difficult to set S3 bucket/object TTL. In one application, we have observed 
> Flink checkpoints referring to files uploaded over 6 months ago. S3 TTL can 
> corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons
> - purge orphaned files (like external checkpoints from previous deployments) 
> to keep the storage cost in check. This problem can be addressed by 
> implementing proper garbage collection (similar to the JVM), traversing the 
> retained checkpoints from all jobs and following the file references. But 
> that is an expensive solution from an engineering cost perspective.
> - Security and privacy. E.g., there may be a requirement that Flink state 
> can't keep data for more than some duration threshold (hours/days/weeks). 
> The application is expected to purge keys to satisfy the requirement. 
> However, with incremental checkpoints and how deletion works in RocksDB, it 
> is hard to set an S3 TTL to purge S3 files. Even though those old S3 files 
> don't contain live keys, they may still be referenced by retained Flink 
> checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a 
> result, restoring from a checkpoint fails. With incremental checkpoints, it 
> usually doesn't help to try other, older checkpoints, because they may refer 
> to the same corrupted file. It is unclear whether the corruption happened 
> before or during the S3 upload. This risk can be mitigated with periodic 
> savepoints.
> It all boils down to a periodic full snapshot (checkpoint or savepoint) to 
> deterministically break the chain of incremental checkpoints. Searching the 
> Jira history, the behavior that FLINK-23949 [1] tried to fix is actually 
> close to what we need here.
> There are a few options
> 1. Periodically trigger savepoints (via the control plane). This is actually 
> not a bad practice and might be appealing to some people. The problem is that 
> it requires a job deployment to break the chain of incremental checkpoints. 
> Periodic job deployment may sound hacky. If we make the behavior of a full 
> checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be 
> an acceptable compromise. The benefit is that no job deployment is required 
> after savepoints.
> 2. Build the feature in Flink incremental checkpoint. Periodically (with some 
> cron style config) trigger a full checkpoint to break the incremental chain. 
> If the full checkpoint failed (due to whatever reason), the following 
> checkpoints should attempt full checkpoint as well until one successful full 
> checkpoint is completed.
> 3. For the security/privacy requirement, the main thing is to apply 
> compaction to the deleted keys. That could probably avoid references to the 
> old files. Is there any RocksDB compaction that can achieve full compaction, 
> removing old delete markers? Recent delete markers are fine.
> [1] https://issues.apache.org/jira/browse/FLINK-23949
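For option 1, the control plane can trigger the periodic savepoint through the JobManager REST API (POST /jobs/:jobid/savepoints); a minimal sketch where the address, job id, and target directory are placeholders:
{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PeriodicSavepointTrigger {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://jobmanager:8081";      // placeholder
        String jobId = "00000000000000000000000000000000"; // placeholder
        URL url = new URL(jobManager + "/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        byte[] body =
                ("{\"target-directory\": \"s3://bucket/savepoints\","
                        + " \"cancel-job\": false}")
                        .getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body); // asynchronous trigger; response carries an id
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}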



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27788) Adding annotation to k8 operator Pod

2022-05-25 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-27788:


Assignee: Jaganathan Asokan

> Adding annotation to k8 operator Pod
> 
>
> Key: FLINK-27788
> URL: https://issues.apache.org/jira/browse/FLINK-27788
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0, kubernetes-operator-1.1.0
>Reporter: Jaganathan Asokan
>Assignee: Jaganathan Asokan
>Priority: Minor
>
> Currently we lack the option to natively add annotations to Flink operator 
> pods. Providing this feature directly in our existing Helm chart could be 
> useful. One potential use case for allowing annotations on the pod is to 
> enable scraping of operator metrics by monitoring infrastructure like 
> Prometheus, Datadog, etc.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27788) Adding annotation to k8 operator Pod

2022-05-25 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27788:
-
Fix Version/s: (was: kubernetes-operator-0.1.0)
   (was: kubernetes-operator-1.1.0)

> Adding annotation to k8 operator Pod
> 
>
> Key: FLINK-27788
> URL: https://issues.apache.org/jira/browse/FLINK-27788
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0, kubernetes-operator-1.1.0
>Reporter: Jaganathan Asokan
>Priority: Minor
>
> Currently we lack the option to natively add annotations to Flink operator 
> pods. Providing this feature directly in our existing Helm chart could be 
> useful. One potential use case for allowing annotations on the pod is to 
> enable scraping of operator metrics by monitoring infrastructure like 
> Prometheus, Datadog, etc.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27650) First environment variable of top level pod template is lost

2022-05-16 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537773#comment-17537773
 ] 

Thomas Weise commented on FLINK-27650:
--

That's as per 
[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L83]

Maybe it is better to just override?

This isn't a "blocker" though.
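For illustration, override semantics for the env list could look like this (a sketch, not the operator's actual FlinkUtils code):
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import io.fabric8.kubernetes.api.model.EnvVar;

class EnvMergeSketch {
    // Later definitions win by name; no entry is silently dropped.
    static List<EnvVar> merge(List<EnvVar> base, List<EnvVar> overrides) {
        Map<String, EnvVar> byName = new LinkedHashMap<>();
        for (EnvVar e : base) {
            byName.put(e.getName(), e);
        }
        for (EnvVar e : overrides) {
            byName.put(e.getName(), e); // override semantics
        }
        return new ArrayList<>(byName.values());
    }
}
{code}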

 

> First environment variable of top level pod template is lost
> 
>
> Key: FLINK-27650
> URL: https://issues.apache.org/jira/browse/FLINK-27650
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0
>Reporter: Simon Paradis
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
> Attachments: flink-27650.yaml
>
>
> I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* 
> to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template 
> feature to inject environment variables that control structured JSON logging.
> I noticed that the first defined environment variable is never injected into 
> the JobManager or TaskManager pods. The workaround is to define a dummy env 
> var.
> Here's the manifest template. It is processed by a tool that first expands 
> ${ENV_VAR} references with values provided by our CI pipeline. We should not 
> have to create the FLINK_COORDINATES_DUMMY env var.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27650) First environment variable of top level pod template is lost

2022-05-16 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27650:
-
Priority: Major  (was: Blocker)

> First environment variable of top level pod template is lost
> 
>
> Key: FLINK-27650
> URL: https://issues.apache.org/jira/browse/FLINK-27650
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0
>Reporter: Simon Paradis
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
> Attachments: flink-27650.yaml
>
>
> I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* 
> to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template 
> feature to inject environment variables that control structured JSON logging.
> I noticed that the first defined environment variable is never injected into 
> the JobManager or TaskManager pods. The workaround is to define a dummy env 
> var.
> Here's the manifest template. It is processed by a tool that first expands 
> ${ENV_VAR} references with values provided by our CI pipeline. We should not 
> have to create the FLINK_COORDINATES_DUMMY env var.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27465) AvroRowDeserializationSchema.convertToTimestamp fails with negative nano seconds

2022-05-13 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27465:
-
Fix Version/s: 1.15.1

> AvroRowDeserializationSchema.convertToTimestamp fails with negative nano 
> seconds
> 
>
> Key: FLINK-27465
> URL: https://issues.apache.org/jira/browse/FLINK-27465
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.15.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.15.1
>
>
> The issue is exposed due to a time zone dependency in 
> AvroRowDeSerializationSchemaTest.
>  
> The root cause is that convertToTimestamp attempts to set a negative value 
> with java.sql.Timestamp.setNanos, which only accepts values in the range 
> [0, 999999999].
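The failure mode is reproducible with plain JDK classes, and a floor-based split avoids the negative nanos (standalone demo, not the Flink fix):
{code:java}
import java.sql.Timestamp;

public class NegativeNanosDemo {
    public static void main(String[] args) {
        long micros = -123_456L; // an instant shortly before 1970-01-01

        // Math.floorDiv/floorMod keep the sub-millisecond remainder
        // non-negative even for pre-epoch instants.
        long millis = Math.floorDiv(micros, 1000);
        int nanosOfMilli = (int) Math.floorMod(micros, 1000) * 1000;

        Timestamp ts = new Timestamp(millis);
        // getNanos() already holds the millisecond part of the second, so
        // the sum stays within setNanos' legal range [0, 999999999].
        ts.setNanos(ts.getNanos() + nanosOfMilli);
        System.out.println(ts);
    }
}
{code}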



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters

2022-05-13 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27255:
-
Fix Version/s: 1.15.1

> Flink-avro does not support serialization and deserialization of avro schema 
> longer than 65535 characters
> -
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.14.4
>Reporter: Haizhou Zhao
>Assignee: Haizhou Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.14.5, 1.15.1
>
>
> The underlying serialization of the Avro schema uses the string 
> serialization method of ObjectOutputStream; however, the default string 
> serialization by ObjectOutputStream does not support strings of more than 
> 65535 characters (64kb). As a result, constructing Flink operators that 
> input/output Avro GenericRecords with a huge schema is not possible.
>  
> The proposed fix is to change the serialization and deserialization methods 
> of the following classes so that huge strings can also be handled.
>  
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
>  
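The 64kb limit is a property of ObjectOutputStream.writeUTF and can be demonstrated in isolation (standalone demo; the length-prefixed byte approach shows the general shape of such a fix, not the exact Flink patch):
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;

public class LongStringSerializationDemo {
    public static void main(String[] args) throws IOException {
        String hugeSchema = new String(new char[70_000]).replace('\0', 'a');

        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            try {
                out.writeUTF(hugeSchema); // fails: encoded size > 65535
            } catch (UTFDataFormatException e) {
                System.out.println("writeUTF failed: " + e.getMessage());
            }
            // Writing an int length prefix plus raw UTF-8 bytes has no
            // such limit and round-trips strings of any size.
            byte[] bytes = hugeSchema.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
        }
    }
}
{code}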



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27594) Only recover JM deployment if HA metadata available

2022-05-12 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536266#comment-17536266
 ] 

Thomas Weise commented on FLINK-27594:
--

[~gyfora] are you saying that when no HA metadata is available and the upgrade 
mode is LAST_STATE, then the operator should keep the deployment in an error 
state? I think that would be correct. When the upgrade mode is SAVEPOINT, it 
can go back to that savepoint?

I also think that with LAST_STATE we should pick either the last checkpoint or 
the last savepoint, whichever is more recent.

> Only recover JM deployment if HA metadata available
> ---
>
> Key: FLINK-27594
> URL: https://issues.apache.org/jira/browse/FLINK-27594
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
> Fix For: kubernetes-operator-1.0.0
>
>
> This ticket is related to https://issues.apache.org/jira/browse/FLINK-27572
> The deployment recovery logic for list jobmanager deployments simply performs 
> a restoreFromLasteSavepoint operation currently.
> This is incorrect in cases where the HA metadata is not available as it might 
> lead to accidentally restoring from an older state.
> We should verify that HA metadata is present and simply perform a 
> deployOperation. Once we have this we can actually make the recovery default 
> true for all versions.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters

2022-05-12 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536222#comment-17536222
 ] 

Thomas Weise commented on FLINK-27255:
--

[~Keathalin21] please also open a PR for 1.15.x

> Flink-avro does not support serialization and deserialization of avro schema 
> longer than 65535 characters
> -
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.14.4
>Reporter: Haizhou Zhao
>Assignee: Haizhou Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.14.5
>
>
> The underlying serialization of the Avro schema uses the string 
> serialization method of ObjectOutputStream; however, the default string 
> serialization by ObjectOutputStream does not support strings of more than 
> 65535 characters (64kb). As a result, constructing Flink operators that 
> input/output Avro GenericRecords with a huge schema is not possible.
>  
> The proposed fix is to change the serialization and deserialization methods 
> of the following classes so that huge strings can also be handled.
>  
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters

2022-05-12 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27255:
-
Fix Version/s: 1.14.5

> Flink-avro does not support serialization and deserialization of avro schema 
> longer than 65535 characters
> -
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.14.4
>Reporter: Haizhou Zhao
>Assignee: Haizhou Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.14.5
>
>
> The underlying serialization of the Avro schema uses the string 
> serialization method of ObjectOutputStream; however, the default string 
> serialization by ObjectOutputStream does not support strings of more than 
> 65535 characters (64kb). As a result, constructing Flink operators that 
> input/output Avro GenericRecords with a huge schema is not possible.
>  
> The proposed fix is to change the serialization and deserialization methods 
> of the following classes so that huge strings can also be handled.
>  
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters

2022-05-10 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-27255.
--
Resolution: Fixed

> Flink-avro does not support serialization and deserialization of avro schema 
> longer than 65535 characters
> -
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.14.4
>Reporter: Haizhou Zhao
>Assignee: Haizhou Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The underlying serialization of the Avro schema uses the string 
> serialization method of ObjectOutputStream; however, the default string 
> serialization by ObjectOutputStream does not support strings of more than 
> 65535 characters (64kb). As a result, constructing Flink operators that 
> input/output Avro GenericRecords with a huge schema is not possible.
>  
> The proposed fix is to change the serialization and deserialization methods 
> of the following classes so that huge strings can also be handled.
>  
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters

2022-05-10 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-27255:
-
Fix Version/s: 1.16.0

> Flink-avro does not support serialization and deserialization of avro schema 
> longer than 65535 characters
> -
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.14.4
>Reporter: Haizhou Zhao
>Assignee: Haizhou Zhao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> The underlying serialization of the Avro schema uses the string 
> serialization method of ObjectOutputStream; however, the default string 
> serialization by ObjectOutputStream does not support strings of more than 
> 65535 characters (64kb). As a result, constructing Flink operators that 
> input/output Avro GenericRecords with a huge schema is not possible.
>  
> The proposed fix is to change the serialization and deserialization methods 
> of the following classes so that huge strings can also be handled.
>  
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27435) Kubernetes Operator keeps savepoint history

2022-05-10 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534609#comment-17534609
 ] 

Thomas Weise commented on FLINK-27435:
--

[~gyfora] [~wangyang0918] I'm considering adding an optional list of 
savepoints to either JobStatus or SavepointInfo and updating it whenever a new 
savepoint is recorded in the observer. That would be complemented by purging 
savepoints based on time and count limit configuration parameters. WDYT?
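A rough sketch of the proposed bookkeeping (the Savepoint type and pruning rules here are hypothetical):
{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;

class SavepointHistorySketch {
    static class Savepoint {
        final String location;
        final Instant triggerTime;

        Savepoint(String location, Instant triggerTime) {
            this.location = location;
            this.triggerTime = triggerTime;
        }
    }

    private final LinkedList<Savepoint> history = new LinkedList<>();

    // Record a new savepoint, then purge the oldest entries that exceed
    // the configured count or age limits.
    void add(Savepoint sp, int maxCount, Duration maxAge) {
        history.addLast(sp);
        Instant cutoff = Instant.now().minus(maxAge);
        while (history.size() > maxCount
                || (!history.isEmpty()
                        && history.getFirst().triggerTime.isBefore(cutoff))) {
            history.removeFirst(); // savepoint disposal could hook in here
        }
    }
}
{code}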

> Kubernetes Operator keeps savepoint history
> ---
>
> Key: FLINK-27435
> URL: https://issues.apache.org/jira/browse/FLINK-27435
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
>
> Currently the operator keeps track of the most recent savepoint that was 
> triggered through savepointTriggerNonce. In some cases it is necessary to 
> find older savepoints. For that, it would be nice if the operator could 
> optionally maintain a savepoint history (and perhaps also trigger disposal of 
> savepoints that fall out of the history). The maximum number of savepoints 
> retained could be configured by count and/or age.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27435) Kubernetes Operator keeps savepoint history

2022-05-10 Thread Thomas Weise (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise reassigned FLINK-27435:


Assignee: Thomas Weise

> Kubernetes Operator keeps savepoint history
> ---
>
> Key: FLINK-27435
> URL: https://issues.apache.org/jira/browse/FLINK-27435
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
>
> Currently the operator keeps track of the most recent savepoint that was 
> triggered through savepointTriggerNonce. In some cases it is necessary to 
> find older savepoints. For that, it would be nice if the operator could 
> optionally maintain a savepoint history (and perhaps also trigger disposal of 
> savepoints that fall out of the history). The maximum number of savepoints 
> retained could be configured by count and/or age.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27495) Observer should update last savepoint information directly from cluster too

2022-05-10 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534537#comment-17534537
 ] 

Thomas Weise commented on FLINK-27495:
--

Hi [~gyfora], can we detail how the REST API can help cover these cases:
 * Cancel with savepoint: how do we guarantee that we retrieve the savepoint 
information before the JM terminates and the REST API becomes unavailable?
 * Fatal failure: the REST API would not be available in that case, but the HA 
data should be. So we should not rely on the REST API in this case?
 * Job finishes: In this case, do we even need the checkpoint information?

 

> Observer should update last savepoint information directly from cluster too
> ---
>
> Key: FLINK-27495
> URL: https://issues.apache.org/jira/browse/FLINK-27495
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> The observer should fetch the list of checkpoints from the observed job and 
> store the last savepoint in the status directly.
> This is especially useful for terminal job states in Flink 1.15 as it allows 
> us to avoid corner cases such as the operator failing after calling 
> cancel-with-savepoint but before updating the status.
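> For illustration, a hedged sketch of how the observer might pull the latest 
> savepoint path over the documented /jobs/<jobId>/checkpoints REST endpoint 
> (the JSON field names should be verified against the target Flink version):
> {code:java}
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
> import java.util.Optional;
> 
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
> public class LastSavepointFetcher {
> 
>     private static final ObjectMapper MAPPER = new ObjectMapper();
>     private final HttpClient client = HttpClient.newHttpClient();
> 
>     // Returns the external path of the latest savepoint, if the REST API is
>     // still reachable; the caller falls back to HA metadata otherwise.
>     public Optional<String> fetchLastSavepointPath(String restUrl, String jobId)
>             throws Exception {
>         HttpRequest request = HttpRequest.newBuilder()
>                 .uri(URI.create(restUrl + "/jobs/" + jobId + "/checkpoints"))
>                 .GET()
>                 .build();
>         HttpResponse<String> response =
>                 client.send(request, HttpResponse.BodyHandlers.ofString());
>         if (response.statusCode() != 200) {
>             // e.g. the JM already terminated after cancel-with-savepoint
>             return Optional.empty();
>         }
>         JsonNode path = MAPPER.readTree(response.body())
>                 .path("latest").path("savepoint").path("external_path");
>         return path.isTextual() ? Optional.of(path.asText()) : Optional.empty();
>     }
> }
> {code}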



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27495) Observer should update last savepoint information directly from cluster too

2022-05-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534131#comment-17534131
 ] 

Thomas Weise commented on FLINK-27495:
--

[~gyfora] is this for the case of a successfully completed job when the HA 
store was already removed? In other cases, when the job isn't healthy, the REST 
API may also be unavailable. And if we rely on polling the REST API, we may 
miss the latest checkpoint? Isn't reading from the HA store at the time when 
the path is required the only reliable method?

> Observer should update last savepoint information directly from cluster too
> ---
>
> Key: FLINK-27495
> URL: https://issues.apache.org/jira/browse/FLINK-27495
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> The observer should fetch the list of checkpoints from the observed job and 
> store the last savepoint in the status directly.
> This is especially useful for terminal job states in Flink 1.15 as it allows 
> us to avoid corner cases such as the operator failing after calling 
> cancel-with-savepoint but before updating the status.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

