[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378211#comment-16378211 ]

chris snow commented on FLINK-8543:
---

Thanks for fixing this [~aljoscha]!

{quote}The comments are correct, S3 simply does not support truncate. The reason why we need it is that we need to trim files in case of recovering from a failure, to get them back to the state they had when we did the last checkpoint, and so ensure exactly-once semantics. The solution we came up with for filesystems that don't support truncate is the .valid-length files, because we simply cannot take back what was already written. But you're right that the client reading those files needs to understand them. Coming up with a better solution will be one of the goals of the 1.6 release cycle, though.{quote}

Is there a Jira ticket I can track to follow the changes that will happen in 1.6 to provide better handling for s3 sinks?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.4.0
> Environment: IBM Analytics Engine -
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2
> Jupyter Enterprise Gateway 0.5.0
> HBase 1.1.2 *
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 *
> Tez 0.7.0 *
> Pig 0.16.0 *
> Sqoop 1.4.6 *
> Slider 0.92.0 *
> Reporter: chris snow
> Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!
> The Flink console output is showing an exception being thrown by
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster
> and added some additional logging to the checkOpen() method to log the 'key'
> just before the exception is thrown:
>
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
>
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
>
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
>             Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
>             (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(
>             new FileOutputStream(this.backupFile));
>     }
>
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vv-- Additional logging --vvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload",
>             (Object)this.key);
>         try {
>             ObjectMetadata om =
>                 this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload =
>                 this.fs.putObject(this.fs.newPutObjectRequest(this.key, om,
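The `.valid-length` companion files mentioned in the comment above exist because S3 cannot truncate: after a failure, bytes past the last checkpoint stay in the object, and a marker file records how many leading bytes are valid. Below is a minimal sketch of what a consumer-side reader could look like. `ValidLengthReader`, the `.valid-length` suffix, and the assumption that the marker stores the byte count as a decimal string are illustrative; Flink's actual suffix and encoding are configurable and may differ.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

class ValidLengthReader {
    // Reads a bucket file while honouring an accompanying
    // "<file>.valid-length" marker. Assumption (not from the source): the
    // marker stores the valid byte count as a decimal string.
    static byte[] readValidBytes(Path dataFile) throws IOException {
        byte[] all = Files.readAllBytes(dataFile);
        Path marker = dataFile.resolveSibling(
            dataFile.getFileName() + ".valid-length");
        if (!Files.exists(marker)) {
            return all; // no marker: the whole file is valid
        }
        long valid = Long.parseLong(
            new String(Files.readAllBytes(marker)).trim());
        // drop everything written after the last successful checkpoint
        // (the int cast is acceptable for a sketch, not for huge files)
        return Arrays.copyOf(all, (int) Math.min(valid, all.length));
    }
}
```

A client that is unaware of the marker would read trailing, possibly duplicated records, which is exactly the caveat raised in the quoted comment.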
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377510#comment-16377510 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5563

    merged
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377506#comment-16377506 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user aljoscha closed the pull request at:
https://github.com/apache/flink/pull/5563
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377080#comment-16377080 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5563

    @zentol I pushed an update.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377039#comment-16377039 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5563#discussion_r170629598

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

    Ah, so it's a pre-existing problem. I'll add a `finally` block for that.
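The `finally` block proposed in the diff discussion above can be sketched as follows: flush first, then close the wrapped writer in a `finally` so the close still runs even when `flush()` throws. `SafeCloseWriter` and its fields are illustrative stand-ins for this pattern, not Flink's actual `AvroKeyValueSinkWriter` code.

```java
import java.io.Closeable;
import java.io.IOException;

class SafeCloseWriter implements Closeable {
    private final Closeable keyValueWriter; // the wrapped inner writer
    private final boolean failFlush;        // simulate a failing flush()

    SafeCloseWriter(Closeable keyValueWriter, boolean failFlush) {
        this.keyValueWriter = keyValueWriter;
        this.failFlush = failFlush;
    }

    void flush() throws IOException {
        if (failFlush) {
            throw new IOException("flush failed");
        }
    }

    @Override
    public void close() throws IOException {
        try {
            flush(); // may throw, e.g. when the underlying stream is gone
        } finally {
            if (keyValueWriter != null) {
                // still runs on failure, which is the point of the finally
                keyValueWriter.close();
            }
        }
    }
}
```

Without the `finally`, a flush failure would leak the inner writer, which is the leak the review comment is guarding against.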
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377032#comment-16377032 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5563#discussion_r170628342

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

    No, because the BucketingSink never calls `close()` on the writer if `open()` threw an exception.
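zentol's point above is that `close()` may never run, or may run against a writer whose `open()` failed partway. A defensive sketch of that situation, mirroring the `AtomicBoolean` guard visible in the decompiled `S3AOutputStream`, makes `close()` both idempotent and null-tolerant so callers can always invoke it safely. `GuardedWriter` and its members are hypothetical names, not Flink's implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class GuardedWriter implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private Closeable resource; // stays null if open() never succeeded

    void open(Closeable resource) {
        this.resource = resource;
    }

    @Override
    public void close() throws IOException {
        // getAndSet makes a second close() a no-op, even across threads
        if (closed.getAndSet(true)) {
            return;
        }
        if (resource != null) { // tolerate a failed or skipped open()
            resource.close();
        }
    }
}
```

With this shape, wrapping every writer use in try-with-resources is safe regardless of whether `open()` completed, which sidesteps the BucketingSink ordering problem the reviewers are debating.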
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377028#comment-16377028 ]

ASF GitHub Bot commented on FLINK-8543:
---

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5563

    @IgorBerman Thanks! I'm expecting no problems, just wanted to see if you maybe had an opinion.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377027#comment-16377027 ] ASF GitHub Bot commented on FLINK-8543:

Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170626581

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

Ah yeah, I would call super if the `keyValueWriter` is `null`. That should do it, right?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377016#comment-16377016 ] ASF GitHub Bot commented on FLINK-8543:

Github user IgorBerman commented on the issue: https://github.com/apache/flink/pull/5563

@aljoscha, sorry for the late response, I needed to refresh my memory of this piece of code. Overall, I think it's ok. In the end I used the same scheme for implementing the SinkWriter as the other examples, so if you're refactoring something in the hierarchy it should get the same handling. From the Avro file perspective (the internal writer), you are closing it, which should be sufficient. What problems do you expect here?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376943#comment-16376943 ] ASF GitHub Bot commented on FLINK-8543:

Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170606282

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

For this issue that would be enough. However, we can still leak streams if `open()` throws an exception after `super.open()` returns: then the stream is already open. If you use the BucketingSink this will leak the stream:
```
// openNewPartFile
bucketState.writer.open(fs, inProgressPath); // if this throws an exception the stream can be open
bucketState.isWriterOpen = true;
```
```
// closeCurrentPartFile
if (bucketState.isWriterOpen) { // not closing a partially opened writer
    bucketState.writer.close();
    bucketState.isWriterOpen = false;
}
```
So we should modify open() to properly close the stream.
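The leak scenario above can be sketched with a minimal, self-contained model (illustrative names only, not Flink's actual classes): if writer initialization throws after the stream has been opened, open() should close the stream before rethrowing so it is not leaked.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the fix zentol proposes. If initWriter() fails
// after the stream is already open, close the stream before rethrowing.
class SafeWriter {
    private OutputStream stream;

    void open(OutputStream newStream) throws IOException {
        stream = newStream;
        try {
            initWriter(); // may throw after the stream is already open
        } catch (IOException e) {
            stream.close(); // avoid leaking the open stream
            stream = null;
            throw e;
        }
    }

    // stands in for e.g. creating the Avro writer; may fail
    void initWriter() throws IOException {
    }
}
```

With this shape, a caller like closeCurrentPartFile never sees a "half-open" writer whose stream nobody owns.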
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376889#comment-16376889 ] ASF GitHub Bot commented on FLINK-8543:

Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170596050

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

WDYT?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376888#comment-16376888 ] ASF GitHub Bot commented on FLINK-8543:

Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170596001

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

I would be in favour of just having
```
@Override
public void close() throws IOException {
	if (keyValueWriter != null) {
		keyValueWriter.close();
	}
}
```
then. All the usual stream/file classes seem to close the stream that was handed in.
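As a minimal illustration of that delegation shape (names are placeholders, not the real AvroKeyValueSinkWriter or Avro classes): close() only closes the inner writer, and relies on the writer to flush and close the stream it was handed, so the wrapped stream ends up closed exactly once.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model of the proposal above: the inner writer owns the
// stream's flush-and-close; the sink does not also close the stream.
class InnerWriter {
    private final OutputStream out;
    InnerWriter(OutputStream out) { this.out = out; }

    void close() throws IOException {
        out.flush(); // the writer flushes on close...
        out.close(); // ...and closes the stream exactly once
    }
}

class Sink {
    private final InnerWriter keyValueWriter;
    Sink(InnerWriter w) { this.keyValueWriter = w; }

    void close() throws IOException {
        if (keyValueWriter != null) {
            // no separate super.close(): the stream is closed once, by the writer
            keyValueWriter.close();
        }
    }
}
```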
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376571#comment-16376571 ] ASF GitHub Bot commented on FLINK-8543:

Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170530494

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

Alternatively, we could wrap the stream we give to Avro and ignore close() calls, and then close it ourselves. Then we wouldn't rely on implementation details of Avro.
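A sketch of that wrapping idea (hypothetical class name; Apache Commons IO ships a similar CloseShieldOutputStream): the wrapper swallows close() so the library it is handed to cannot close the underlying stream, and the owner closes the real stream itself.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Illustrative wrapper: ignores close() so e.g. Avro cannot close the
// underlying stream; the sink remains the sole owner of the real close().
class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void close() throws IOException {
        // intentionally only flush; the caller closes the real stream
        flush();
    }
}
```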
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376565#comment-16376565 ] ASF GitHub Bot commented on FLINK-8543:

Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5563#discussion_r170528786

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
@@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws IOException {
 	@Override
 	public void close() throws IOException {
-		super.close(); //the order is important since super.close flushes inside
+		flush(); // super.close() also does a flush
 		if (keyValueWriter != null) {
--- End diff --

The writer already flushes when calling close. If the writer is null we should call super.close() to make sure the stream is closed.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373026#comment-16373026 ] ASF GitHub Bot commented on FLINK-8543:

Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5563

@IgorBerman Do you have an idea whether this change could cause problems? As the original implementer.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373022#comment-16373022 ] ASF GitHub Bot commented on FLINK-8543: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5563 [FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close() will eventually call flush() on the wrapped stream, which fails if we close it before. Now we call flush ourselves before closing the KeyValueWriter, which internally closes the wrapped stream eventually. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-8543-fix-avro-writer-stream-close Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5563 commit 548dea4c0811ffbfaf1566aaae88551f29eb5af9 Author: Aljoscha Krettek Date: 2018-02-22T16:24:33Z [FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close() will eventually call flush() on the wrapped stream, which fails if we close it before. Now we call flush ourselves before closing the KeyValueWriter, which internally closes the wrapped stream eventually.
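The ordering problem the pull request describes — the writer's close() propagating a late flush() to a stream that was already closed — can be sketched as follows. This is an illustrative stand-in, not Flink's actual classes; the names KeyValueWriter and safeClose are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class FlushBeforeClose {
    // Hypothetical stand-in for a writer wrapping a sink stream.
    static class KeyValueWriter {
        private final OutputStream out;
        KeyValueWriter(OutputStream out) { this.out = out; }
        void flush() throws IOException { out.flush(); }
        // Like the real writer, close() flushes the wrapped stream
        // internally before closing it.
        void close() throws IOException { out.flush(); out.close(); }
    }

    // The fix pattern: flush explicitly while the wrapped stream is still
    // open, then let the writer's own close() shut the stream exactly once.
    // Closing the wrapped stream first (the old super.close() path) and then
    // closing the writer triggers a flush on an already-closed stream, which
    // is what raised "Output Stream closed" on S3A.
    static void safeClose(KeyValueWriter writer) throws IOException {
        writer.flush();
        writer.close();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        KeyValueWriter w = new KeyValueWriter(buf);
        buf.write(7);
        safeClose(w);
        System.out.println("bytes written: " + buf.size());
    }
}
```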
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372808#comment-16372808 ] Aljoscha Krettek commented on FLINK-8543: - [~snowch] The solution for this bug, though, is to remove the {{super.close()}} call, right?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372701#comment-16372701 ] Steve Loughran commented on FLINK-8543: --- Copying a subset of the old file to the new file would seem to be the way. Not ideal, as you have to read everything in and write it back, and with update inconsistency clients could still get the old file for a bit. Writing a .valid-length file is a better strategy when everything knows to check it.
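The "copy a subset" approach above can be sketched as follows. Since S3 objects are immutable, a truncate has to be emulated by writing the valid prefix out as a replacement object; the sketch below runs against the local filesystem for clarity — against S3 this would be a ranged GET followed by a PUT. The method name truncateByCopy is illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class TruncateByCopy {
    // Emulate truncate on an immutable object store: keep only the first
    // validLength bytes by writing them out as a new object.
    public static void truncateByCopy(Path src, Path dst, long validLength) throws IOException {
        // Reading the whole object is fine for a sketch; a real
        // implementation would stream in chunks (or use a ranged read).
        byte[] all = Files.readAllBytes(src);
        int keep = (int) Math.min(validLength, all.length);
        Files.write(dst, Arrays.copyOf(all, keep));
    }
}
```

As the comment notes, this costs a full read-plus-write of the object, and with eventually consistent stores readers may briefly still see the old object.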
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372643#comment-16372643 ] Aljoscha Krettek commented on FLINK-8543: - The comments are correct: S3 simply does not support truncate. The reason we need it is that, when recovering from a failure, we have to trim files back to the state they had at the last checkpoint, to ensure exactly-once semantics. The solution we came up with for filesystems that don't support truncate is the {{.valid-length}} files, because we simply cannot take back what was already written. But you're right that clients reading those files need to understand them. Coming up with a better solution will be one of the goals of the 1.6 release cycle, though.
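A consumer that understands the {{.valid-length}} companion files could look like the sketch below. Two assumptions to verify against your Flink version: the marker's naming (the prefix and suffix are configurable on the sink; "_" and ".valid-length" are assumed defaults here) and that the marker stores the valid byte count as decimal text.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class ValidLengthReader {
    // Read a bucket file, honouring its .valid-length companion if present.
    // Marker naming and encoding are assumptions; check the sink's
    // configured valid-length prefix/suffix for your Flink version.
    public static byte[] readValid(Path dataFile) throws IOException {
        Path marker = dataFile.resolveSibling("_" + dataFile.getFileName() + ".valid-length");
        byte[] all = Files.readAllBytes(dataFile);
        if (!Files.exists(marker)) {
            return all;  // no marker: the whole file is valid
        }
        // The marker holds the number of valid bytes; anything past that
        // was written after the last completed checkpoint and must be ignored.
        long validLength = Long.parseLong(new String(Files.readAllBytes(marker)).trim());
        return Arrays.copyOf(all, (int) Math.min(validLength, all.length));
    }
}
```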
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372534#comment-16372534 ] chris snow commented on FLINK-8543: --- [~aljoscha] - what are your thoughts on this?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371870#comment-16371870 ] Steve Loughran commented on FLINK-8543: --- There's no truncate operation in the S3 protocol, so none in the s3a connector, nor in the Azure or Swift clients. Not fixable. Flush-after-close can be downgraded, but truncate isn't part of the S3 object model of immutable objects.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371882#comment-16371882 ] chris snow commented on FLINK-8543: --- So all files being accompanied by .valid-length is expected behavior when saving to S3? If so, unless client applications can understand and use the .valid-length file (which I don’t think will be the case), I don’t think this functionality makes sense with S3?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371976#comment-16371976 ]

Steve Loughran commented on FLINK-8543:
---

bq. So all files being accompanied by .valid-length is expected behaviour when saving to s3?

That's nothing to do with the s3a connector, so ask the Flink team. One thing to always remember is: object stores are not filesystems. Yes, they appear to support the same API and have the same metaphor of directories and files, but they are different, and if you try too hard you will discover that things you expect aren't there. This may be one of them.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371855#comment-16371855 ]

chris snow commented on FLINK-8543:
---

Does Flink running on EMR with S3 have the same issue? If not, which AWS S3 API calls and filesystem implementation does it use? IBM COS S3 supports a subset of the most common AWS S3 API operations (https://ibm-public-cos.github.io/crs-docs/api-reference#copy-an-object).
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371773#comment-16371773 ]

Aljoscha Krettek commented on FLINK-8543:
---

Yes, this is what I would expect, since the Hadoop S3A filesystem does not support truncate. Do you know whether IBM COS supports truncate? Could you use an alternative {{FileSystem}} implementation?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371613#comment-16371613 ]

chris snow commented on FLINK-8543:
---

My apologies: the commented-out close() didn't get deployed due to a compile error when including my own version of the class, so I removed the close() call with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter

{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exceptions now; however, every file in COS S3 still has the .valid-length companion:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}
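The .valid-length companion files in the listing above record how many bytes of the matching part file were written before the last successful checkpoint, and a consumer of the bucket has to trim each part file accordingly. Below is a minimal, illustrative sketch of such a trimmer; it assumes the marker sits next to the part file as _<name>.valid-length (as in the listing) and stores the valid byte count as decimal text. The class and method names are mine, not Flink's.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ValidLengthTrimmer {

    /** Trims partFile to the byte count stored in its "_<name>.valid-length" neighbour, if present. */
    public static void trim(Path partFile) throws IOException {
        Path marker = partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
        if (!Files.exists(marker)) {
            return; // no marker: the whole file is valid
        }
        long validLength = Long.parseLong(
                new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim());
        try (FileChannel ch = FileChannel.open(partFile, StandardOpenOption.WRITE)) {
            if (ch.size() > validLength) {
                // Drop the bytes written after the last completed checkpoint.
                ch.truncate(validLength);
            }
        }
    }
}
```

This is the client-side half of the workaround for filesystems without truncate: the sink cannot take back bytes it has already written to the object store, so the reader must.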
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371570#comment-16371570 ]

Aljoscha Krettek commented on FLINK-8543:
---

Regarding IBM COS S3: yes, that sounds good. Would you also have the debug logs with stack traces for the run without {{super.close()}}?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371485#comment-16371485 ]

chris snow commented on FLINK-8543:
---

Unfortunately, commenting out the call to super.close() doesn't stop the exception:

{code}
@Override
public void close() throws IOException {
    // See https://issues.apache.org/jira/browse/FLINK-8543?focusedCommentId=16371445&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16371445
    //super.close(); //the order is important since super.close flushes inside
    if (keyValueWriter != null) {
        keyValueWriter.close();
    }
}
{code}

On the consistency note: hopefully IBM COS S3 will be OK, as it is immediately consistent?

bq. COS is 'immediately consistent' for data and 'eventually consistent' for usage accounting.

Source: https://ibm-public-cos.github.io/crs-docs/faq
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371445#comment-16371445 ]

Aljoscha Krettek commented on FLINK-8543:
---

Ok, this confirms my suspicion. The problem is this part of the code: https://github.com/apache/flink/blob/537a10ea2ff6a2d8507483c66f413f77884e77c4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java#L163

{{super.close()}} closes the underlying stream, while {{keyValueWriter.close()}} eventually calls {{DataFileWriter.close()}}, which in turn calls {{flush()}} on that (by then already closed) underlying stream. You could try copying {{AvroKeyValueSinkWriter}} into your own code and changing the order of the calls in {{AvroKeyValueSinkWriter.close()}}, or omitting {{super.close()}} altogether.

On a side note, the {{BucketingSink}} currently does not work well with eventually consistent filesystems such as S3, because it depends on directory listings being accurate for some operations. We are aware of that and want to fix it for Flink 1.6.
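The call-ordering problem Aljoscha describes can be reproduced without Flink at all. The sketch below is illustrative only (the class names are my own): StrictStream mimics S3AOutputStream in rejecting writes and flushes after close(), and the demo shows why a wrapping writer must always be closed before the stream it writes into.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;

public class CloseOrderDemo {

    /** Mimics S3AOutputStream: any write or flush after close() fails. */
    public static class StrictStream extends OutputStream {
        private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            if (closed) throw new IOException("Output Stream closed");
            buf.write(b);
        }

        @Override
        public void flush() throws IOException {
            if (closed) throw new IOException("Output Stream closed");
        }

        @Override
        public void close() {
            closed = true; // idempotent, like S3AOutputStream's AtomicBoolean guard
        }

        public byte[] toByteArray() {
            return buf.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        // Wrong order: closing the raw stream first, as super.close() did before
        // the wrapped Avro writer had flushed, makes the writer's close() fail.
        StrictStream raw1 = new StrictStream();
        Writer w1 = new BufferedWriter(new OutputStreamWriter(raw1));
        w1.write("record");
        raw1.close();
        try {
            w1.close(); // flushes buffered bytes into the closed stream
        } catch (IOException e) {
            System.out.println("wrong order: " + e.getMessage());
        }

        // Right order: the wrapper flushes into the still-open stream first.
        StrictStream raw2 = new StrictStream();
        Writer w2 = new BufferedWriter(new OutputStreamWriter(raw2));
        w2.write("record");
        w2.close();
        raw2.close();
        System.out.println("right order: wrote " + raw2.toByteArray().length + " bytes");
    }
}
```

Reordering AvroKeyValueSinkWriter.close() so that keyValueWriter.close() runs before super.close() follows the same principle.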
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371405#comment-16371405 ] chris snow commented on FLINK-8543: --- [~aljoscha] The failures seem to be happening for all files. I'm running my code on a cloud cluster that I provisioned just for this testing. I'm happy to share credentials if you want to take a more detailed look. My code is open source and the cluster will be destroyed after these tests, so there's nothing sensitive on it.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371403#comment-16371403 ] chris snow commented on FLINK-8543: --- Sorry for the delay. I've added the debug statements:

{code:java}
public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
    Progressable progress) throws IOException {
  this.key = key;
  this.progress = progress;
  this.fs = fs;

  backupFile = fs.createTmpFileForWrite("output-",
      LocalDirAllocator.SIZE_UNKNOWN, conf);

  LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
      key, backupFile);

  this.backupStream = new BufferedOutputStream(
      new FileOutputStream(backupFile));
}

// ** print extra debug output **
void printStackTrace() {
  long threadId = Thread.currentThread().getId();
  StringBuilder sb = new StringBuilder();
  sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
  for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
    sb.append("\n " + ste);
  }
  // I'm being lazy - log the stacktrace as an error so it will get logged
  // without having to change the logger configuration
  LOG.error(sb.toString());
}

/**
 * Check for the filesystem being open.
 * @throws IOException if the filesystem is closed.
 */
void checkOpen() throws IOException {
  if (closed.get()) {
    printStackTrace();
    throw new IOException(
        "Output Stream closed. Thread id: " + Thread.currentThread().getId()
        + " key: " + key);
  }
}

@Override
public void flush() throws IOException {
  checkOpen();
  backupStream.flush();
}

@Override
public void close() throws IOException {
  printStackTrace();
  if (closed.getAndSet(true)) {
    return;
  }
  backupStream.close();
  LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);

  try {
    final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
    Upload upload = fs.putObject(
        fs.newPutObjectRequest(key, om, backupFile));
    ProgressableProgressListener listener =
        new ProgressableProgressListener(fs, key, upload, progress);
    upload.addProgressListener(listener);
    upload.waitForUploadResult();
    listener.uploadCompleted();
    // This will delete unnecessary fake parent directories
    fs.finishedWrite(key);
  } catch (InterruptedException e) {
    throw (InterruptedIOException) new InterruptedIOException(e.toString())
        .initCause(e);
  } catch (AmazonClientException e) {
    throw translateException("saving output", key, e);
  } finally {
    if (!backupFile.delete()) {
      LOG.warn("Could not delete temporary s3a file: {}", backupFile);
    }
    super.close();
  }
  LOG.debug("OutputStream for key '{}' upload complete", key);
}

@Override
public void write(int b) throws IOException {
  checkOpen();
  backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
  checkOpen();
  backupStream.write(b, off, len);
}
}
{code}

And here is one of the yarn logs:

{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO org.apache.flink.yarn.YarnTaskManagerRunner -
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner - Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner - OS current user: clsadmin
2018-02-21 12:43:12,162 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner - Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.112-b15
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner - Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO org.apache.flink.yarn.YarnTaskManagerRunner - JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.7.3.2.6.2.0-205
... omitted for brevity ...
2018-02-21 12:43:12,267 INFO org.apache.flink.yarn.YarnTaskManagerRunner -
2018-02-21 12:43:12,270 INFO
{code}
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370167#comment-16370167 ] Aljoscha Krettek commented on FLINK-8543: - [~snowch] Do you see these failures for all files or only sporadically? Also, do you maybe have updated logs with stack traces that show who calls {{close()}} and {{flush()}}?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370123#comment-16370123 ] chris snow commented on FLINK-8543: --- [~aljoscha] - I'm using AvroKeyValueSinkWriter [https://github.com/ibm-cloud-streaming-retail-demo/flink-on-iae-messagehub-to-s3/blob/master/src/main/java/com/ibm/cloud/flink/StreamingJob.java#L186] [~ste...@apache.org] - HDP 2.6.2: https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370094#comment-16370094 ] Aljoscha Krettek commented on FLINK-8543: - [~snowch] Btw, what is the writer you're using in this case?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370073#comment-16370073 ] Aljoscha Krettek commented on FLINK-8543: - [~ste...@apache.org] That would fix this issue, but I think it would be better to fix this at the Flink level, and to figure out why {{flush()}} is called after {{close()}} in the first place.
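The flush()-after-close() behaviour under discussion can be sketched in isolation. The wrapper below downgrades a late flush() to a warning plus no-op, one of the options floated in this thread; the class and method names are illustrative only and do not come from the Hadoop or Flink codebases.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: an OutputStream wrapper that tolerates flush()
// being called after close(), instead of throwing like S3AOutputStream.
class LenientFlushOutputStream extends FilterOutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    LenientFlushOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void flush() throws IOException {
        if (closed.get()) {
            // A late flush() becomes a warning and a no-op, not an IOException.
            System.err.println("WARN: flush() called on closed stream; ignoring");
            return;
        }
        out.flush();
    }

    @Override
    public void close() throws IOException {
        if (closed.getAndSet(true)) {
            return; // idempotent: a second close() is a no-op
        }
        out.flush();
        out.close();
    }
}
```

Whether this is the right fix is exactly what the thread debates: it masks the late caller rather than explaining why flush() arrives after close() in the first place.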
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365520#comment-16365520 ] Steve Loughran commented on FLINK-8543: --- Created HADOOP-15239; I'll take a patch with a new test method in org.apache.hadoop.fs.s3a.ITestS3ABlockOutputArray, the least expensive being adding {{stream.flush()}} at the tail end of {{testBlocksClosed}}. Your chance to contribute to the hadoop codebase :)
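The proposed regression test reduces to a simple contract check: write, close, then flush, expecting no exception. A minimal standalone sketch of that contract follows; the real patch would extend {{ITestS3ABlockOutputArray.testBlocksClosed}} against an actual S3A stream, and the helper name here is made up.

```java
import java.io.IOException;
import java.io.OutputStream;

public class FlushAfterCloseContract {
    // Hypothetical helper: fails (by throwing) if the stream rejects a
    // flush() issued after close() - the behaviour reported in this issue.
    static void assertFlushAfterCloseTolerated(OutputStream stream)
            throws IOException {
        stream.write(new byte[]{1, 2, 3});
        stream.close();
        stream.flush(); // the S3AOutputStream in this report throws here
    }
}
```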
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365506#comment-16365506 ] Steve Loughran commented on FLINK-8543: --- bq. I am running on a Hortonworks based hadoop environment which version? HDP-2.6's hadoop-aws module is essentially that of ASF Hadoop-2.8.0; if you make sure that fast output is enabled {{fs.s3a.fast.upload = true}} and the buffering => disk {{fs.s3a.fast.upload.buffer=disk}} to get the best upload perf. But that isn't going to do anything for flush(), which again does a state check when called. FWIW, the semantics of {{Flushable.flush()}} are pretty vague, including what it is meant to do (hence hadoop's hflush/hsync methods, which the S3A streams explcitly don't support). There is a valid case for saying "downgrade flush() on closed file to a warn + no-op". Moving to the fast output stream will make close() that much faster on big writes, as it will only be uploading the last block and any previous ones not yet uploaded; if the bandwidth to S3 > rate bytes are generated, it should just be the time to upload that last block: a few seconds. That may make a difference, though its still a bit of a race condition lurking. > Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen > -- > > Key: FLINK-8543 > URL: https://issues.apache.org/jira/browse/FLINK-8543 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.0 > Environment: IBM Analytics Engine - > [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction] > The cluster is based on Hortonworks Data Platform 2.6.2. The following > components are made available. 
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364895#comment-16364895 ] chris snow commented on FLINK-8543: --- I didn’t see any errors or suspicious entries in the logs prior to this error. I’ll try running again in a few days with the extra stacktrace logging. Thanks, [~aljoscha]
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360480#comment-16360480 ] Aljoscha Krettek commented on FLINK-8543: - Also, I think it would help to have the stack trace at the logging statements that you added, i.e. add a {{ExceptionUtils.getStackTrace(new Exception())}} in there.
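If Flink's ExceptionUtils is not on the classpath inside the patched Hadoop class, the same effect can be had with plain JDK classes. A minimal sketch (the class and method names here are invented for illustration, not part of any real API):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraceDemo {
    /**
     * Capture the current call stack as a string, the same way
     * ExceptionUtils.getStackTrace(new Exception()) does: create a
     * throwaway exception and print its stack trace into a StringWriter.
     */
    static String currentStackTrace() {
        StringWriter sw = new StringWriter();
        new Exception("stack trace capture").printStackTrace(new PrintWriter(sw, true));
        return sw.toString();
    }

    public static void main(String[] args) {
        // The idea: log this from checkOpen() alongside the key, so the
        // error shows *who* called into the closed stream and from which thread.
        System.out.println("thread=" + Thread.currentThread().getName());
        System.out.println(currentStackTrace());
    }
}
```

Logging the thread name as well (as suggested later in this thread) makes it possible to tell whether the late caller is the task thread or a timer thread.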
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360477#comment-16360477 ] Aljoscha Krettek commented on FLINK-8543: - And was there a failure/restore cycle before that error popped up?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360470#comment-16360470 ] Aljoscha Krettek commented on FLINK-8543: - There is no other failure or anything suspicious in the logs, correct?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359852#comment-16359852 ] chris snow commented on FLINK-8543: --- I'm hoping that I can get access to an internal cluster that will give me root access and hence more debugging capabilities. I’m thinking of adding some code to print out the stacktrace and the thread ID from the flush() and close() methods. Are there any other areas that you would like me to investigate?
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356524#comment-16356524 ] chris snow commented on FLINK-8543: --- One thing to note - I've used the standard Flink 1.4.0 download for Hadoop 2.7, but I am running on a Hortonworks based Hadoop environment. I'm not sure whether this makes a difference, but I thought it was worth mentioning.
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355643#comment-16355643 ] Aljoscha Krettek commented on FLINK-8543: - I thought this shouldn't be possible, because the code that calls the timer callback uses the same lock as the block you posted and also checks that the timer service is not quiesced: https://github.com/apache/flink/blob/d86c6b6bb32adee9d4b5c9098340a34e8a8a7f1d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L249 So after that synchronized block exits, the timer service shouldn't invoke the callback anymore.
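The locking pattern described above, as a minimal self-contained sketch (class and method names are invented; this is not the actual SystemProcessingTimeService code): the timer thread re-checks the quiesced flag under the same lock that quiesce() takes, so once quiesce() has completed, no further callbacks can fire.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class QuiescableTimer {
    // Stand-in for the operator's checkpoint lock shared with the timer thread.
    private final Object checkpointLock = new Object();
    private final AtomicBoolean quiesced = new AtomicBoolean(false);

    /** Called by the timer thread when a registered timer fires. */
    void fire(Runnable callback) {
        synchronized (checkpointLock) {
            // Re-check under the lock: if quiesce() completed before we got
            // here, the callback must be suppressed, not delivered late.
            if (quiesced.get()) {
                return;
            }
            callback.run();
        }
    }

    /** Called on shutdown/close, under the same lock the callbacks use. */
    void quiesce() {
        synchronized (checkpointLock) {
            quiesced.set(true);
        }
    }
}
```

If a callback still runs after quiesce() in the real service, something is bypassing this lock, which is what the requested stack traces would reveal.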
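Aljoscha's argument above can be illustrated with a stripped-down timer service: the callback runs under the same lock that `quiesce()` takes, and checks the quiesced flag before invoking the target, so once `quiesce()` returns no further callbacks can reach a closed operator. This is a simplified illustration of the pattern, not Flink's actual SystemProcessingTimeService:

```java
// Simplified model of a quiescable timer service: callbacks and
// quiesce() contend on one shared lock, and a quiesced service
// drops late-firing timers instead of invoking them.
class TimerService {
    private final Object lock;
    private boolean quiesced = false;   // guarded by lock

    TimerService(Object lock) {
        this.lock = lock;
    }

    void quiesce() {
        synchronized (lock) {
            quiesced = true;            // set under the shared lock
        }
    }

    // Called by the timer thread when a timer fires.
    void fire(Runnable callback) {
        synchronized (lock) {
            if (quiesced) {
                return;                 // fired after quiesce: dropped
            }
            callback.run();             // runs under the lock, like
        }                               // Flink's checkpoint lock
    }
}
```

If the lock passed here is the same object the task's close path synchronizes on, a callback can never interleave with the shutdown block.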
[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355322#comment-16355322 ] Chesnay Schepler commented on FLINK-8543:
-
[~aljoscha] [~kkl0u] Could it be that a timer fires after the function was already closed? I've found this code in the StreamTask class:

{code}
synchronized (lock) {
    // this is part of the main logic, so if this fails, the task is considered failed
    closeAllOperators();

    // make sure no new timers can come
    timerService.quiesce();

    // only set the StreamTask to not running after all operators have been closed!
    // See FLINK-7430
    isRunning = false;
}

// make sure all timers finish
timerService.awaitPendingAfterQuiesce();
{code}
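The StreamTask shutdown ordering Chesnay quotes above can be modeled in a few lines. Here `closeAllOperators()` and `timerService.awaitPendingAfterQuiesce()` are reduced to flags and comments, so this only illustrates the ordering under discussion, not Flink itself:

```java
// Simplified model of the quoted StreamTask shutdown sequence:
// close operators and quiesce timers under one lock, then wait for
// in-flight timer callbacks outside the lock.
class ShutdownSketch {
    private final Object lock = new Object();
    volatile boolean operatorsClosed = false;
    volatile boolean timersQuiesced = false;
    volatile boolean running = true;

    void shutdown() {
        synchronized (lock) {
            operatorsClosed = true;  // stands in for closeAllOperators()
            timersQuiesced = true;   // stands in for timerService.quiesce()
            running = false;         // only after operators are closed (FLINK-7430)
        }
        // timerService.awaitPendingAfterQuiesce() would block here,
        // outside the lock, until in-flight callbacks have finished.
    }

    // A timer firing after shutdown() takes the same lock and must
    // observe the quiesced flag instead of invoking its callback.
    boolean tryFire() {
        synchronized (lock) {
            return !timersQuiesced;  // true = callback would have run
        }
    }
}
```

Under this model a timer can only reach a closed operator if it grabs the lock *between* `closeAllOperators()` and `quiesce()`, which the single synchronized block rules out; that is why the reported exception is surprising.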