[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-09-29 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770637#comment-17770637
 ] 

jirawech.s commented on FLINK-31689:


[~martijnvisser] Could you help with this PR, seems like no one review it

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
>  Labels: pull-request-available
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-08-12 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753648#comment-17753648
 ] 

jirawech.s commented on FLINK-31689:


waiting for PR review https://github.com/apache/flink/pull/22400

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
>  Labels: pull-request-available
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-05-29 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727071#comment-17727071
 ] 

jirawech.s commented on FLINK-31689:


Anyone can help me review PR pls

https://github.com/apache/flink/pull/22400

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
>  Labels: pull-request-available
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-14 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712290#comment-17712290
 ] 

jirawech.s commented on FLINK-31689:


[~luoyuxia] I open [PR|https://github.com/apache/flink/pull/22400]. Could you 
help me review?

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
>  Labels: pull-request-available
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-05 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709178#comment-17709178
 ] 

luoyuxia commented on FLINK-31689:
--

[~jirawech.s] Assign to you~ You can tried in your local and pr back. I can 
help view.

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Assignee: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-05 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708875#comment-17708875
 ] 

jirawech.s commented on FLINK-31689:


[~luoyuxia] Is it possible that you assign me to do this task? Or what do i do 
next?

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-04 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708271#comment-17708271
 ] 

jirawech.s commented on FLINK-31689:


[~luoyuxia] Thank you so much for detailed explanation.
What about i tried on my local, built and tested it and PR back? I see that i 
need this first to contribute back
*Only start working on the implementation if there is consensus on the approach 
(e.g. you are assigned to the ticket)*

[link|https://flink.apache.org/how-to-contribute/overview/]

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-03 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708187#comment-17708187
 ] 

luoyuxia commented on FLINK-31689:
--

It's in 
[here|https://github.com/apache/flink/blob/0915c9850d861165e283acc0f60545cd836f0567/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114].

I think the these code line :
{code:java}
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next()); {code}
, we can check where this.expiredFilesState.get().iterator().hasNext(), and 
then put. 

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-03 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708030#comment-17708030
 ] 

jirawech.s commented on FLINK-31689:


[~luoyuxia] I see. So, we could say that it is normal behaviour of 
CompactOperator in File Sink for now. If we were to improve, we could do that 
by implementing state compatible CompactOperator right? Could you point me to 
the code/class i should check out. I am not so familiar with Flink development

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-03 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707928#comment-17707928
 ] 

luoyuxia commented on FLINK-31689:
--

[~jirawech.s] Yes, it's normal behavior. Maybe you can disable auto compaction 
and try it again.  By say "the state", I mean the state of {{CompactOperator}} 
in File Sink. Other sinks have their own implementation which can be state 
compatible.

But after diving into the {{{}CompactOperator{}}}, I think  it can be 
implemented in state compatible style which then won't throw exception though 
we change parallelism.

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-03 Thread jirawech.s (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707891#comment-17707891
 ] 

jirawech.s commented on FLINK-31689:


Is this normal behavior? If i use Kafka Sink, i am able to increase/decrease 
parallelism fine. 

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

2023-04-02 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707742#comment-17707742
 ] 

luoyuxia commented on FLINK-31689:
--

Please remember the state is not compatible as you have changed parallelism. 
So, it throw the exception.

> Filesystem sink fails when parallelism of compactor operator changed
> 
>
> Key: FLINK-31689
> URL: https://issues.apache.org/jira/browse/FLINK-31689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: jirawech.s
>Priority: Major
> Attachments: HelloFlinkHadoopSink.java
>
>
> I encounter this error when i tried to use Filesystem sink with Table SQL. I 
> have not tested with Datastream API tho. You may refers to the error as below
> {code:java}
> // code placeholder
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:864)
>   at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in attachment and reproducible steps below
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> {color:#172b4d}6. Cancel job, change parallelism to 2, and resume job from 
> savepoint{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)