[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432430#comment-16432430 ]

mingleizhang commented on FLINK-9087:
-------------------------------------

It seems that [~triones] does not have permission to perform the write operation at the moment. I can help with that, or a committer can grant you permission so that you can do it yourself.

> Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9087
>                 URL: https://issues.apache.org/jira/browse/FLINK-9087
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
> for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
>     try {
>         streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432403#comment-16432403 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/5802

    @trionesadam 👍
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432401#comment-16432401 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

Github user trionesadam commented on the issue:

    https://github.com/apache/flink/pull/5802

    This PR is ready for another review, @NicoK, @tedyu. Thank you.
    @tedyu, do we need to change the description of this JIRA issue?
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432399#comment-16432399 ]

Ted Yu commented on FLINK-9087:
-------------------------------

You can modify the description to match your fix. Thanks
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432385#comment-16432385 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

This PR is ready for another review, [~NicoK], [~yuzhih...@gmail.com]. Thank you.
[~yuzhih...@gmail.com], do we need to change the description of this JIRA issue?
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16429743#comment-16429743 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

Thanks for your suggestions, I will follow them.
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426882#comment-16426882 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5802#discussion_r179451610

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
    @@ -164,7 +164,7 @@ public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
     			if (flushAlways) {
     				flushAll();
     			}
    -			return eventBufferConsumer;
    --- End diff --

    You don't need to close the `eventBufferConsumer` since the try-with-resources should already do that. Returning the closed value, however, is kind of strange and only needed in one test. Let's make the method return `void` as you suggested and adapt the test.
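To illustrate the review comment above: in Java, a `return` inside a try-with-resources block evaluates the return expression first and then closes the resource, so the caller receives an object that is already closed. The following is only a minimal sketch of that language behaviour. `TrackedResource` and `TryWithResourcesSketch` are hypothetical stand-ins introduced for illustration and are not Flink classes.

```java
// Hypothetical stand-in for a closeable handle; this is NOT Flink's BufferConsumer.
final class TrackedResource implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class TryWithResourcesSketch {
    // Same shape as the method under review: the resource is returned from
    // inside the try-with-resources block, so close() has already run by the
    // time the caller receives the reference.
    static TrackedResource returnFromInsideTry() {
        try (TrackedResource resource = new TrackedResource()) {
            return resource;
        }
    }

    public static void main(String[] args) {
        TrackedResource returned = returnFromInsideTry();
        System.out.println("closed on return: " + returned.isClosed());
    }
}
```

Because the returned handle is already closed, a caller cannot do anything useful with it, which is one way to read the suggestion to return `void` instead.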
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426883#comment-16426883 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5802#discussion_r179453256

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---
    @@ -309,8 +309,6 @@ public void testBroadcastEventBufferReferenceCounting() throws Exception {
     		for (int i = 0; i < queues.length; i++) {
     			assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
     		}
    -
    -		assertTrue(bufferConsumer.isRecycled());
     	}
    --- End diff --

    yes, this should not be dropped since this line is the actual test - how about this instead:
    ```
    // Verify added to all queues
    assertEquals(1, queues[0].size());
    assertEquals(1, queues[1].size());

    // get references to buffer consumers (copies from the original event buffer consumer)
    BufferConsumer bufferConsumer1 = queues[0].getFirst();
    BufferConsumer bufferConsumer2 = queues[1].getFirst();

    // process all collected events (recycles the buffer)
    assertTrue(parseBuffer(queues[0].remove(), 0).isEvent());
    assertTrue(parseBuffer(queues[1].remove(), 1).isEvent());

    assertTrue(bufferConsumer1.isRecycled());
    assertTrue(bufferConsumer2.isRecycled());
    ```
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424237#comment-16424237 ]

Ted Yu commented on FLINK-9087:
-------------------------------

[~NicoK]:
Since you were recently working on related code, would you mind sharing your thoughts?
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424169#comment-16424169 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

[~yuzhih...@gmail.com] When I ran the test, I found this in broadcastEvent():
{code:java}
public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        ...
            // retain the buffer so that it can be recycled by each channel of targetPartition
            targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
        }
        ...
        return eventBufferConsumer;
    }
}
{code}
The method calls targetPartition.addBufferConsumer() with a copy of eventBufferConsumer, so all the BufferConsumers produced by copy() share the same buffer. Each copy() calls AbstractReferenceCountedByteBuf.retain() (AbstractReferenceCountedByteBuf is a Netty class).

Every target partition, such as AbstractCollectingResultPartitionWriter and ResultPartition, calls close() on its BufferConsumer, so the buffer in eventBufferConsumer is eventually released. ResultPartition calls notifyDataAvailable, and the data is then consumed asynchronously.

So it may be better to leave the return value alone. What do you think? Or should we just change the method signature to return void?

Note that FLINK-7315 plans to use Flink's buffers in Netty, and one of its sub-tasks, FLINK-7518, has a solution.

I am new here; any suggestions?
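The reference counting described in the comment above can be sketched roughly as follows. This is a simplified model under stated assumptions, not Flink's or Netty's implementation: `SketchBuffer`, `SketchConsumer`, and `BroadcastSketch` are hypothetical names, and the real `BufferConsumer` and `AbstractReferenceCountedByteBuf` carry more state. The sketch only shows why closing the original consumer does not recycle the buffer while per-channel copies are still open.

```java
// Hypothetical reference-counted buffer; a simplified model, not Netty's class.
final class SketchBuffer {
    private int refCount = 1;
    private boolean recycled;

    void retain() {
        if (recycled) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
    }

    void release() {
        if (--refCount == 0) {
            recycled = true;
        }
    }

    boolean isRecycled() {
        return recycled;
    }
}

// Hypothetical consumer handle; every copy shares (and retains) the same buffer.
final class SketchConsumer implements AutoCloseable {
    private final SketchBuffer buffer;
    private boolean closed;

    SketchConsumer(SketchBuffer buffer) {
        this.buffer = buffer;
    }

    SketchConsumer copy() {
        buffer.retain();
        return new SketchConsumer(buffer);
    }

    boolean isRecycled() {
        return buffer.isRecycled();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            buffer.release();
        }
    }
}

final class BroadcastSketch {
    // Hands one copy to each channel; try-with-resources closes the original.
    static SketchConsumer[] broadcast(int channels) {
        SketchConsumer[] queued = new SketchConsumer[channels];
        try (SketchConsumer original = new SketchConsumer(new SketchBuffer())) {
            for (int i = 0; i < channels; i++) {
                queued[i] = original.copy();
            }
        }
        return queued;
    }

    public static void main(String[] args) {
        SketchConsumer[] queued = broadcast(2);
        System.out.println(queued[0].isRecycled()); // copies still hold references
        queued[0].close();
        queued[1].close();
        System.out.println(queued[1].isRecycled()); // last reference released
    }
}
```

In this model the buffer is recycled only when the last copy is closed, which is why a test of the recycling behaviour has to hold references to the queued copies rather than to the original consumer.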
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423441#comment-16423441 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5802#discussion_r178707684

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---
    @@ -309,8 +309,6 @@ public void testBroadcastEventBufferReferenceCounting() throws Exception {
     		for (int i = 0; i < queues.length; i++) {
     			assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
     		}
    -
    -		assertTrue(bufferConsumer.isRecycled());
     	}
    --- End diff --

    How can you verify the `bufferConsumer.isRecycled` logic here if you drop this?

> Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9087
>                 URL: https://issues.apache.org/jira/browse/FLINK-9087
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: mingleizhang
>            Priority: Minor
>
> {code}
> for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
>     try {
>         streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423431#comment-16423431 ]

ASF GitHub Bot commented on FLINK-9087:
---------------------------------------

GitHub user trionesadam opened a pull request:

    https://github.com/apache/flink/pull/5802

    [FLINK-9087] [runtime] close the BufferConsumer in RecordWriter.broad…

    ## What is the purpose of the change

    BufferConsumer is Closeable, so it should be closed once it is no longer needed.

    ## Brief change log

    RecordWriter.broadcastEvent() is called in StreamTask, RecordWriterOutput, IterationIntermediateTask and IterationHeadTask, and no caller makes use of the BufferConsumer returned by broadcastEvent(). So I think the better way is to close the return value inside RecordWriter: change the return type of the method from BufferConsumer to void and close the BufferConsumer at the end.

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/trionesadam/flink FLINK-9087-close-broadcastEvent

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5802.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5802

commit a8efee487dd2fcd3bc95732c9676f5ed71d19bf0
Author: triones.deng
Date:   2018-04-03T02:28:52Z

    [FLINK-9087] [runtime] close the BufferConsumer in RecordWriter.broadcastEvent()
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423370#comment-16423370 ]

Ted Yu commented on FLINK-9087:
-------------------------------

Sounds good.
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423367#comment-16423367 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

[~yuzhih...@gmail.com] Note that RecordWriter.broadcastEvent() is called in StreamTask, RecordWriterOutput, IterationIntermediateTask and IterationHeadTask, and that no caller makes use of the BufferConsumer returned by broadcastEvent(). So I think the better way is to close the return value inside RecordWriter: change the return type of the method from BufferConsumer to void and close the BufferConsumer at the end. Does this make sense? What is your idea?
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421810#comment-16421810 ]

Ted Yu commented on FLINK-9087:
-------------------------------

When you send a pull request for this issue, a committer will assign it to you.

Thanks
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421679#comment-16421679 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

Can anyone give me permission to contribute to this issue? Thank you very much.
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421678#comment-16421678 ]

mingleizhang commented on FLINK-9087:
-------------------------------------

Hi [~triones], I have been working on another issue, so you are very welcome to take this one. But you need permission to push code before you get started.
[jira] [Commented] (FLINK-9087) Return value of broadcastEvent should be closed in StreamTask#performCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421677#comment-16421677 ]

Triones Deng commented on FLINK-9087:
-------------------------------------

[~mingleizhang], are you still working on this? If not, I would like to take the ticket. Thank you.