[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/5563 ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170629598

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
     @Override
     public void close() throws IOException {
-        super.close(); //the order is important since super.close flushes inside
+        flush(); // super.close() also does a flush
         if (keyValueWriter != null) {
--- End diff --

Ah, so it's a pre-existing problem. I'll add a `finally` block for that.

---
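A minimal sketch of the `finally` approach in `open()`, assuming the writer first opens the raw stream (the `super.open()` step) and then wraps it in a second writer. `FinallyOpenSketch`, `TrackingStream`, and `createKeyValueWriter` are hypothetical stand-ins written with plain `java.io`, not Flink or Avro classes; the `failOnCreate` flag only simulates an exception thrown after the stream is already open.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for a sink writer whose open() wraps a freshly opened stream.
class FinallyOpenSketch {

    // In-memory stream that records whether close() was called on it.
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    TrackingStream stream;
    OutputStream keyValueWriter; // hypothetical stand-in for the Avro key/value writer

    // Hypothetical factory; failOnCreate simulates a failure after the stream was opened.
    static OutputStream createKeyValueWriter(OutputStream out, boolean failOnCreate) throws IOException {
        if (failOnCreate) {
            throw new IOException("simulated failure while creating the key/value writer");
        }
        return out;
    }

    void open(boolean failOnCreate) throws IOException {
        stream = new TrackingStream(); // plays the role of super.open() having opened the stream
        boolean success = false;
        try {
            keyValueWriter = createKeyValueWriter(stream, failOnCreate);
            success = true;
        } finally {
            if (!success) {
                // The caller never calls close() after a failed open(), so release the stream here.
                stream.close();
            }
        }
    }
}
```

The `success` flag keeps the stream open on the normal path while still letting the original exception propagate; a `catch` block that closes the stream and rethrows would work equally well.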
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170628342

No, because the bucketingSink never calls `close()` on the writer if `open()` threw an exception.

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170626581

Ah yeah, I would call `super.close()` if the `keyValueWriter` is `null`. That should do it, right?

---
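A minimal sketch of that `close()` logic, assuming (as the thread states for Avro) that closing the key/value writer also closes the stream it wraps. `CloseFallbackSketch`, `TrackingStream`, and `closeBaseStream()` are hypothetical names; in the tests a `BufferedOutputStream` stands in for the Avro writer because its `close()` flushes and then closes the wrapped stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model of a writer that may or may not have created its wrapping writer.
class CloseFallbackSketch {

    // In-memory stream that records whether close() was called on it.
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    final TrackingStream stream = new TrackingStream(); // raw stream owned by the base class
    OutputStream keyValueWriter;                         // null if it was never created

    // Plays the role of super.close(): close the raw stream directly.
    void closeBaseStream() {
        stream.close();
    }

    void close() throws IOException {
        if (keyValueWriter != null) {
            keyValueWriter.close(); // flushes, then closes the wrapped stream
        } else {
            closeBaseStream();      // nothing wraps the stream, so close it ourselves
        }
    }
}
```

Either branch closes the raw stream exactly once, which is the property the `null` check is meant to guarantee.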
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170606282

For this issue that would be enough. However, we can still leak streams if `open()` throws an exception after `super.open()` returns, because then the stream is already open. If you use the BucketingSink, this will leak the stream:

```
// openNewPartFile
bucketState.writer.open(fs, inProgressPath); // if this throws an exception, the stream can be open
bucketState.isWriterOpen = true;
```

```
// closeCurrentPartFile
if (bucketState.isWriterOpen) { // not closing partially opened writer
    bucketState.writer.close();
    bucketState.isWriterOpen = false;
}
```

So we should modify `open()` to properly close the stream.

---
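The leak can be reproduced in a few lines. This is a hypothetical model of the two quoted snippets using plain `java.io`; `LeakSketch`, `FailingWriter`, and `TrackedStream` are illustrative names, not Flink classes, and the exception is swallowed only so the resulting state can be inspected.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical model of the caller pattern quoted above (openNewPartFile / closeCurrentPartFile).
class LeakSketch {

    // In-memory stream that records whether close() was called on it.
    static class TrackedStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Writer whose open() opens its stream and then fails before returning.
    static class FailingWriter {
        TrackedStream stream;

        void open() throws IOException {
            stream = new TrackedStream(); // the stream is open from this point on
            throw new IOException("simulated failure after the stream was opened");
        }

        void close() {
            stream.close();
        }
    }

    final FailingWriter writer = new FailingWriter();
    boolean isWriterOpen = false;

    void openNewPartFile() {
        try {
            writer.open();
            isWriterOpen = true; // never reached when open() throws
        } catch (IOException e) {
            // Swallowed here only so the state can be inspected afterwards.
        }
    }

    void closeCurrentPartFile() {
        if (isWriterOpen) { // false after the failed open(), so close() is skipped
            writer.close();
            isWriterOpen = false;
        }
    }
}
```

After `openNewPartFile()` and `closeCurrentPartFile()`, the stream created inside `open()` is never closed, which is why the cleanup has to happen inside `open()` itself.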
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170596050

WDYT?

---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170596001

I would be in favour of just having

```
@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        keyValueWriter.close();
    }
}
```

then. It seems that the usual stream/file classes all close the stream that was handed to them.

---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170530494

Alternatively, we could wrap the stream we give to Avro so that it ignores `close()` calls, and then close it ourselves. Then we wouldn't rely on implementation details of Avro.

---
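A minimal sketch of such a wrapper, assuming the third-party writer only needs an `OutputStream`. `NonClosingOutputStream` is a hypothetical name; it extends `java.io.FilterOutputStream`, turns `close()` into a flush, and leaves closing the real stream to its owner.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper handed to a third-party writer so that its close() cannot close our stream.
class NonClosingOutputStream extends FilterOutputStream {

    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    // FilterOutputStream's default writes byte by byte; delegate the whole array instead.
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    // Flush what the writer produced, but keep the underlying stream open.
    @Override
    public void close() throws IOException {
        out.flush();
    }
}
```

With this design it no longer matters whether the wrapping writer closes the stream it was given; the owner's own `close()` decides when the underlying stream is closed.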
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170528786

The writer already flushes when calling `close()`. If the writer is `null`, we should call `super.close()` to make sure the stream is closed.

---
GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5563

    [FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter

    The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close() will eventually call flush() on the wrapped stream, which fails if we have already closed that stream. Now we call flush() ourselves before closing the KeyValueWriter, which internally closes the wrapped stream eventually.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-8543-fix-avro-writer-stream-close

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5563.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5563

commit 548dea4c0811ffbfaf1566aaae88551f29eb5af9
Author: Aljoscha Krettek
Date: 2018-02-22T16:24:33Z

    [FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter

---
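The ordering problem described in the PR can be modeled in plain `java.io`. This is a sketch under assumptions: `StrictStream` is a hypothetical stream that fails when flushed after being closed, mirroring the failure described above, and a `BufferedOutputStream` stands in for the key/value writer because its `close()` flushes the wrapped stream before closing it. `CloseOrderSketch` and its methods are illustrative names only.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical comparison of the old and new close() orderings.
class CloseOrderSketch {

    // Stream that rejects flush() once it has been closed.
    static class StrictStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void flush() throws IOException {
            if (closed) {
                throw new IOException("stream already closed");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Old order: close the raw stream first, then the writer that wraps it.
    static void closeStreamThenWriter(StrictStream stream, BufferedOutputStream writer) throws IOException {
        stream.close();
        writer.close(); // flushes into the already-closed stream, so this throws
    }

    // New order: flush, then let the writer close the wrapped stream.
    static void flushThenCloseWriter(BufferedOutputStream writer) throws IOException {
        writer.flush();
        writer.close(); // flushes again, then closes the wrapped stream
    }
}
```

As zentol points out in the review, the explicit `flush()` is redundant when the writer's own `close()` already flushes; what matters is that the wrapped stream is not closed before the writer.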