[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336577#comment-17336577 ]

Flink Jira Bot commented on FLINK-10382:

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
>              Labels: stale-major
>
> I am using Flink 1.6.0 with AvroKeyValueSinkWriter and BucketingSink to write to S3.
>
> After the application had been running for a while (~20 minutes), I got an *exception: java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
>     at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
>     at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to FAILING.
> java.lang.IllegalStateException: Writer has already been opened
>     at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
>     at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to AvroKeyValueSinkWriter.java, which leaves the writer not completely closed. I also noticed this change, which affects 1.5+:
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implemented the code similar to v1.4; it seems to be running fine now.
> {code:java}
> @Override
> public void close() throws IOException {
>     try {
>         super.close();
>     } finally {
>         if (keyValueWriter != null) {
>             keyValueWriter.close();
>         }
>     }
> }
> {code}
> I am curious whether anyone has had a similar issue. I would appreciate any insights. Thanks!
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
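The failure mode described in the issue can be illustrated in miniature. The following is a minimal sketch, assuming (as the stack trace suggests) that the base writer refuses open() while it still holds a stream. The classes BaseWriter, LeakyWriter and FixedWriter are hypothetical stand-ins, not Flink's classes, and the leaky close() is only an assumed shape of the problem, not a copy of the Flink source.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class WriterLifecycleSketch {

    /** Hypothetical base writer: holds at most one stream and rejects a second open(). */
    static class BaseWriter {
        private OutputStream out;

        void open() {
            if (out != null) {
                throw new IllegalStateException("Writer has already been opened");
            }
            out = new ByteArrayOutputStream();
        }

        void close() throws IOException {
            if (out != null) {
                out.close();
                out = null; // only this reset makes a later open() legal
            }
        }
    }

    /** Assumed buggy shape: releases its inner writer but never resets the base state. */
    static class LeakyWriter extends BaseWriter {
        private Object inner;

        @Override
        void open() {
            super.open();
            inner = new Object();
        }

        @Override
        void close() throws IOException {
            if (inner != null) {
                inner = null; // base class still believes it is open
            } else {
                super.close();
            }
        }
    }

    /** Always delegates to the base close(), in the spirit of the v1.4-style workaround. */
    static class FixedWriter extends BaseWriter {
        private Object inner;

        @Override
        void open() {
            super.open();
            inner = new Object();
        }

        @Override
        void close() throws IOException {
            try {
                super.close();
            } finally {
                inner = null;
            }
        }
    }

    /** Returns true if the writer survives open, close, open (as when rolling to a new part file). */
    static boolean survivesRoll(BaseWriter writer) {
        try {
            writer.open();
            writer.close();
            writer.open();
            return true;
        } catch (IllegalStateException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky writer survives roll: " + survivesRoll(new LeakyWriter()));
        System.out.println("fixed writer survives roll: " + survivesRoll(new FixedWriter()));
    }
}
```

In this sketch the leaky writer fails on the second open() while the fixed one does not, which matches the symptom of a job that runs until the sink first rolls to a new part file.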
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328556#comment-17328556 ]

Flink Jira Bot commented on FLINK-10382:

This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
>              Labels: stale-major
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16910497#comment-16910497 ]

John Lonergan commented on FLINK-10382:
---

Just fell over this same problem. Fundamentally, this class doesn't work as far as I can tell. Presumably there are missing tests, which raises a question around QA in Flink: what is the overall quality of testing in Flink?

I suggest that rather than just documenting this class as deprecated, it comes with a comment saying it's broken and should be avoided, or just delete it entirely if this class has been abandoned.

---

As far as a "fix" goes, I think something like this ought to work. A flush is needed first to avoid potential data loss.

{{@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        flush();
        super.close();
        keyValueWriter = null;
    } else {
        // need to make sure we close this if we never created the Key/Value Writer.
        super.close();
    }
}
}}

Is the bucketing sink abandoned? This close issue only seems to be safe on StringWriter and SequenceFileWriter, the first because there's nothing to close and the second because it knows whether it owns the stream or not.

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
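The remark that a flush is needed before closing can be shown with a small, self-contained sketch. It assumes a writer that buffers records in memory until flush() is called; Sink and BufferingWriter are hypothetical illustration classes and do not model Avro's or Flink's actual writers.

```java
import java.util.ArrayList;
import java.util.List;

class FlushBeforeCloseSketch {

    /** Hypothetical sink: records what reached it and rejects writes after close(). */
    static class Sink {
        final List<String> stored = new ArrayList<>();
        private boolean closed;

        void write(String record) {
            if (closed) {
                throw new IllegalStateException("sink closed");
            }
            stored.add(record);
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical writer that holds records in a buffer until flush() is called. */
    static class BufferingWriter {
        private final Sink sink;
        private final List<String> buffer = new ArrayList<>();

        BufferingWriter(Sink sink) {
            this.sink = sink;
        }

        void append(String record) {
            buffer.add(record);
        }

        void flush() {
            for (String record : buffer) {
                sink.write(record);
            }
            buffer.clear();
        }

        /** Closes the sink directly; anything still buffered never reaches it. */
        void closeWithoutFlush() {
            buffer.clear();
            sink.close();
        }

        /** Pushes buffered records to the sink first, then closes it. */
        void closeWithFlush() {
            flush();
            sink.close();
        }
    }

    /** Returns how many of two appended records reached the sink. */
    static int storedAfter(boolean flushFirst) {
        Sink sink = new Sink();
        BufferingWriter writer = new BufferingWriter(sink);
        writer.append("k1=v1");
        writer.append("k2=v2");
        if (flushFirst) {
            writer.closeWithFlush();
        } else {
            writer.closeWithoutFlush();
        }
        return sink.stored.size();
    }

    public static void main(String[] args) {
        System.out.println("records stored, flush first: " + storedAfter(true));
        System.out.println("records stored, no flush:    " + storedAfter(false));
    }
}
```

Under these assumptions, both records reach the sink only when flush() runs before the underlying stream is closed, which is the ordering the proposed close() uses.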
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655242#comment-16655242 ]

Aljoscha Krettek commented on FLINK-10382:
--

I think it would be good to somehow fix it but I'm not actively working on it, and the workaround is to copy and implement on your own. So it's somewhat ok for now.

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654906#comment-16654906 ]

Till Rohrmann commented on FLINK-10382:
---

Are there any plans to fix this issue [~aljoscha]?

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Major
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625572#comment-16625572 ]

Aljoscha Krettek commented on FLINK-10382:
--

It was probably not considered. I don't think we can simply revert it, because this would introduce the original bug again.

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Blocker
[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622246#comment-16622246 ]

Stefan Richter commented on FLINK-10382:

Thanks for reporting this. [~aljoscha] was this consequence of the change considered and how shall we handle the regression?

> Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink
> ---
>
>                 Key: FLINK-10382
>                 URL: https://issues.apache.org/jira/browse/FLINK-10382
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chengzhi Zhao
>            Priority: Blocker