[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/5563


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170629598
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

Ah, so it's a pre-existing problem. I'll add a `finally` block for that.


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170628342
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

No, because the BucketingSink never calls `close()` on the writer if 
`open()` threw an exception.


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170626581
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

Ah yeah, I would call `super.close()` if the `keyValueWriter` is `null`. That 
should do it, right?


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170606282
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

For this issue that would be enough.

However we can still leak streams if `open()` throws an exception after 
`super.open()` returns. Then the stream is already open. If you use the 
BucketingSink this will leak the stream:

```
// openNewPartFile
bucketState.writer.open(fs, inProgressPath); // if this throws, the stream may already be open
bucketState.isWriterOpen = true;
```

```
// closeCurrentPartFile
if (bucketState.isWriterOpen) { // a partially opened writer is never closed here
    bucketState.writer.close();
    bucketState.isWriterOpen = false;
}
```

So we should modify open() to properly close the stream.
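
A minimal sketch of that idea, using hypothetical stand-in classes (`BaseWriter`, `KeyValueStyleWriter`, `TrackingOutputStream`) rather than the real Flink types: if anything fails after `super.open()` has returned, `open()` closes the stream itself before rethrowing, so the caller does not have to call `close()` on a half-opened writer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class OpenCleanupSketch {

    // Test double that records whether close() was called.
    static class TrackingOutputStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Hypothetical stand-in for the base writer that owns the stream.
    static class BaseWriter {
        private OutputStream stream;

        void open(OutputStream stream) throws IOException {
            this.stream = stream;
        }

        void close() throws IOException {
            if (stream != null) {
                stream.close();
                stream = null;
            }
        }
    }

    // Hypothetical subclass whose open() does extra work that may fail.
    static class KeyValueStyleWriter extends BaseWriter {
        @Override
        void open(OutputStream stream) throws IOException {
            super.open(stream); // from here on the stream is open
            try {
                createInnerWriter();
            } catch (IOException | RuntimeException e) {
                try {
                    super.close(); // release the stream before propagating
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        // Simulates a failure while setting up the inner writer.
        private void createInnerWriter() throws IOException {
            throw new IOException("simulated failure after super.open()");
        }
    }

    // Returns true if the stream was closed even though open() failed.
    static boolean streamClosedAfterFailedOpen() {
        TrackingOutputStream stream = new TrackingOutputStream();
        try {
            new KeyValueStyleWriter().open(stream);
        } catch (IOException expected) {
            // The caller never calls close(), mirroring the isWriterOpen guard.
        }
        return stream.closed;
    }

    public static void main(String[] args) {
        System.out.println("stream closed after failed open: " + streamClosedAfterFailedOpen());
    }
}
```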


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170596050
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

WDYT?



---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170596001
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

I would be in favour of just having
```
@Override
public void close() throws IOException {
    if (keyValueWriter != null) {
        keyValueWriter.close();
    }
}
```
then. All the usual stream/file classes seem to close the stream that 
was handed in.
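
For the plain `java.io` wrappers this is easy to check. The sketch below uses `BufferedOutputStream` and a hypothetical `TrackingOutputStream` test double; it only illustrates the general convention and says nothing about what Avro's writer does internally.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class CloseDelegationSketch {

    // Test double that counts how often close() is called on it.
    static class TrackingOutputStream extends ByteArrayOutputStream {
        int closeCalls = 0;

        @Override
        public void close() throws IOException {
            closeCalls++;
            super.close();
        }
    }

    // Closes only the wrapper and reports how often the wrapped stream was closed.
    static int closeCallsAfterWrapperClose() throws IOException {
        TrackingOutputStream raw = new TrackingOutputStream();
        BufferedOutputStream wrapper = new BufferedOutputStream(raw);
        wrapper.write(42);
        wrapper.close(); // flushes the buffer, then closes the wrapped stream
        return raw.closeCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("close calls on wrapped stream: " + closeCallsAfterWrapperClose());
    }
}
```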


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170530494
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

Alternatively, we could wrap the stream we give to Avro, ignore close() 
calls on the wrapper, and then close the stream ourselves. Then we wouldn't 
rely on implementation details of Avro.
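
A rough sketch of such a wrapper, assuming nothing about Avro itself: `NonClosingOutputStream` and `TrackingOutputStream` are hypothetical names, and the "library" is simulated by calling `close()` on the wrapper directly.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class NonClosingSketch {

    // Hypothetical wrapper: forwards writes and flushes, but close() only flushes.
    static class NonClosingOutputStream extends FilterOutputStream {
        NonClosingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len); // avoid the byte-by-byte default of FilterOutputStream
        }

        @Override
        public void close() throws IOException {
            flush(); // push pending bytes down, leave the underlying stream open
        }
    }

    // Stand-in for the stream the sink owns; records whether it was closed.
    static class TrackingOutputStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Returns {closed after the library's close(), closed after the owner's close()}.
    static boolean[] run() throws IOException {
        TrackingOutputStream owned = new TrackingOutputStream();
        OutputStream handedToLibrary = new NonClosingOutputStream(owned);
        handedToLibrary.write(new byte[] {1, 2, 3});
        handedToLibrary.close(); // what the library would do when it is closed
        boolean afterLibraryClose = owned.closed;
        owned.close(); // the owner closes the real stream itself
        return new boolean[] {afterLibraryClose, owned.closed};
    }

    public static void main(String[] args) throws IOException {
        boolean[] result = run();
        System.out.println("closed after library close: " + result[0]);
        System.out.println("closed after owner close: " + result[1]);
    }
}
```

With a wrapper like this, the order of the two close calls no longer depends on whether the library closes the stream it was given.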


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5563#discussion_r170528786
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 
@Override
public void close() throws IOException {
-   super.close(); //the order is important since super.close flushes inside
+   flush(); // super.close() also does a flush
if (keyValueWriter != null) {
--- End diff --

The writer already flushes when close() is called.

If the writer is null we should call super.close() to make sure the stream 
is closed.
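
Roughly like the sketch below, which models the two paths with hypothetical stand-in classes (`StreamOwner`, `InnerWriter`, `SinkWriter`) instead of the real Flink and Avro types. It assumes the inner writer closes the stream it was given, as discussed in this thread.

```java
import java.io.Closeable;
import java.io.IOException;

class CloseFallbackSketch {

    // Hypothetical base class that owns the stream; counts stream closes.
    static class StreamOwner {
        int streamCloseCalls = 0;

        void closeStream() {
            streamCloseCalls++;
        }

        void close() throws IOException {
            closeStream();
        }
    }

    // Hypothetical inner writer; assumed to close the stream it was handed.
    static class InnerWriter implements Closeable {
        private final StreamOwner owner;

        InnerWriter(StreamOwner owner) {
            this.owner = owner;
        }

        @Override
        public void close() throws IOException {
            owner.closeStream();
        }
    }

    static class SinkWriter extends StreamOwner {
        InnerWriter keyValueWriter; // stays null if open() never completed

        @Override
        void close() throws IOException {
            if (keyValueWriter != null) {
                keyValueWriter.close(); // closes the stream as a side effect
            } else {
                super.close(); // nothing else will close the stream for us
            }
        }
    }

    // Returns how often the stream was closed for the given scenario.
    static int streamCloseCalls(boolean openedSuccessfully) throws IOException {
        SinkWriter writer = new SinkWriter();
        if (openedSuccessfully) {
            writer.keyValueWriter = new InnerWriter(writer);
        }
        writer.close();
        return writer.streamCloseCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("writer present: " + streamCloseCalls(true));
        System.out.println("writer null: " + streamCloseCalls(false));
    }
}
```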


---


[GitHub] flink pull request #5563: [FLINK-8543] Don't call super.close() in AvroKeyVa...

2018-02-22 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5563

[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter

The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close()
will eventually call flush() on the wrapped stream, which fails if we
close it beforehand. Now we call flush() ourselves before closing the
KeyValueWriter, which eventually closes the wrapped stream internally.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8543-fix-avro-writer-stream-close

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5563


commit 548dea4c0811ffbfaf1566aaae88551f29eb5af9
Author: Aljoscha Krettek 
Date:   2018-02-22T16:24:33Z

[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter

The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close()
will eventually call flush() on the wrapped stream, which fails if we
close it beforehand. Now we call flush() ourselves before closing the
KeyValueWriter, which eventually closes the wrapped stream internally.




---