[jira] [Commented] (FLINK-32241) UnsupportedFileSystemException when using the ABFS Hadoop driver for checkpointing in Flink 1.17

2023-06-01 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728393#comment-17728393
 ] 

Steve Loughran commented on FLINK-32241:


This is odd, as the buffer-to-disk code is derived from what has been in the s3a 
connector for years and it never blew up there.

On a host without the abfs change, can you grab the cloudstore module and run 
its storediag command to get a view of the world, including probes of the temp 
dirs? Look for the output under "Output Buffering" to see what it reports.

https://github.com/steveloughran/cloudstore
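
For reference, the storediag invocation is usually along these lines (the jar 
name and target URI here are placeholders, not taken from this issue):

{noformat}
# download the cloudstore release jar, then point storediag at the problem store
hadoop jar cloudstore-1.0.jar storediag abfs://container@account.dfs.core.windows.net/tmp
{noformat}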


> UnsupportedFileSystemException when using the ABFS Hadoop driver for 
> checkpointing in Flink 1.17
> 
>
> Key: FLINK-32241
> URL: https://issues.apache.org/jira/browse/FLINK-32241
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.17.1
> Environment: Flink 1.17.1
> Hadoop 3.3.4
> Flink Operator 1.4.0
> Kubernetes 1.24
>Reporter: Anton Ippolitov
>Priority: Minor
>
> https://issues.apache.org/jira/browse/HADOOP-18707 introduced a new 
> functionality in the ABFS Hadoop client which buffers data on local disk by 
> default.
> It looks like this breaks with Flink 1.17 in a scenario where:
>  * ABFS is used for checkpointing
>  * JobManager HA is enabled
>  * First JobManager leader dies and a stand-by JobManager takes over
> I can reliably reproduce this with Flink 1.17.1 running on Kubernetes by 
> simply killing the JM leader pod. Once the stand-by JobManager takes over, 
> all checkpoints consistently fail with the following error:
> {noformat}
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize 
> checkpoint. at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1424)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1310)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1202)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
> "file" at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443) at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) at 
> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at 
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at 
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at 
> org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496) at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.(AbfsOutputStream.java:173)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:154)
>  at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
>  at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>  at 
> 

[jira] [Commented] (FLINK-30450) FileSystem supports exporting client-side metrics

2022-12-20 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649716#comment-17649716
 ] 

Steve Loughran commented on FLINK-30450:


If you can grab the IOStatistics from an IOStatisticsSource implementation: the 
s3a, gcs and abfs FS instances collect detailed stats which you can snapshot and 
then marshal as JSON or serialized Java objects. This also works for other 
classes (input streams, output streams, list iterators). That stuff is there to 
play with.
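
A rough sketch of pulling those stats, assuming the Hadoop 3.3.x statistics 
classes (the helper method itself is made up for illustration):

{code:java}
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;

/** Snapshot the IOStatistics of a stream/iterator/FS instance, or null if unsupported. */
public static IOStatisticsSnapshot snapshotStats(Object source) {
  // returns null when the object does not implement IOStatisticsSource
  IOStatistics stats = retrieveIOStatistics(source);
  if (stats == null) {
    return null;
  }
  // the snapshot is Serializable and can be marshalled as JSON
  IOStatisticsSnapshot snapshot = snapshotIOStatistics(stats);
  System.out.println(ioStatisticsToPrettyString(snapshot));
  return snapshot;
}
{code}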

Otherwise, hadoop-3.3.2+ can optionally annotate the http-referer header on all 
S3 requests so you can apportion blame from the S3 server logs, including looking 
for 503 throttle events. That avoids code changes up the stack and gives a view 
of the entire cluster.

> FileSystem supports exporting client-side metrics
> -
>
> Key: FLINK-30450
> URL: https://issues.apache.org/jira/browse/FLINK-30450
> Project: Flink
>  Issue Type: New Feature
>  Components: FileSystems
>Reporter: Hangxiang Yu
>Priority: Major
>
> Client-side metrics, or job level metrics for filesystem could help us to 
> monitor filesystem more precisely.
> Some metrics (like request rate , throughput, latency, retry count, etc) are 
> useful to monitor the network or client problem of checkpointing or other 
> access cases for a job.  
> Some filesystems like s3, s3-presto, gs have supported enabling some metrics, 
> these could be exported in the filesystem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26563) HadoopS3RecoverableWriterITCase hang on azure

2022-03-10 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504275#comment-17504275
 ] 

Steve Loughran commented on FLINK-26563:


You could tune the s3a retry params so the client tries to recover less...
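
The kind of knobs meant here, as a sketch (Hadoop s3a keys; the values are 
illustrative, not recommendations):

{noformat}
fs.s3a.retry.limit = 3             # fewer retries on recoverable failures
fs.s3a.retry.interval = 500ms      # base sleep between retries
fs.s3a.attempts.maximum = 1        # retries inside the AWS SDK itself
fs.s3a.connection.timeout = 60000  # socket timeout, in milliseconds
{noformat}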

> HadoopS3RecoverableWriterITCase hang on azure
> -
>
> Key: FLINK-26563
> URL: https://issues.apache.org/jira/browse/FLINK-26563
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.15.0
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2022-03-09T09:44:11.6454998Z Mar 09 09:44:11 "main" #1 prio=5 os_prio=0 
> tid=0x7f331000b800 nid=0x7601 runnable [0x7f3318203000]
> 2022-03-09T09:44:11.6455475Z Mar 09 09:44:11java.lang.Thread.State: 
> RUNNABLE
> 2022-03-09T09:44:11.6455962Z Mar 09 09:44:11  at 
> java.net.SocketInputStream.socketRead0(Native Method)
> 2022-03-09T09:44:11.6456563Z Mar 09 09:44:11  at 
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> 2022-03-09T09:44:11.6457422Z Mar 09 09:44:11  at 
> java.net.SocketInputStream.read(SocketInputStream.java:171)
> 2022-03-09T09:44:11.6458036Z Mar 09 09:44:11  at 
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> 2022-03-09T09:44:11.6458667Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:457)
> 2022-03-09T09:44:11.6459649Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:237)
> 2022-03-09T09:44:11.6460672Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190)
> 2022-03-09T09:44:11.6461267Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLTransport.decode(SSLTransport.java:109)
> 2022-03-09T09:44:11.6462110Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1386)
> 2022-03-09T09:44:11.6463039Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
> 2022-03-09T09:44:11.6464168Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
> 2022-03-09T09:44:11.6465097Z Mar 09 09:44:11  at 
> sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:948)
> 2022-03-09T09:44:11.6466100Z Mar 09 09:44:11  at 
> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
> 2022-03-09T09:44:11.6467149Z Mar 09 09:44:11  at 
> org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
> 2022-03-09T09:44:11.6468145Z Mar 09 09:44:11  at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
> 2022-03-09T09:44:11.6469179Z Mar 09 09:44:11  at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
> 2022-03-09T09:44:11.6470153Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6471929Z Mar 09 09:44:11  at 
> com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
> 2022-03-09T09:44:11.6472974Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6474107Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6474724Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6475308Z Mar 09 09:44:11  at 
> com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
> 2022-03-09T09:44:11.6475917Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6476713Z Mar 09 09:44:11  at 
> com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
> 2022-03-09T09:44:11.6477327Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6478193Z Mar 09 09:44:11  at 
> com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
> 2022-03-09T09:44:11.6478853Z Mar 09 09:44:11  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
> 2022-03-09T09:44:11.6479480Z Mar 09 09:44:11  at 
> org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:468)
> 2022-03-09T09:44:11.6480092Z Mar 09 09:44:11  at 
> org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$312/577682023.execute(Unknown 
> Source)
> 2022-03-09T09:44:11.6480876Z Mar 09 09:44:11  at 
> org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> 2022-03-09T09:44:11.6481443Z Mar 09 09:44:11  at 
> org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
> 

[jira] [Commented] (FLINK-23722) S3 Tests fail on AZP: Unable to find a region via the region provider chain. Must provide an explicit region in the builder or setup environment to supply a region.

2021-08-20 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402184#comment-17402184
 ] 

Steve Loughran commented on FLINK-23722:


FWIW, you can fix this by setting fs.s3a.endpoint = s3.amazonaws.com if the 
property is unset/empty; it doesn't surface on EC2 or for any developer with the 
AWS CLI tools set up (which is why nobody noticed).
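
A minimal sketch of where that setting would go, assuming the usual Flink/Hadoop 
key mirroring:

{noformat}
# flink-conf.yaml
s3.endpoint: s3.amazonaws.com

# or the equivalent Hadoop property picked up by the s3a connector
fs.s3a.endpoint = s3.amazonaws.com
{noformat}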

> S3 Tests fail on AZP: Unable to find a region via the region provider chain. 
> Must provide an explicit region in the builder or setup environment to supply 
> a region.
> 
>
> Key: FLINK-23722
> URL: https://issues.apache.org/jira/browse/FLINK-23722
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.14.0
>Reporter: Arvid Heise
>Assignee: Arvid Heise
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> E2E and integration tests fail with
> {noformat}
> Aug 11 09:11:32 Caused by: com.amazonaws.SdkClientException: Unable to find a 
> region via the region provider chain. Must provide an explicit region in the 
> builder or setup environment to supply a region.
> Aug 11 09:11:32   at 
> com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:462)
> Aug 11 09:11:32   at 
> com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
> Aug 11 09:11:32   at 
> com.amazonaws.client.builder.AwsSyncClientBuilder.build(AwsSyncClientBuilder.java:46)
> Aug 11 09:11:32   at 
> org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.buildAmazonS3Client(DefaultS3ClientFactory.java:144)
> Aug 11 09:11:32   at 
> org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:96)
> Aug 11 09:11:32   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:753)
> Aug 11 09:11:32   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:446)
> Aug 11 09:11:32   ... 44 more
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21884=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19589) Expose S3 options for tagging and object lifecycle policy for FileSystem

2021-01-06 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259767#comment-17259767
 ] 

Steve Loughran commented on FLINK-19589:


the filesystem createFile(Path path) API returns a builder which lets you set 
arbitrary (key, string) options; any FS could add its own:

FSDataOutputStream out = fs.createFile(new Path("/output.txt"))
    .opt("fs.s3a.tags.owner", "stevel")
    .opt("fs.s3a.tags.project", "dev")
    .build();

if you want a standard prefix for tags to be implemented across object stores, 
it'd be welcomed for the s3a and abfs connectors.


> Expose S3 options for tagging and object lifecycle policy for FileSystem
> 
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Padarn Wilson
>Priority: Minor
>
> This ticket proposes to expose the management of two properties related S3 
> Object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_*:* Some related existing properties can be set using the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink) if 
> we decide to go this way. However, it does not allow a sink and a checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possible as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19595) Flink SQL support S3 select

2020-12-04 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17244004#comment-17244004
 ] 

Steve Loughran commented on FLINK-19595:


The s3a connector supports S3 Select (HADOOP-15229): 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/s3_select.html
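
In the s3a connector it surfaces through the openFile() builder, roughly like 
this (a sketch from memory; check the linked docs for the exact option names):

{code:java}
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Push a select statement down to S3 and return the result stream. */
public static FSDataInputStream selectCsv(FileSystem fs, Path csv, String sql) throws Exception {
  // the returned stream's length will not match the file length,
  // which is why split-based readers struggle with this
  return fs.openFile(csv)
      .must("fs.s3a.select.sql", sql)
      .build()
      .get();
}
{code}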

Does the Tencent API work with this? Is there tuning needed?

FWIW, it's very hard to use in code which tries to split files, or which assumes 
that the length of the stream == the length of the file. And with the results 
coming back as text, either CSV or JSON, processing the results is inefficient. 
If things came back as Avro records, life would be easier.

Also: is Tencent's S3 consistent? That is: if S3Guard "went away", would 
everything still work?

> Flink SQL support S3 select
> ---
>
> Key: FLINK-19595
> URL: https://issues.apache.org/jira/browse/FLINK-19595
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems, Table SQL / Ecosystem
>Reporter: liuxiaolong
>Priority: Major
> Attachments: image-2020-11-02-18-08-11-461.png, 
> image-2020-11-02-18-18-14-961.png
>
>
> h4. Summary
> Flink uses S3AInputStream.java to read data stored in Tencent COS, which calls 
> the getObject function of AmazonS3Client.java.
> Tencent COS now supports pushdown for the CSV and Parquet file formats.
> In these cases, using getObject to select data wastes a lot of bandwidth.
> So I think Flink SQL should support S3 Select, to reduce the waste of 
> bandwidth.
>  
> h4. Design
> 1. In HiveMapredSplitReader.java, we used int[] selectedFields to construct the 
> S3 SELECT SQL. And we created a new class named S3SelectCsvReader which uses the 
> AmazonS3Client.selectObjectContent function to read the CSV file line by line.
> !image-2020-11-02-18-08-11-461.png|width=535,height=967!
>  
> !image-2020-11-02-18-18-14-961.png|width=629,height=284!
>  
> 2.  Flink Demo Table:
> 1) Table schema
> Flink SQL> desc cos.test_s3a;
>  root
> |-- name: STRING (col1)|
> |-- age: INT (col2)|
> |-- dt: STRING (col3, it's a partition column)|
>  
> 2) Conversion relationship (Flink SQL converted to S3 SELECT SQL)
> select name from cos.test_s3a;                        =>  SELECT s._1, null FROM S3Object s
> select age from cos.test_s3a;                         =>  SELECT null, s._2 FROM S3Object s
> select dt, name, age from cos.test_s3a;               =>  SELECT s._1, s._2 FROM S3Object s
> select dt from cos.test_s3a;                          =>  SELECT null, null FROM S3Object s
> select * from cos.test_s3a;                           =>  SELECT s._1, s._2 FROM S3Object s
> select name from cos.test_s3a where dt='2020-07-15';  =>  SELECT s._1, null FROM S3Object s
>  
> 3) Patch Commit
> https://github.com/Coderlxl/flink/commit/b211f4830a7301bf9283a6d37209000b176913ad



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19550) StreamingFileSink can't be interrupted when writing to S3

2020-12-04 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243998#comment-17243998
 ] 

Steve Loughran commented on FLINK-19550:


s3a in Hadoop 3.2.2 and 3.3.1 will ship with AWS SDK 1.11.901; see HADOOP-17343.

FWIW, we didn't see any regressions in test runs, nor did we have any API 
changes to worry about.

> StreamingFileSink can't be interrupted when writing to S3
> -
>
> Key: FLINK-19550
> URL: https://issues.apache.org/jira/browse/FLINK-19550
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.12.0
>Reporter: Roman Khachatryan
>Priority: Minor
>
> Reported in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Network-issue-leading-to-quot-No-pooled-slot-available-quot-td38553.html
>  :
> StreamingFileSink uses hadoop S3AFileSystem for s3a, which in turn uses 
> amazon aws sdk.
> The version of amazon aws sdk used doesn't respect interrupts. However, a 
> newer Amazon 1.11.878 SDK version does.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15814) Log warning when StreamingFileSink is used with an ambiguous S3 scheme and error when used with s3p

2020-02-26 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045839#comment-17045839
 ] 

Steve Loughran commented on FLINK-15814:


Out of curiosity, what is the problem?

> Log warning when StreamingFileSink is used with an ambiguous S3 scheme and 
> error when used with s3p
> ---
>
> Key: FLINK-15814
> URL: https://issues.apache.org/jira/browse/FLINK-15814
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> StreamingFileSink does not properly work with bulk output formats and 
> s3-presto. We should log a warning when the provided scheme is 's3p' or 's3' 
> (which is ambiguous) explaining the potential issues and encourage users to 
> only use 's3a' which always delegates to hadoop. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13602) S3 filesystems effectively do not support credential providers

2019-12-13 Thread Steve Loughran (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995629#comment-16995629
 ] 

Steve Loughran commented on FLINK-13602:


Not much we can do in S3A to help here, sorry.

Could you add a provider for Presto which takes a Hadoop config as a 
constructor argument but underneath instantiates its own set of credential 
providers?
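
Something along these lines, as a rough sketch only (as I recall, Presto expects 
a (URI, Hadoop Configuration) constructor for custom providers; check its docs, 
and the delegate chain here is just an illustration):

{code:java}
import java.net.URI;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import org.apache.hadoop.conf.Configuration;

public class HadoopConfigCredentialsProvider implements AWSCredentialsProvider {
  private final AWSCredentialsProvider delegate;

  public HadoopConfigCredentialsProvider(URI uri, Configuration conf) {
    // underneath, instantiate whichever provider chain you actually want,
    // e.g. one driven by keys mirrored from the Flink config into conf
    this.delegate = new DefaultAWSCredentialsProviderChain();
  }

  @Override
  public AWSCredentials getCredentials() {
    return delegate.getCredentials();
  }

  @Override
  public void refresh() {
    delegate.refresh();
  }
}
{code}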

> S3 filesystems effectively do not support credential providers
> --
>
> Key: FLINK-13602
> URL: https://issues.apache.org/jira/browse/FLINK-13602
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> To provide credentials to S3 users may configure a credentials provider. For 
> providers from amazon (which are relocated) we allow users to configure the 
> original class name, and relocate it manually in the S3 filesystem factories.
> However, none of the amazon provided credential providers can be used with 
> the Presto filesystem, since it _additionally_ requires them to have a 
> constructor accepting a hadoop configuration. 
> (https://prestodb.github.io/docs/current/connector/hive.html#amazon-s3-configuration)
> {{hadoop-aws}} _does_ include a number of credential providers that have this 
> constructor, however these use configuration keys that aren't mirrored from 
> the flink config. (they expect {{fs.s3a}} as a key-prefix), not to mention 
> that users would have to configure the relocated class (since the S3 factory 
> only manually relocates amazon classes).
> Finally, a custom implementation of the credentials provider can effectively 
> be ruled out since they too would have to be implemented against the 
> relocated amazon/hadoop classes, which we can't really expect users to do.
> In summary, amazon providers aren't working since they don't have a 
> constructor that presto requires, hadoop providers don't work since we don't 
> mirror the required configuration keys, and custom providers are unreasonable 
> as they'd have to be implemented against relocated classes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2019-04-16 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819326#comment-16819326
 ] 

Steve Loughran commented on FLINK-8801:
---

* s3a in Hadoop 3.1+ lets you see etags as the file checksum (sketch below)
* there is ongoing work on s3a support for tracking/using version ids
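
A sketch of seeing that from the FileSystem API (the option name is from memory; 
check the s3a docs for the exact key):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EtagChecksumDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // expose S3 etags through getFileChecksum()
    conf.setBoolean("fs.s3a.etag.checksum.enabled", true);
    Path path = new Path(args[0]);            // e.g. s3a://bucket/key
    FileSystem fs = path.getFileSystem(conf);
    FileChecksum checksum = fs.getFileChecksum(path);
    System.out.println(checksum);             // the object's etag
  }
}
{code}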

> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, FileSystems, Runtime / Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.3, 1.5.0
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get file size and modification timestamp. Since there we have access to 
> the local resource, we can use the data from there instead and circumvent the 
> problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2019-04-16 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819324#comment-16819324
 ] 

Steve Loughran commented on FLINK-8801:
---

We can't set times on objects, as they are fixed at object upload.

> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, FileSystems, Runtime / Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.3, 1.5.0
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get file size and modification timestamp. Since there we have access to 
> the local resource, we can use the data from there instead and circumvent the 
> problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

2019-01-09 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16738131#comment-16738131
 ] 

Steve Loughran commented on FLINK-11187:


bq. As far as I can tell, this is just a transient s3 error, so it appears to 
be pretty rare but does happen more often with increased load.

It's believed to be a network error: the upload of a single block fails, the 
transfer manager wants to retry, but the block buffering is such that it can't 
re-post that block.

AFAIK there is no easy way to test this other than a long-haul upload over many 
hours. Not unless you can insert an unreliable proxy or sneak in an httpclient 
which fails (possible if you have your own modified Apache httpclient, run the 
SDK unshaded, etc...).

BTW: do make sure that the final POST does retries too; some of the AWS SDKs have 
under-retried there, and you don't want a 4h upload to fail at the final step.

> StreamingFileSink with S3 backend transient socket timeout issues 
> --
>
> Key: FLINK-11187
> URL: https://issues.apache.org/jira/browse/FLINK-11187
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming Connectors
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Addison Higham
>Assignee: Addison Higham
>Priority: Major
> Fix For: 1.7.2, 1.8.0
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of flink job, which is often able to recover from, but 
> under heavy load, this can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> 

[jira] [Commented] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

2018-12-20 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725942#comment-16725942
 ] 

Steve Loughran commented on FLINK-11187:


There are limits to how well you can buffer... if the POST fails, it goes back to 
byte 0 and retries, which is done with a reset() to the mark set when 
uploading that part began.

Better to serve up directly from a file, if that's the source, or, if it's 
memory, to have an input stream which [supports mark/reset to anywhere in the 
entire 
block|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java#L640]
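
A trivial illustration of the difference, in plain JDK code rather than the 
Hadoop block classes: a ByteArrayInputStream whose mark covers the whole block 
can always be rewound to byte 0, unlike a BufferedInputStream with a small mark 
limit.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.InputStream;

public final class ReplayableBlock {
  /** Wrap one in-memory multipart block so mark/reset can rewind to byte 0. */
  public static InputStream wrap(byte[] block) {
    ByteArrayInputStream in = new ByteArrayInputStream(block);
    in.mark(block.length);   // the mark spans the entire block
    return in;               // a failed part upload can reset() and re-post
  }
}
{code}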

> StreamingFileSink with S3 backend transient socket timeout issues 
> --
>
> Key: FLINK-11187
> URL: https://issues.apache.org/jira/browse/FLINK-11187
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming Connectors
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Addison Higham
>Assignee: Addison Higham
>Priority: Major
> Fix For: 1.7.2
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of flink job, which is often able to recover from, but 
> under heavy load, this can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at 
> 

[jira] [Commented] (FLINK-10817) Upgrade presto dependency to support path-style access

2018-11-08 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679607#comment-16679607
 ] 

Steve Loughran commented on FLINK-10817:


If you are working with non-AWS endpoints, you also need to make sure that the 
options to switch to http and to revert to the original s3 signature mechanism 
get passed through too.

> Upgrade presto dependency to support path-style access
> --
>
> Key: FLINK-10817
> URL: https://issues.apache.org/jira/browse/FLINK-10817
> Project: Flink
>  Issue Type: Improvement
>Reporter: Adam Lamar
>Priority: Major
>
> In order to use any given non-AWS s3 implementation backed by the presto s3 
> filesystem, it is necessary to set at least one configuration parameter in 
> flink-conf.yaml:
>  * presto.s3.endpoint: https://example.com
> This appears to work as expected for hosted s3 alternatives.
> In order to use a bring-your-own, self-hosted s3 alternative like 
> [minio|https://www.minio.io/], at least two configuration parameters are 
> required:
>  * presto.s3.endpoint: https://example.com
>  * presto.s3.path-style-access: true
> However, the second path-style-access parameter doesn't work because the 
> 0.185 version of presto doesn't support passing through that configuration 
> option to the hive s3 client.
> To work around the issue, path-style-access can be forced on the s3 client by 
> using an IP address for the endpoint (instead of a hostname). Without this 
> workaround, flink attempts to use the virtualhost-style at 
> bucketname.example.com, which fails unless the expected DNS records exist.
> To solve this problem and enable non-IP endpoints, upgrade the 
> [pom|https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36]
>  to at least 0.186 which includes [this 
> commit|https://github.com/prestodb/presto/commit/0707f2f21a96d2fd30953fb3fa9a9a03f03d88bd]
>  Note that 0.213 is the latest presto release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10363) S3 FileSystem factory prints secrets into logs

2018-09-21 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623330#comment-16623330
 ] 

Steve Loughran commented on FLINK-10363:


see WHIRR-642 for this same issue; it's easy to do. For that one I had to 
google for every whirr log entry & notify at least two people that they'd 
accidentally shared their secrets. Luckily that was the era before bitcoin 
miners scanned the internet for AWS keys.

> S3 FileSystem factory prints secrets into logs
> --
>
> Key: FLINK-10363
> URL: https://issues.apache.org/jira/browse/FLINK-10363
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
>
> The file system factory logs all values it applies from the flink 
> configuration.
> That frequently includes access keys, which should not leak into logs.
> The loader should only log the keys, not the values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10363) S3 FileSystem factory prints secrets into logs

2018-09-20 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622263#comment-16622263
 ] 

Steve Loughran commented on FLINK-10363:


Stephan: we went to a lot of effort to not log AWS secrets in the S3A code. 
Tell me you haven't been printing them.

FWIW, all the sensitive values are listed here: 
https://github.com/steveloughran/cloudstore/blob/master/src/main/java/org/apache/hadoop/fs/store/diag/S3ADiagnosticsInfo.java#L40

if anyone puts user:pass in the URL then even the path becomes sensitive, which 
is why users are told off for doing that, and why the feature has finally been 
turned off.

> S3 FileSystem factory prints secrets into logs
> --
>
> Key: FLINK-10363
> URL: https://issues.apache.org/jira/browse/FLINK-10363
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
>
> The file system factory logs all values it applies from the flink 
> configuration.
> That frequently includes access keys, which should not leak into logs.
> The loader should only log the keys, not the values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-07-23 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553596#comment-16553596
 ] 

Steve Loughran commented on FLINK-9061:
---

It'd still be good to conduct some experiments here about trying to overload a 
single shard. If you are doing random ORC/Parquet IO, the S3A connector can issue 
multiple GETs within the same input stream, so it can generate a higher load than 
you'd think. What it does mean is that it probably now takes active effort to 
overload bits of the system, instead of it happening by default (see HADOOP-15209 
for an example)

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9525) Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module

2018-06-05 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502161#comment-16502161
 ] 

Steve Loughran commented on FLINK-9525:
---

We're actually moving Hadoop off that introspection mechanism for dynamic class 
loading (if someone can do it), because it's so expensive... it force-loads all 
the classes under the FS impl even if you aren't using the class. I think I'd 
like some lighter-weight service declaration class which only contained some 
info fields (real implementation class, URL of provider), so it was low cost to 
load.

> Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module
> --
>
> Key: FLINK-9525
> URL: https://issues.apache.org/jira/browse/FLINK-9525
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
> Attachments: wx20180605-142...@2x.png
>
>
> If a Flink job's dependencies include `hadoop-common` and `hadoop-hdfs`, a 
> runtime error will be thrown,
> like in this case: 
> [https://stackoverflow.com/questions/47890596/java-util-serviceconfigurationerror-org-apache-hadoop-fs-filesystem-provider-o].
> The root cause: 
> see {{org.apache.flink.core.fs.FileSystem}}.
> This class loads all available file system factories via 
> {{ServiceLoader.load(FileSystemFactory.class)}}. 
> Since the {{META-INF/services/org.apache.flink.core.fs.FileSystemFactory}} 
> file in the classpath does not contain an 
> `org.apache.flink.runtime.fs.hdfs.HadoopFsFactory` entry, 
> only the {{LocalFileSystemFactory}} is ultimately loaded.
> See the attached screenshot for more error messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-04-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422645#comment-16422645
 ] 

Steve Loughran commented on FLINK-9061:
---

something less than 8, maybe 5, though it's mostly all superstition

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417964#comment-16417964
 ] 

Steve Loughran commented on FLINK-9061:
---

[~greghogan]

I cut the link as it was just a duplicate of the one in the header.

Your link is new; something I think I'd seen somewhere else too. It's 
unfortunate that most of our knowledge here is superstition and stack traces, 
but I think that's a deliberate attempt to avoid making any commitment about 
future behaviour.

Here are my beliefs:
# Once you write enough data down a path, a partition is somehow triggered, and 
the data is split across s3 shards
# that partitioning event is counted as part of your load for the bucket / the IO 
which a partitioning path can sustain is reduced
# so the overall IO rate at that point drops. Maybe that's what raises the 500 error

I don't think it partitions based purely on load. That would be fun, given you 
could just issue many delete requests to a path and get throttled.


> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417145#comment-16417145
 ] 

Steve Loughran edited comment on FLINK-9061 at 3/28/18 10:33 AM:
-

The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than an increase in the throttle 
event metrics. But checkpointing will slow down

Putting the most randomness at the head of the path is the one that s3 likes 
best, as its partitioning is based on the first few characters. And I mean 
"really at the head of the path" : 




was (Author: ste...@apache.org):
The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than an increase in the throttle 
event metrics. But checkpointing will slow down

Putting the most randomness at the head of the path is the one that s3 likes 
best, as its partitioning is based on the first few characters. And I mean 
"really at the head of the path" : 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417145#comment-16417145
 ] 

Steve Loughran commented on FLINK-9061:
---

The s3a connector will have the same issue, though there we can change the 
backoff policy, so you shouldn't see it other than an increase in the throttle 
event metrics. But checkpointing will slow down

Putting the most randomness at the head of the path is the one that s3 likes 
best, as its partitioning is based on the first few characters. And I mean 
"really at the head of the path" : 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-28 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417140#comment-16417140
 ] 

Steve Loughran commented on FLINK-8794:
---

bq. Enable consistent-view can cause other problems.

Really? What are they?

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415501#comment-16415501
 ] 

Steve Loughran commented on FLINK-9061:
---

[~StephanEwen]: I knew that, but it's still the same AWS SDK underneath.

500 is not normal throttling; that's 503.

[~jgrier]: What you are seeing here is something that has gone wrong in S3 
itself. It is usually transient; treat it as retriable on all requests.

S3A on Hadoop 3.1+ will treat this as a connectivity error and use whatever 
settings you use there (the retryUpToMaximumCountWithFixedSleep policy). If a 
500 can be caused by overload, that could/should be switched to the exponential 
backoff policy used for 503 events.
 # file a support request with the AWS team including the request ID of a 
failing request
 # add a followup here listing what they said/recommended

Obviously, I can't fix the stack trace here, but we can at least change the S3A 
connector to see this and retry appropriately.
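
For reference, a hedged sketch of the two Hadoop retry policies mentioned above; the retry counts and sleep intervals are placeholder numbers, not what S3A actually configures.

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public final class RetryPolicyExamples {

    // Fixed-sleep policy of the kind applied to plain connectivity errors:
    // retry up to 7 times, sleeping 500ms between attempts (placeholder numbers).
    static final RetryPolicy FIXED_SLEEP =
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(7, 500, TimeUnit.MILLISECONDS);

    // Exponential backoff of the kind used for 503 throttling responses:
    // retry up to 7 times, starting at 500ms and growing each attempt (placeholder numbers).
    static final RetryPolicy BACKOFF =
        RetryPolicies.exponentialBackoffRetry(7, 500, TimeUnit.MILLISECONDS);
}
{code}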

Thank you for finding another interesting failure mode of S3 itself :)

+[~fabbri]

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415491#comment-16415491
 ] 

Steve Loughran edited comment on FLINK-8794 at 3/27/18 12:06 PM:
-

That's Amazon EMR's problem. Switch to their "consistent s3" offering for the 
bucket you are using as the sink.

Link: 
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

If you open an issue with the EMR team and it is just a read-after-write 
listing inconsistency, they'll just point you at that.


was (Author: ste...@apache.org):
That's amazon EMR's problem. Switch to their "consistent s3" offering for the 
bucket you are using as the sink

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415491#comment-16415491
 ] 

Steve Loughran commented on FLINK-8794:
---

That's Amazon EMR's problem. Switch to their "consistent s3" offering for the 
bucket you are using as the sink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414280#comment-16414280
 ] 

Steve Loughran commented on FLINK-9061:
---

You can get it on delete requests too, if you try hard.

Jamie, can I see a full stack trace? I'm curious now, as generally the S3 
clients should be retrying it, and you'd see a slowdown rather than a failure.

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414272#comment-16414272
 ] 

Steve Loughran commented on FLINK-8794:
---

{quote}writing to local disks would decrease performance, since you would need 
to write the same data twice (first locally, then copy remotely){quote}
I don't know what FS connector you are using, but these days S3A defaults to 
buffering blocks to local HDD before initiating the upload in close() or once 
the block size threshold is reached. You aren't going to see a perf hit if you 
are writing files smaller than fs.s3a.blocksize. If bigger, I'm afraid so, but 
it may be worth it. The staging S3A committers coming in Hadoop 3.1 postpone 
all uploads until task commit, but they gain better failure semantics, and job 
commit is fast and trivial.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414262#comment-16414262
 ] 

Steve Loughran commented on FLINK-8794:
---

It does if you turn S3Guard on with Hadoop 3.0+ and its S3A connector, as it 
(like Amazon's EMRFS) uses DynamoDB for that consistency.

Unless you write code on the explicit assumption that the store is eventually 
consistent, treating S3 "just" like a filesystem is dangerous. It'll usually 
work in tests, but in larger-scale production deployments you can get burned.
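
For anyone who wants to try that, a minimal sketch of enabling S3Guard programmatically on Hadoop 3.x; the table name and region are placeholders, and in practice you would also create the table first (e.g. with {{hadoop s3guard init}}) and usually set these in core-site.xml instead.

{code:java}
import org.apache.hadoop.conf.Configuration;

public final class S3GuardConfigExample {

    static Configuration withS3Guard() {
        Configuration conf = new Configuration();
        // Back S3A listings with a DynamoDB metadata store (Hadoop 3.x S3Guard).
        conf.set("fs.s3a.metadatastore.impl",
                 "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore");
        // Placeholder table and region -- pick your own.
        conf.set("fs.s3a.s3guard.ddb.table", "my-s3guard-table");
        conf.set("fs.s3a.s3guard.ddb.region", "us-east-1");
        return conf;
    }
}
{code}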

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2018-03-12 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395479#comment-16395479
 ] 

Steve Loughran commented on FLINK-7589:
---

We've been thinking of cutting a shaded version of the hadoop-cloud module, 
bonded to the already-shaded AWS SDK (i.e. it will still be brittle to AWS SDK 
changes). If you want to contribute that to Hadoop 3.2+, it could be backported 
to the rest of the 3.x release line. At the very least, you'd get to learn what 
it takes to isolate things.

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.4
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-09 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392743#comment-16392743
 ] 

Steve Loughran commented on FLINK-:
---

If you are pulling in the shaded SDK, note that it's been declaring its Netty 
dependencies unshaded: HADOOP-15264.

> Upgrade AWS SDK in flink-connector-kinesis
> --
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of 
> this version upgrade for KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-22 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372701#comment-16372701
 ] 

Steve Loughran commented on FLINK-8543:
---

Copying a subset of the old file to a new file would seem to be the way. Not 
ideal, as you read everything in and write it back, plus with update 
inconsistency clients could still get the old file for a bit. Writing a 
.valid-length field is a better strategy when everything knows to check it.
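
Purely as an illustration of the consumer side of that strategy, here is a sketch that trusts only the advertised number of bytes of a part file; the assumption that the companion file stores the length as decimal text is mine, so check how your sink version actually encodes it before relying on this.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public final class ValidLengthReader {

    /** Read at most the advertised valid length of a part file, if a
     *  ".valid-length" companion file exists next to it. */
    static byte[] readValidBytes(FileSystem fs, Path partFile) throws IOException {
        Path lengthFile = partFile.suffix(".valid-length");
        long validLength = fs.getFileStatus(partFile).getLen();
        if (fs.exists(lengthFile)) {
            byte[] raw = new byte[(int) fs.getFileStatus(lengthFile).getLen()];
            try (FSDataInputStream in = fs.open(lengthFile)) {
                IOUtils.readFully(in, raw, 0, raw.length);
            }
            // Assumption: the companion file stores the length as decimal text.
            validLength = Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim());
        }
        byte[] data = new byte[(int) validLength];
        try (FSDataInputStream in = fs.open(partFile)) {
            IOUtils.readFully(in, data, 0, data.length);
        }
        return data;
    }
}
{code}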

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
>   

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371870#comment-16371870
 ] 

Steve Loughran commented on FLINK-8543:
---

There's no truncate operation in the S3 protocol, so there isn't one in the 
s3a connector, nor in the Azure or Swift clients. Not fixable. Flush-after-close 
can be downgraded, but truncate isn't part of the S3 object model of immutable 
objects.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> 

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371976#comment-16371976
 ] 

Steve Loughran commented on FLINK-8543:
---

bq. So all files being accompanied by .valid-length is expected behaviour when 
saving to s3?

That's nothing to do with the s3a connector, so ask the Flink team.

One thing to always remember is: Object Stores are not Filesystems.

Yes, they appear to support the same API and have the same metaphor of 
directories and files, but they are different, and if you try too hard, you 
will discover that things you expect aren't there. This may be one of them.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
>   

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365520#comment-16365520
 ] 

Steve Loughran commented on FLINK-8543:
---

Created HADOOP-15239; I'll take a patch with a new test method in 
org.apache.hadoop.fs.s3a.ITestS3ABlockOutputArray, the least expensive option 
being adding {{stream.flush()}} at the tail end of {{testBlocksClosed}}. Your 
chance to contribute to the Hadoop codebase :)
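
A rough sketch of the kind of test being asked for, written against the generic FileSystem API rather than the real {{ITestS3ABlockOutputArray}} fixture; the bucket, path and the expectation that flush() no longer throws are placeholders, and the actual patch would simply append a {{flush()}} call to the existing test.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class FlushAfterCloseTest {

    @Test
    public void testFlushAfterClose() throws Exception {
        Configuration conf = new Configuration();
        // Placeholder URI; the real test runs against the configured S3A test bucket.
        Path file = new Path("s3a://example-bucket/test/flush-after-close.bin");
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream stream = fs.create(file, true);
        stream.write(42);
        stream.close();

        // The behaviour under discussion: flush() on a closed stream should
        // downgrade to a warning rather than throw.
        stream.flush();
    }
}
{code}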

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om 

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365506#comment-16365506
 ] 

Steve Loughran commented on FLINK-8543:
---

bq. I am running on a Hortonworks based hadoop environment

Which version? HDP-2.6's hadoop-aws module is essentially that of ASF 
Hadoop 2.8.0; make sure that fast output is enabled ({{fs.s3a.fast.upload = true}}) 
and that buffering goes to disk ({{fs.s3a.fast.upload.buffer=disk}}) to get the 
best upload performance. But that isn't going to do anything for flush(), which 
again does a state check when called.

FWIW, the semantics of {{Flushable.flush()}} are pretty vague, including what 
it is meant to do (hence Hadoop's hflush/hsync methods, which the S3A streams 
explicitly don't support). There is a valid case for saying "downgrade flush() 
on a closed file to a warn + no-op".

Moving to the fast output stream will make close() that much faster on big 
writes, as it will only be uploading the last block and any previous ones not 
yet uploaded; if the bandwidth to S3 exceeds the rate at which bytes are 
generated, it should just be the time to upload that last block: a few seconds. 
That may make a difference, though there's still a bit of a race condition 
lurking.
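
As a concrete illustration of those two settings, a minimal Configuration sketch; the buffer directory is a placeholder, and on HDP you would normally put these in core-site.xml rather than set them in code.

{code:java}
import org.apache.hadoop.conf.Configuration;

public final class S3AFastUploadConfig {

    static Configuration fastUploadToDisk() {
        Configuration conf = new Configuration();
        // Enable the block-based output stream (needed on Hadoop 2.8/HDP 2.6;
        // it is the default in later releases).
        conf.setBoolean("fs.s3a.fast.upload", true);
        // Buffer pending blocks on local disk before uploading.
        conf.set("fs.s3a.fast.upload.buffer", "disk");
        // Placeholder local directory for those buffered blocks.
        conf.set("fs.s3a.buffer.dir", "/tmp/s3a");
        return conf;
    }
}
{code}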

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new 

[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-14 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166527#comment-16166527
 ] 

Steve Loughran commented on FLINK-7589:
---

bq.  is there any plan to make Flink use AWS S3 SDK directly rather than 
Hadoop's S3 implementation?

The topic has surfaced. Note that if you do want a client which is resilient 
to transient failures, that's a fair amount of extra work, and you have to 
implement a fault-injecting wrapper on the AWS SDK to actually verify that it 
works in a functional test suite... that's something to consider shipping so 
that people can use it in their own integration tests.

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-07 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156763#comment-16156763
 ] 

Steve Loughran commented on FLINK-7589:
---

Well, you are allowed to file bug reports.

However, it's not the S3 client getting GC'd, because the S3 client is retained 
for the lifespan of the FileSystem instance, so unless you are disposing of 
that, it's retained.

I'd blame network connectivity: the connection was closed, you got back less 
data than you asked for. 

The s3a client does a single retry here, but it could be more sophisticated 
(HADOOP-14531). 

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-09-06 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155245#comment-16155245
 ] 

Steve Loughran commented on FLINK-7266:
---

If you are using s3a, then delete(path, recursive=false) will stop you from 
trying to delete a non-empty dir.
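
A tiny sketch of that call through the Hadoop FileSystem API; the checkpoint path is a placeholder.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class NonRecursiveDelete {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path dir = new Path("s3a://example-bucket/checkpoints/chk-42");
        FileSystem fs = dir.getFileSystem(conf);
        // recursive = false: S3A refuses to delete a non-empty "directory",
        // so live state files cannot be removed by accident.
        boolean deleted = fs.delete(dir, false);
        System.out.println("deleted: " + deleted);
    }
}
{code}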

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-09-05 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153910#comment-16153910
 ] 

Steve Loughran commented on FLINK-7266:
---

FWIW, in s3a we create a single delete request to rm all parent paths *and 
don't bother doing the existence check*. 

That is, for a file a/b/c.txt, after the file is written in close(), POST a 
delete list of

/a/
/a/b

It's ~O(1) for depth, and as you don't need to wait for the response, it's even 
something you could make async.
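
For illustration, a hedged sketch of that kind of bulk delete using the AWS SDK v1; the bucket and keys are placeholders, and the real S3A code path differs in detail.

{code:java}
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;

public final class ParentMarkerCleanup {

    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // After writing a/b/c.txt, issue one bulk delete covering the fake
        // directory markers of every parent path -- no existence checks first.
        DeleteObjectsRequest request = new DeleteObjectsRequest("example-bucket")
                .withKeys("a/", "a/b/")
                .withQuiet(true);
        s3.deleteObjects(request);
    }
}
{code}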

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7365) excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir

2017-08-09 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119873#comment-16119873
 ] 

Steve Loughran commented on FLINK-7365:
---

There's a special log you can turn off to silence all deprecation warnings: 
{{org.apache.hadoop.conf.Configuration.deprecation}}

{code}
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=ERROR
{code}

> excessive warning logs of attempt to override final parameter: 
> fs.s3.buffer.dir
> ---
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of line of the following log in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> - AWS forbids resetting 'fs.s3.buffer.dir' value (it has a  tag on 
> this property in core-site.xml), so I set 'fs.s3a.buffer.dir' as '/tmp'
> Here's my core-site.xml file:
> {code:java}
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>   fs.s3.buffer.dir
>   /mnt/s3,/mnt1/s3
>   true
> 
> 
>   fs.s3.impl
>   org.apache.hadoop.fs.s3a.S3AFileSystem
> 
> 
>   fs.s3n.impl
>   com.amazon.ws.emr.hadoop.fs.EmrFileSystem
> 
>   
> ipc.client.connect.max.retries.on.timeouts
> 5
>   
>   
> hadoop.security.key.default.bitlength
> 256
>   
>   
> hadoop.proxyuser.hadoop.groups
> *
>   
>   
> hadoop.tmp.dir
> /mnt/var/lib/hadoop/tmp
>   
>   
> hadoop.proxyuser.hadoop.hosts
> *
>   
>   
> io.file.buffer.size
> 65536
>   
>   
> fs.AbstractFileSystem.s3.impl
> org.apache.hadoop.fs.s3.EMRFSDelegate
>   
>   
> fs.s3a.buffer.dir
> /tmp
>   
>   
> fs.s3bfs.impl
> org.apache.hadoop.fs.s3.S3FileSystem
>   
> 
> {code}
> This bug is about excessive logging.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-03-06 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897209#comment-15897209
 ] 

Steve Loughran commented on FLINK-5706:
---

If you look at where object stores cause the most trouble in the Hadoop code, 
it's in making them pretend to be a filesystem, with things like rename & 
recursive delete: operations which we developers expect to have specific 
behaviours and failure modes.

For object stores, I'd consider moving away from an FS abstraction and having 
something just for them: no directories, just patterns, a limited set of verbs, 
explicit notions of HTTP headers, etc. The trouble there is that the lowest 
common denominator becomes a limiting factor: multipart puts are the key to 
committers on S3, leases are the equivalent for Azure, etc. Which is why 
HADOOP-9965 stalled. That, and the lack of enthusiasm for a "new" storage API 
from downstream projects like Hive. Maybe it's time to revisit that.
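As a purely illustrative sketch of what such a deliberately narrow object-store API might look like (every name below is invented for the example, not an existing Hadoop or Flink interface):

{code:java}
// Sketch of a narrow object-store API, as opposed to a full FileSystem:
// no directories, no rename, a small set of verbs, HTTP-style metadata
// made explicit. All names here are invented for this illustration.
import java.io.InputStream;
import java.util.List;
import java.util.Map;

interface ObjectStore {

    /** PUT an object; returns the etag (or similar) of the stored blob. */
    String put(String key, InputStream data, long length, Map<String, String> headers);

    /** GET an object, optionally a byte range of it. */
    InputStream get(String key, long offset, long length);

    /** HEAD: metadata only, no data transfer. */
    Map<String, String> head(String key);

    /** DELETE a single object; no recursive directory semantics. */
    void delete(String key);

    /** LIST keys under a prefix; may lag recent mutations on some stores. */
    List<String> list(String prefix, int maxKeys);
}
{code}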

> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> Flink's own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).
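For illustration, the most basic building block such an implementation would wrap is a plain SDK object write; a minimal sketch with the AWS SDK for Java v1 follows (bucket, key and local file names are made up for the example):

{code:java}
// Sketch only: the raw SDK call a Flink-owned S3 filesystem would ultimately
// issue when an output stream is closed. Names below are invented.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.io.File;

public final class S3PutSketch {

    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        s3.putObject("my-checkpoint-bucket",
                     "checkpoints/job-1/chk-42/metadata",
                     new File("/tmp/chk-42-metadata"));
    }
}
{code}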



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-03-04 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895808#comment-15895808
 ] 

Steve Loughran commented on FLINK-5706:
---

I should add that my current stance on using S3 as a direct destination of 
work is "don't". It will usually work at small scale, but besides the 
commit delays, any kind of failure causes problems. "Direct" committers like 
the one added to Spark are dangerous for that reason, and, due to listing 
inconsistency, you can't safely chain work from one query to another.

> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> Flink's own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5706) Implement Flink's own S3 filesystem

2017-03-04 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895648#comment-15895648
 ] 

Steve Loughran edited comment on FLINK-5706 at 3/4/17 6:24 PM:
---

Stefan, I don't think you appreciate how hard it is to do this.

I will draw your attention to all the features coming in Hadoop 2.8, 
HADOOP-11694, including seek-optimised input streams, disk/heap/byte-buffer 
block-buffered uploads, support for encryption, and optimisation of all 
requests, HTTPS call by HTTPS call.

Then there's the todo list for later, HADOOP-13204. One aspect of this is 
HADOOP-13345, S3Guard, which uses DynamoDB for a consistent view of metadata; 
another is HADOOP-13786, a committer for direct commits to S3 which supports 
speculation and fault tolerance.

These are all the things you get to replicate, along with the scale tests, 
which do find things (HADOOP-14028 showed up on 70GB writes), and the various 
intermittent failures you don't see often but which cause serious problems when 
they do. For example, the final POST of a multipart PUT doesn't do retries; you 
have to add them yourself. After you find the problem.
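To make the retry point concrete, a hedged sketch of what wrapping that final POST in retries might look like with the AWS SDK for Java v1; the retry count and backoff are made up for the example:

{code:java}
// Illustration only: retry the completion call of a multipart upload.
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import java.util.List;

final class MultipartCompleter {

    private MultipartCompleter() {}

    static CompleteMultipartUploadResult completeWithRetries(
            AmazonS3 s3, String bucket, String key, String uploadId,
            List<PartETag> parts) throws InterruptedException {

        AmazonServiceException last = null;
        for (int attempt = 1; attempt <= 5; attempt++) {
            try {
                return s3.completeMultipartUpload(
                        new CompleteMultipartUploadRequest(bucket, key, uploadId, parts));
            } catch (AmazonServiceException e) {
                last = e;                       // e.g. transient 500/503 from S3
                Thread.sleep(1000L * attempt);  // crude linear backoff
            }
        }
        throw last;
    }
}
{code}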

As a sibling project, you are free to lift the entirety of the s3a code, along 
with all the tests it includes. But you then take on the burden of maintaining 
it, fielding support calls, doing your entire integration test work yourself, 
and performance tuning. Did [I mention 
testing?|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md]
We have put a lot of effort in there. You have to balance remote test runs 
with scalable performance and affordability, where the secret is reusing 
Amazon's own datasets, our own TPC-DS datasets, and running tests in-EC2 to 
really stress things.

This is not trivial, even if you lift the latest branch-2 code. We are still 
finding bugs there, ones that surface in the field. We may have a broader set 
of downstream things to deal with: distcp, mapreduce, hive, spark, even flink, 
but that helps us with the test reports (we keep an eye on JIRAs and Stack 
Overflow for the word "s3a") and the different deployment scenarios.

Please, do not take this on lightly.

Returning to your example above:
# it's not just that the {{exists()/HEAD}} probe can take time to respond, it 
is that the directory listing lags the direct {{HEAD object}} call; even if the 
{{exists()}} check returns 404, a LIST operation may still list the entry. And 
because the front-end load balancers cache things, the code deleting the object 
may get a 404 indicating that the object is gone, but *there is no guarantee 
that a different caller will not get a 200*.
# You may even see it in the same process, though if your calls use a 
thread pool of keep-alive HTTP/1.1 connections and all calls are on the same TCP 
connection, you'll be hitting the same load balancer and so get the cached 404. 
Because yes, load balancers cache 404 entries, meaning you don't even get 
create consistency if you do a check first.
# S3 doesn't have read-after-write (RAW) consistency. It now has create 
consistency across all regions (yes, for a long time it had different 
behaviours on US-East vs the others), provided you don't do a HEAD first.
# You don't get PUT-over-PUT consistency or DELETE consistency, and metadata 
queries invariably lag the object state, even on create.
# there is no such thing as {{rename()}}, merely a COPY at approx 6-10 MB/s, so 
it is O(data) and non-atomic (see the sketch after this list).
# if you are copying atop objects with the same name, you hit update 
consistency, for which there are no guarantees. Again, different callers may 
see different results, irrespective of call ordering, and listing will lag 
creation.
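A hedged illustration of the rename point, using the standard Hadoop {{FileSystem}} API; the bucket and paths are made up, and the behaviour described in the comments is the S3A semantics discussed above:

{code:java}
// On HDFS, rename() is a single atomic metadata operation. On S3A the same
// call turns into server-side COPY requests plus deletes, so it is O(bytes)
// and not atomic. Bucket and paths below are invented for the example.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class RenameIsCopy {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);

        Path src = new Path("s3a://example-bucket/tmp/part-00000");
        Path dst = new Path("s3a://example-bucket/output/part-00000");

        // Looks atomic, isn't: a failure part-way through can leave data in
        // both locations, and the copy throughput bounds how long it takes.
        boolean renamed = fs.rename(src, dst);
        System.out.println("renamed: " + renamed);
    }
}
{code}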

What you have seen so far is "demo scale" behaviour over a reused HTTP/1.1 
connection against the same load balancer. You cannot extrapolate from what 
works there to what offers guaranteed outcomes for large-scale operations on 
production data across multiple clusters, except for the special case "if it 
doesn't work here, it won't magically work in production".




was (Author: ste...@apache.org):
Stefan, I don't think you appreciate how hard it is to do this.

I will draw your attention to all the features coming in Hadoop 2.8, 
HADOOP-11694, including seek-optimised input streams, disk/heap/byte block 
buffered uploads, support for encryption, optimisation of all requests HTTPS 
call by HTTPS call.

Then there's the todo list for later HADOOP-13204. One aspect of this, 
HADOOP-13345, s3guard uses dynamo DB for that consistent view of metadata, and 
in HADOOP-13786, something to direct commits to s3 which supports speculation 
and fault tolerance. 

These are all the things you get to replicate, along with the scale tests, 
which do find things, as HADOOP-14028 showed up on 70GB writes, the various 
intermittent failures you don't see often but cause serious problems when they 
do: example, the 

[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-03-04 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895648#comment-15895648
 ] 

Steve Loughran commented on FLINK-5706:
---

Stefan, I don't think you appreciate how hard it is to do this.

I will draw your attention to all the features coming in Hadoop 2.8, 
HADOOP-11694, including seek-optimised input streams, disk/heap/byte-buffer 
block-buffered uploads, support for encryption, and optimisation of all 
requests, HTTPS call by HTTPS call.

Then there's the todo list for later, HADOOP-13204. One aspect of this is 
HADOOP-13345, S3Guard, which uses DynamoDB for a consistent view of metadata; 
another is HADOOP-13786, a committer for direct commits to S3 which supports 
speculation and fault tolerance.

These are all the things you get to replicate, along with the scale tests, 
which do find things (HADOOP-14028 showed up on 70GB writes), and the various 
intermittent failures you don't see often but which cause serious problems when 
they do. For example, the final POST of a multipart PUT doesn't do retries; you 
have to add them yourself. After you find the problem.

As a sibling project, you are free to lift the entirety of the s3a code, along 
with all the tests it includes. But you then take on the burden of maintaining 
it, fielding support calls, doing your entire integration test work yourself, 
and performance tuning. Did [I mention 
testing?|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md]
We have put a lot of effort in there. You have to balance remote test runs 
with scalable performance and affordability, where the secret is reusing 
Amazon's own datasets, our own TPC-DS datasets, and running tests in-EC2 to 
really stress things.

This is not trivial, even if you lift the latest branch-2 code. We are still 
finding bugs there, ones that surface in the field. We may have a broader set 
of downstream things to deal with: distcp, mapreduce, hive, spark, even flink, 
but that helps us with the test reports (we keep an eye on JIRAs and Stack 
Overflow for the word "s3a") and the different deployment scenarios.

Please, do not take this on lightly.

Returning to your example above:
# it's not just that the {{exists()/HEAD}} probe can take time to respond, it 
is that the directory listing lags the direct {{HEAD object}} call; even if the 
{{exists()}} check returns 404, a LIST operation may still list the entry. And 
because the front-end load balancers cache things, the code deleting the object 
may get a 404 indicating that the object is gone, but *there is no guarantee 
that a different caller will not get a 200*.
# You may even see it in the same process, though if your calls use a 
thread pool of keep-alive HTTP/1.1 connections and all calls are on the same TCP 
connection, you'll be hitting the same load balancer and so get the cached 404. 
Because yes, load balancers cache 404 entries, meaning you don't even get 
create consistency if you do a check first.
# S3 doesn't have read-after-write (RAW) consistency. It now has create 
consistency across all regions (yes, for a long time it had different 
behaviours on US-East vs the others), provided you don't do a HEAD first.
# You don't get PUT-over-PUT consistency or DELETE consistency, and metadata 
queries invariably lag the object state, even on create.
# there is no such thing as {{rename()}}, merely a COPY at approx 6-10 MB/s, so 
it is O(data) and non-atomic.
# if you are copying atop objects with the same name, you hit update 
consistency, for which there are no guarantees. Again, different callers may 
see different results, irrespective of call ordering, and listing will lag 
creation.

What you have seen so far is "demo scale" behaviour over a reused HTTP/1.1 
connection against the same load balancer. You cannot extrapolate from what 
works there to what offers guaranteed outcomes for large-scale operations on 
production data across multiple clusters, except for the special case "if it 
doesn't work here, it won't magically work in production".



> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> Flink's own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)