[jira] [Commented] (FLINK-33536) Flink Table API CSV streaming sink fails with "IOException: Stream closed"

2023-11-15 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786318#comment-17786318
 ] 

Samrat Deb commented on FLINK-33536:


[~prabhujoseph] , [~martijnvisser]  
I have tested the changes and raised a PR. Please review the changes whenever 
time 

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --
>
> Key: FLINK-33536
> URL: https://issues.apache.org/jira/browse/FLINK-33536
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>  Labels: pull-request-available
>
> Flink Table API CSV streaming sink fails with "IOException: Stream closed". 
> Prior to Flink 1.18, CSV streaming sink used to fail with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3" which is fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. The fix 
> seems not complete, it fails with this issue now.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>   at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>   at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>   at 
> 

[jira] [Commented] (FLINK-33536) Flink Table API CSV streaming sink fails with "IOException: Stream closed"

2023-11-13 Thread Prabhu Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785702#comment-17785702
 ] 

Prabhu Joseph commented on FLINK-33536:
---

Thanks [~samrat007].

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --
>
> Key: FLINK-33536
> URL: https://issues.apache.org/jira/browse/FLINK-33536
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink Table API CSV streaming sink fails with "IOException: Stream closed". 
> Prior to Flink 1.18, CSV streaming sink used to fail with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3" which is fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. The fix 
> seems not complete, it fails with this issue now.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>   at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>   at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>   at 
> 

[jira] [Commented] (FLINK-33536) Flink Table API CSV streaming sink fails with "IOException: Stream closed"

2023-11-13 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785634#comment-17785634
 ] 

Samrat Deb commented on FLINK-33536:


Hello [~prabhujoseph]  and [~martijnvisser] ,

I was able to reproduced the issue and have been investigating FLINK-28513. I'm 
currently uncertain about what might have been overlooked during the initial 
work. I'll proceed with debugging to identify the root cause and will share the 
findings along with any necessary fixes.

 

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --
>
> Key: FLINK-33536
> URL: https://issues.apache.org/jira/browse/FLINK-33536
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink Table API CSV streaming sink fails with "IOException: Stream closed". 
> Prior to Flink 1.18, CSV streaming sink used to fail with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3" which is fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. The fix 
> seems not complete, it fails with this issue now.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>   at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>   at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>   at 
> 

[jira] [Commented] (FLINK-33536) Flink Table API CSV streaming sink fails with "IOException: Stream closed"

2023-11-13 Thread Prabhu Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785573#comment-17785573
 ] 

Prabhu Joseph commented on FLINK-33536:
---

We are using flink-s3-fs-hadoop-1.18.0.jar.

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --
>
> Key: FLINK-33536
> URL: https://issues.apache.org/jira/browse/FLINK-33536
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink Table API CSV streaming sink fails with "IOException: Stream closed". 
> Prior to Flink 1.18, CSV streaming sink used to fail with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3" which is fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. The fix 
> seems not complete, it fails with this issue now.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>   at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>   at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>   at 
> 

[jira] [Commented] (FLINK-33536) Flink Table API CSV streaming sink fails with "IOException: Stream closed"

2023-11-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785546#comment-17785546
 ] 

Martijn Visser commented on FLINK-33536:


Which S3 plugin was used here?

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --
>
> Key: FLINK-33536
> URL: https://issues.apache.org/jira/browse/FLINK-33536
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink Table API CSV streaming sink fails with "IOException: Stream closed". 
> Prior to Flink 1.18, CSV streaming sink used to fail with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3" which is fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. The fix 
> seems not complete, it fails with this issue now.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>   at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>   at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>   at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>   at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>   at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>   at 
>