I have a fix and a test for a recursive HDFSCopyToLocal, and I added similar 
code to the Yarn application staging. However, even though all files and 
resources now copy correctly, S3A still fails on Flink session creation. The 
failure stems from the lib folder being registered as an application resource 
(as opposed to the files it contains). Since there is no such thing as a 
directory in S3, there is no file creation timestamp, and the local folder 
resource fails with the following error:

java.io.IOException: Resource s3a://mm-dev-flink-savepoints/user/ec2-user/.flink/application_1469150519301_0014/lib changed on src filesystem (expected 0, was 1469155595413)
        at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
        at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
        at java.security.AccessController.doPrivileged(Native Method)..

I was stumped by why S3NativeFilesystem does _not_ fail, but reading this 
perhaps answers why: 
http://stackoverflow.com/questions/15619712/newly-created-s3-directory-has-1969-12-31-as-timestamp
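
To see the timestamp discrepancy outside of YARN, here is a minimal sketch 
using only the Hadoop FileSystem API (the staging path is just the one from 
the error above; none of this is Flink code, and I'm assuming the same path is 
reachable through both schemes):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3DirTimestampCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The "directory" that gets registered as a local resource. There is
        // no backing S3 object for it, so its modification time is whatever
        // the FileSystem implementation chooses to synthesize.
        Path libDir = new Path(
                "/user/ec2-user/.flink/application_1469150519301_0014/lib");

        for (String scheme : new String[] {"s3a", "s3n"}) {
            FileSystem fs = FileSystem.get(
                    URI.create(scheme + "://mm-dev-flink-savepoints/"), conf);
            FileStatus status = fs.getFileStatus(libDir);
            System.out.println(scheme + " mtime for " + libDir + " = "
                    + status.getModificationTime());
        }
    }
}

In my runs the s3a modification time comes back as 0, which seems to line up 
with the "expected 0" in the error above; s3n apparently synthesizes something 
else, per the Stack Overflow link.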

Everything works for all FileSystem implementations if I change the staging 
code to expand directory resources to file resources, effectively flattening 
all resources into the base directory. The only issue with this approach would 
be if there are like-named classpath resources in nested directories, but 
apparently the current implementation only copies jars so perhaps that’s a 
non-issue.
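
By "flattening" I mean roughly the following, just a sketch against the Hadoop 
FileSystem API (the class and method names here are mine, not the actual 
staging code in AbstractYarnClusterDescriptor):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public final class ResourceFlattening {

    /**
     * Expands a directory into the individual files it contains so that each
     * file can be registered as its own application resource, instead of
     * registering the directory itself. Caveat from above: two like-named
     * files in different nested directories would collide once flattened
     * into the base directory.
     */
    public static List<Path> expandDirectoryResource(FileSystem fs, Path dir)
            throws IOException {
        List<Path> files = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true); // recursive
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            if (status.isFile()) {
                files.add(status.getPath());
            }
        }
        return files;
    }
}

Each returned path would then be registered the same way a plain file resource 
is registered today.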

Short of altering S3A to perform the linked “hack”, I don’t see how 
Flink/Yarn/S3A can work as currently implemented. I can add the resource 
directory flattening to my impending PR, but I first want to be sure to 
mention the risk (like-named nested resources).

BTW, if anyone is wondering why I’m interested in S3a over S3n, it’s for this: 
https://issues.apache.org/jira/browse/HADOOP-11183. In-memory multipart 
writes would be a great way to use the rolling file appender.

-Cliff




On 7/19/16, 4:00 AM, "Ufuk Celebi" <u...@apache.org> wrote:

    Feel free to do the contribution at any time you like. We can also
    always make it part of a bugfix release if it does not make it into
    the upcoming 1.1 RC (probably end of this week or beginning of next).
    Feel free to ping me if you need any feedback or pointers.
    
    – Ufuk
    
    
    On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick
    <cresn...@mediamath.com> wrote:
    > In 1.1, AbstractYarnClusterDescriptor pushes contents of flink/lib (local
    > to where the yarn app is launched) to Yarn with a single directory copy. In
    > 1.0.3 it looked like it was copying the individual jars.
    >
    > So, yes I did actually change HDFSCopyToLocal, which was easy, but the
    > job staging in the above class also needs altering. I’m happy to contribute on
    > both though I won’t be able to get to it until later this week.
    >
    > -Cliff
    >
    >
    >
    > On 7/18/16, 3:38 PM, "Ufuk Celebi" <u...@apache.org> wrote:
    >
    >     Hey Cliff! Good to see that we came to the same conclusion :-) What do
    >     you mean with copying of the "lib" folder? This issue should be the
    >     same for both 1.0 and 1.1. Another work around could be to use the
    >     fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
    >
    >     If you like, you could also work on the issue I've created by
    >     implementing the recursive File copy in Flink (in HDFSCopyToLocal) and
    >     contribute this via a pull request.
    >
    >     – Ufuk
    >
    >
    >     On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick
    >     <cresn...@mediamath.com> wrote:
    >     > Hi Ufuk,
    >     >
    >     > My mail was down, so I missed this response. Thanks for that.
    >     >
    >     > On 7/18/16, 10:38 AM, "Ufuk Celebi" <u...@apache.org> wrote:
    >     >
    >     >     Hey Cliff!
    >     >
    >     >     I was able to reproduce this by locally running a job and RocksDB semi
    >     >     asynchronous checkpoints (current default) to S3A. I've created an
    >     >     issue here: https://issues.apache.org/jira/browse/FLINK-4228.
    >     >
    >     >     Running with S3N it is working as expected. You can use that
    >     >     implementation as a work around. I don't know whether it's possible to
    >     >     disable creation of MD5 hashes for S3A.
    >     >
    >     >     – Ufuk
    >     >
    >     >     On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick
    >     >     <cresn...@mediamath.com> wrote:
    >     >     > Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4
    >     >     >
    >     >     >
    >     >     >
    >     >     > The error I’m getting is :
    >     >     >
    >     >     >
    >     >     >
    >     >     > 11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
    >     >     > - Caught exception while materializing asynchronous checkpoints.
    >     >     >
    >     >     > com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:
    >     >     > /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1
    >     >     > (Is a directory)
    >     >     >
    >     >     >                 at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
    >     >     >                 at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
    >     >     >                 at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
    >     >     >                 at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
    >     >     >                 at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
    >     >     >                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >     >     >                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    >     >     >                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    >     >     >                 at java.lang.Thread.run(Thread.java:745)
    >     >     >
    >     >     >
    >     >     >
    >     >     > In the debugger I noticed that some of the uploaded checkpoints are from the
    >     >     > configured /tmp location. These succeed as the file in the request is fully
    >     >     > qualified, but I guess it’s different for WindowOperators? Here the file in
    >     >     > the request (using a different /var/folders.. location not configured by me
    >     >     > – must be a mac thing?) is actually a directory. The AWS api is failing when
    >     >     > it tries to calculate an MD5 of the directory. The Flink side of the
    >     >     > codepath is hard to discern from debugging because it’s asynchronous.
    >     >     >
    >     >     >
    >     >     >
    >     >     > I get the same issue whether local or on a CentOS-based YARN cluster.
    >     >     > Everything works if I use HDFS instead. Any insight will be greatly
    >     >     > appreciated! When I get a chance later I may try S3n or perhaps S3a with MD5
    >     >     > verification skipped.
    >     >     >
    >     >     >
    >     >     >
    >     >     > -Cliff
    >     >
    >     >
    >
    >
    
