Hi Steve, I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
I am now going through the documentation
(https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
and it makes much, much more sense now.

Regards,
Gourav Sengupta

On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <[email protected]> wrote:

>
> On 2 Aug 2017, at 20:05, Gourav Sengupta <[email protected]>
> wrote:
>
> Hi Steve,
>
> I have written a sincere note of apology to everyone in a separate email.
> I sincerely request your kind forgiveness beforehand if anything does
> sound impolite in my emails.
>
> Let me first start by thanking you.
>
> I know it looks like I formed all my opinion based on that document, but
> that is not the case at all. If you or anyone tries to execute the code
> that I have given then they will see what I mean. Code speaks louder and
> better than words for me.
>
> So I am not saying you are wrong. I am asking you to verify, and expecting
> that someone will be able to correct a set of understanding that a moron
> like me has gained after long hours of not having anything better to do.
>
>
> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
> replication 2 and there is a HADOOP cluster of three nodes. All these nodes
> have SPARK workers (executors) running in them.
> Both are stored in the following way:
>
> -------------------------------------
> | SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
> | (worker1) | (worker2) | (worker3) |
> | (master)  |           |           |
> -------------------------------------
> | file1.csv |           | file1.csv |
> -------------------------------------
> |           | file2.csv | file2.csv |
> -------------------------------------
> | file3.csv | file3.csv |           |
> -------------------------------------
>
>
> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
> HDFS replication does not store the same file in all the nodes in the
> cluster. So if I have three nodes and the replication is two then the same
> file will be stored physically in two nodes in the cluster. Does that sound
> right?
>
>
> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
> larger than 128MB then it will be broken up into blocks
>
> file1.csv -> [block0001, block0002, block0003]
>
> and each block will be replicated. With replication = 2 there will be two
> copies of each block, but the file itself can span more than 2 hosts.
>
>
> ASSUMPTION (STEVE PLEASE CLARIFY THIS):
> If SPARK is trying to process the records then I am expecting that
> WORKER2 should not be processing file1.csv, and similarly WORKER 1 should
> not be processing file2.csv and WORKER3 should not be processing file3.csv.
> Because in case WORKER2 was trying to process file1.csv then it will
> actually be causing network transmission of the file unnecessarily.
>
>
> Spark prefers to schedule work locally, so as to save on network traffic,
> but it prioritises execution time over waiting for a worker to become free
> on the node with the data. If a block is on nodes 2 and 3 but there is only
> a free thread on node 1, then node 1 gets the work.
>
> There are details on whether/how work across blocks takes place which I'm
> avoiding. For now, know that those formats which are "splittable" will have
> work scheduled by block.
> If you use Parquet/ORC/Avro for your data and compress
> with snappy, it will be split. This gives you maximum performance as more
> than one thread can work on different blocks. That is, if file1 is split
> into three blocks, three worker threads can process it.
>
>
> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
> THIS):
> if WORKER 2 is not processing file1.csv then how does it matter whether
> the file is there or not at all in the system? Should not SPARK just ask
> the workers to process the files which are available in the worker nodes?
> In case both WORKER2 and WORKER3 fail and are not available then file2.csv
> will not be processed at all.
>
>
> Locality is best-effort, not guaranteed.
>
>
> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
> EXECUTED (It's been pointed out that I am learning SPARK, and even I did
> not take more than 13 mins to set up the cluster and run the code).
>
> Once you execute the code then you will find that:
> 1. if the path starts with file:/// while reading back then there is no
> error reported, but the number of records reported back are only those
> records in the worker which also has the server.
> 2. also you will notice that once you cache the file before writing, the
> partitions are distributed nicely across the workers, and while writing
> back, the dataframe partitions do write properly to the worker node in
> the Master, but the workers in the other systems have the files written in
> a _temporary folder which does not get copied back to the main folder.
> In spite of this the job is not reported as failed in SPARK.
>
>
> This gets into the "commit protocol". You don't want to know all the dirty
> details (*) but essentially it's this:
>
> 1. Every worker writes its output to a directory under the destination
> directory, something like
> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
> 2. It is the Spark driver which "commits" the job by moving the output
> from the individual workers from the temporary directories into $dest, then
> deleting $dest/_temporary
> 3. For which it needs to be able to list all the output in $dest/_temporary
>
> In your case, only the output on the same node as the driver is being
> committed, because only those files can be listed and moved. The output on
> the other nodes isn't seen, so isn't committed, nor cleaned up.
>
>
>
> Now in my own world, if I see the following things happening, then
> something is going wrong (with me):
> 1. SPARK transfers files from different systems to process, instead of
> processing them locally (I do not have code to prove this, and therefore
> it's just an assumption)
> 2. SPARK cannot determine when the writes are failing in standalone
> cluster workers and reports success (code is there for this)
> 3. SPARK reports back the number of records in the worker running in the
> master node when count() is given, without reporting an error while using
> file:///, and reports an error when I mention the path without file:///
> (for SPARK 2.1.x onwards, code is there for this)
>
>
>
> As everyone's been saying, file:// requires a shared filestore, with
> uniform paths everywhere. That's needed to list the files to process, read
> the files in the workers and commit the final output. NFS cross-mounting is
> the simplest way to do this, especially as for three nodes HDFS is
> overkill: more services to keep running, no real fault tolerance. Export a
> directory tree from one of the servers, give the rest access to it, and
> don't worry about bandwidth use, as the shared disk itself will become the
> bottleneck.
>
>
>
> I very sincerely hope with your genuine help the bar of language and
> social skills will be lowered for me. And everyone will find a way to
> excuse me and not qualify this email as a means to measure my extremely
> versatile and amazingly vivid social skills.
> It will be a lot of help to
> just focus on the facts related to machines, data, error and (the language
> that I somehow understand better) code.
>
>
> My sincere apologies once again, as I am 100% sure that I did not meet the
> required social and language skills.
>
> Thanks a ton once again for your kindness, patience and understanding.
>
>
> Regards,
> Gourav Sengupta
>
>
> * for the curious, the details of the v1 and v2 commit protocols are
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>
> Like I said: you don't want to know the details, and you really don't want
> to step through Hadoop's FileOutputCommitter to see what's going on. The
> Spark side is much easier to follow.
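
For anyone who prefers code to words: below is a small, self-contained
Python sketch of the simplified three-step commit description quoted above.
It is a toy simulation using local temporary directories, not Hadoop's
FileOutputCommitter and not Spark code. The names task_write, job_commit,
run and APP_ATTEMPT are made up for illustration, and each "node" is
assumed to be just a separate directory standing in for that machine's
local disk.

```python
import shutil
import tempfile
from pathlib import Path

APP_ATTEMPT = "0"  # hypothetical application attempt id, for illustration only


def task_write(node_root, dest, task_id, data):
    """Step 1: a worker writes its output under $dest/_temporary/... on the
    filesystem that this worker can see (node_root)."""
    task_dir = node_root / dest / "_temporary" / APP_ATTEMPT / "_temporary" / task_id
    task_dir.mkdir(parents=True, exist_ok=True)
    (task_dir / ("part-%s.csv" % task_id)).write_text(data)


def job_commit(driver_root, dest):
    """Steps 2 and 3: the driver lists $dest/_temporary as *it* sees it,
    moves every part file into $dest, then deletes $dest/_temporary."""
    dest_dir = driver_root / dest
    temp_dir = dest_dir / "_temporary"
    committed = []
    if temp_dir.exists():
        # sorted() materialises the listing before any file is moved
        for part in sorted(temp_dir.rglob("part-*")):
            shutil.move(str(part), str(dest_dir / part.name))
            committed.append(part.name)
        shutil.rmtree(temp_dir)
    return committed


def run(shared):
    """Three workers write one part each; the driver runs on the first node.
    shared=True models one filestore mounted everywhere (NFS or HDFS);
    shared=False models file:// paths on three separate local disks."""
    with tempfile.TemporaryDirectory() as base:
        base = Path(base)
        if shared:
            roots = [base / "shared"] * 3
        else:
            roots = [base / ("node%d" % i) for i in (1, 2, 3)]
        for i, root in enumerate(roots, start=1):
            task_write(root, "out", "task%d" % i, "rows from worker %d\n" % i)
        return job_commit(roots[0], "out")


if __name__ == "__main__":
    print("separate local disks:", run(shared=False))
    print("shared filestore:    ", run(shared=True))
```

With separate local disks the driver can only list and move the part file
written on its own node, and nothing raises an error; with a shared
filestore all three parts are committed. That matches the behaviour
described above for file:// on a standalone cluster.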
