[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336577#comment-17336577
 ] 

Flink Jira Bot commented on FLINK-10382:


This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Major
>  Labels: stale-major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328556#comment-17328556
 ] 

Flink Jira Bot commented on FLINK-10382:


This major issue is unassigned and itself and all of its Sub-Tasks have not 
been updated for 30 days. So, it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Major
>  Labels: stale-major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2019-08-19 Thread John Lonergan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16910497#comment-16910497
 ] 

John Lonergan commented on FLINK-10382:
---

Just fell over this same problem - Fundamentally this class doesn't work as far 
as I can tell.

Presumably there are missing tests - which raises a question around QA in Flink 
- what is the overall quality of testing in Flink?

I suggest that rather than just documenting this class as deprecated it comes 
with a comment saying it's broken and should be avoided - or just delete it 
entirely if this class has been abandoned.

---

As far as a "fix" I think something like this ought to work. Flush needed first 
to avoid potential data loss.
 
{{@Override public void close() throws IOException { 
   if (keyValueWriter != null) {
     flush(); 
     super.close();
     keyValueWriter = null;
  } else { // need to make sure we close this if we never created the Key/Value 
Writer.
      super.close();
  }
}
}}


Is the bucketing sink abandoned?

This close issue only seems to be safe on StringWriter and SequenceFileWriter, 
the first because there's nothing to close and the second because it knows 
whether it owns the stream or not.



> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [

[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2018-10-18 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655242#comment-16655242
 ] 

Aljoscha Krettek commented on FLINK-10382:
--

I think it would be good to somehow fix it but I'm not actively working on it, 
and the workaround is to copy and implement on your own. So it's somewhat ok 
for now.

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2018-10-18 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654906#comment-16654906
 ] 

Till Rohrmann commented on FLINK-10382:
---

Are there any plans to fix this issue [~aljoscha]?

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Major
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2018-09-24 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625572#comment-16625572
 ] 

Aljoscha Krettek commented on FLINK-10382:
--

It was probably not considered. I don't think we can simply revert it, because 
this would introduce the original bug again.

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Blocker
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10382) Writer has already been opened while using AvroKeyValueSinkWriter and BucketingSink

2018-09-20 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622246#comment-16622246
 ] 

Stefan Richter commented on FLINK-10382:


Thanks for reporting this. [~aljoscha] was this consequence of the change 
considered and how shall we handle the regression?

> Writer has already been opened while using AvroKeyValueSinkWriter and 
> BucketingSink
> ---
>
> Key: FLINK-10382
> URL: https://issues.apache.org/jira/browse/FLINK-10382
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chengzhi Zhao
>Priority: Blocker
>
> I am using flink 1.6.0 and I am using AvroKeyValueSinkWriter and 
> BucketingSink to S3.
>  
> After the application running for a while ~ 20 mins, I got an *exception: 
> java.lang.IllegalStateException: Writer has already been opened*
> {code:java}
> 2018-09-17 15:40:23,771 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1537198823640 for job 8f9ab122fb7452714465eb1e1989e4d7.
> 2018-09-17 15:41:27,805 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/16) 
> (25914cb3f77c8e4271b0fb6ea597ed50) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-17 15:41:27,808 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Stream to Stream 
> Join (8f9ab122fb7452714465eb1e1989e4d7) switched from state RUNNING to 
> FAILING.
> java.lang.IllegalStateException: Writer has already been opened
> at 
> org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:68)
> at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:150)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:583)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
> at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java and led to the writer has not been closed 
> completely. I also noticed this change and affect 1.5+ 
> [https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6]
> I created my own AvroKeyValueSinkWriter class and implement the code similar 
> as v1.4, it seems running fine now. 
> {code:java}
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> {code}
> I am curious if anyone had the similar issue, Appreciate anyone has insights 
> on it. Thanks! 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)