Hi Marco,

For the first time in several years FOR THE VERY FIRST TIME. I am seeing
someone actually executing code and providing response. It feel wonderful
that at least someone considered to respond back by executing code and just
did not filter out each and every technical details to brood only on my
superb social skills, while claiming the reason for ignoring technical
details is that it elementary. I think that Steve also is the first person
who could answer the WHY of an elementary question instead of saying that
is how it is and pointed out to the correct documentation.

That code works fantastically. But the problem which I have tried to find
out is while writing out the data and not reading it.


So if you see try to read the data from the same folder which has the same
file across all the nodes then it will work fine. In fact that is what
should work.

What does not work is that if you try to write back the file and then read
it once again from the location you have written that is when the issue
starts happening.

Therefore if in my code you were to save the pandas dataframe as a CSV file
and then read it then you will find the following observations:

FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
header=True, sep=",", index=0)
testdf = spark.read.load("/Users/gouravsengupta/Development/
spark/sparkdata/testdir/")
testdf.cache()
testdf.count()
------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------


FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN WHICH
THE DATA DOES NOT EXISTS
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
header=True, sep=",", index=0)
testdf = spark.read.load("file:///Users/gouravsengupta/Development/
spark/sparkdata/testdir/")
testdf.cache()
testdf.count()
------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------


if you execute my code then also you will surprisingly see that the writes
in the nodes which is not the master node does not complete moving the
files from the _temporary folder to the main one.


Regards,
Gourav Sengupta



On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hello
>  please have a look at this. it'sa simple script that just read a
> dataframe for n time, sleeping at random interval. i used it to test memory
> issues that another user was experiencing on a spark cluster
>
> you should run it like this e.g
> spark-submit dataprocessing_Sample.-2py <path to tree_addhealth.csv> <num
> of iterations>
>
> i ran it on the cluster like this
>
> ./spark-submit --master spark://ec2-54-218-113-119.us-
> west-2.compute.amazonaws.com:7077   /root/pyscripts/dataprocessing_Sample-2.py
> file:///root/pyscripts/tree_addhealth.csv
>
> hth, ping me back if you have issues
> i do agree with Steve's comments.... if you want to test your  spark
> script s just for playing, do it on  a standaone server on your localhost.
> Moving to a c luster is just a matter of deploying your script and mke sure
> you have a common place where to read and store the data..... SysAdmin
> should give you this when they setup the cluster...
>
> kr
>
>
>
>
> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <gourav.sengu...@gmail.com
> > wrote:
>
>> Hi Marco,
>>
>> I am sincerely obliged for your kind time and response. Can you please
>> try the solution that you have so kindly suggested?
>>
>> It will be a lot of help if you could kindly execute the code that I have
>> given. I dont think that anyone has yet.
>>
>> There are lots of fine responses to my question here, but if you read the
>> last response from Simon, it comes the closest to being satisfactory. I am
>> sure even he did not execute the code, but at least he came quite close to
>> understanding what the problem is.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Hello
>>>  my 2 cents here, hope it helps
>>> If you want to just to play around with Spark, i'd leave Hadoop out,
>>> it's an unnecessary dependency that you dont need for just running a python
>>> script
>>> Instead do the following:
>>> - got to the root of our master / slave node. create a directory
>>> /root/pyscripts
>>> - place your csv file there as well as the python script
>>> - run the script to replicate the whole directory  across the cluster (i
>>> believe it's called copy-script.sh)
>>> - then run your spark-submit , it will be something lke
>>>     ./spark-submit /root/pyscripts/mysparkscripts.py
>>> file:///root/pyscripts/tree_addhealth.csv 10 --master
>>> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>> - in your python script, as part of your processing, write the parquet
>>> file in directory /root/pyscripts
>>>
>>> If you have an AWS account and you are versatile with that - you need to
>>> setup bucket permissions etc - , you can just
>>> - store your file in one of your S3 bucket
>>> - create an EMR cluster
>>> - connect to master or slave
>>> - run your  scritp that reads from the s3 bucket and write to the same
>>> s3 bucket
>>>
>>>
>>> Feel free to mail me privately, i have a working script i have used to
>>> test some code on spark standalone cluster
>>> hth
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Steve,
>>>>
>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>
>>>> I am now going through the documentation (https://github.com/stevelough
>>>> ran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/
>>>> hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_
>>>> architecture.md) and it makes much much more sense now.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <ste...@hortonworks.com
>>>> > wrote:
>>>>
>>>>>
>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Steve,
>>>>>
>>>>> I have written a sincere note of apology to everyone in a separate
>>>>> email. I sincerely request your kind forgiveness before hand if anything
>>>>> does sound impolite in my emails, in advance.
>>>>>
>>>>> Let me first start by thanking you.
>>>>>
>>>>> I know it looks like I formed all my opinion based on that document,
>>>>> but that is not the case at all. If you or anyone tries to execute the 
>>>>> code
>>>>> that I have given then they will see what I mean. Code speaks louder and
>>>>> better than words for me.
>>>>>
>>>>> So I am not saying you are wrong. I am asking verify and expecting
>>>>> someone will be able to correct  a set of understanding that a moron like
>>>>> me has gained after long hours of not having anything better to do.
>>>>>
>>>>>
>>>>> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS
>>>>> with replication 2 and there is a HADOOP cluster of three nodes. All these
>>>>> nodes have SPARK workers (executors) running in them.  Both are stored in
>>>>> the following way:
>>>>> -----------------------------------------------------
>>>>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>>>>> | (worker1)   |  (worker2)    |  (worker3)   |
>>>>> | (master)     |                     |                    |
>>>>> -----------------------------------------------------
>>>>> | file1.csv      |                     | file1.csv     |
>>>>> -----------------------------------------------------
>>>>> |                    |  file2.csv      | file2.csv     |
>>>>> -----------------------------------------------------
>>>>> | file3.csv      |  file3.csv      |                   |
>>>>> -----------------------------------------------------
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>> HDFS replication does not store the same file in all the nodes in the
>>>>> cluster. So if I have three nodes and the replication is two then the same
>>>>> file will be stored physically in two nodes in the cluster. Does that 
>>>>> sound
>>>>> right?
>>>>>
>>>>>
>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>>>>> > 128 then it will be broken up into blocks
>>>>>
>>>>> file1.cvs -> [block0001, block002, block0003]
>>>>>
>>>>> and each block will be replicated. With replication = 2 there will be
>>>>> two copies of each block, but the file itself can span > 2 hosts.
>>>>>
>>>>>
>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>> If SPARK is trying to process to the records then I am expecting that
>>>>> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
>>>>> not be processing file2.csv and WORKER3 should not be processing 
>>>>> file3.csv.
>>>>> Because in case WORKER2 was trying to process file1.csv then it will
>>>>> actually causing network transmission of the file unnecessarily.
>>>>>
>>>>>
>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>> traffic, but it schedules for execution time over waiting for workers free
>>>>> on the node with the data. IF a block is on nodes 2 and 3 but there is 
>>>>> only
>>>>> a free thread on node 1, then node 1 gets the work
>>>>>
>>>>> There's details on whether/how work across blocks takes place which
>>>>> I'm avoiding. For now know those formats which are "splittable" will have
>>>>> work scheduled by block. If you use Parquet/ORC/avro for your data and
>>>>> compress with snappy, it will be split. This gives you maximum performance
>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>> three blocks, three worker threads can process it.
>>>>>
>>>>>
>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
>>>>> THIS):
>>>>> if WORKER 2 is not processing file1.csv then how does it matter
>>>>> whether the file is there or not at all in the system? Should not SPARK
>>>>> just ask the workers to process the files which are avialable in the 
>>>>> worker
>>>>> nodes? In case both WORKER2 and WORKER3 fails and are not available then
>>>>> file2.csv will not be processed at all.
>>>>>
>>>>>
>>>>> locality is best-effort, not guaranteed.
>>>>>
>>>>>
>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
>>>>> EXECUTED (Its been pointed out that I am learning SPARK, and even I did 
>>>>> not
>>>>> take more than 13 mins to set up the cluster and run the code).
>>>>>
>>>>> Once you execute the code then you will find that:
>>>>> 1.  if the path starts with file:/// while reading back then there is
>>>>> no error reported, but the number of records reported back are only those
>>>>> records in the worker which also has the server.
>>>>> 2. also you will notice that once you cache the file before writing
>>>>> the partitions are ditributed nicely across the workers, and while writing
>>>>> back, the dataframe partitions does write properly to the worker node in
>>>>> the Master, but the workers in the other system have the files written in
>>>>> _temporary folder which does not get copied back to the main folder.
>>>>> Inspite of this the job is not reported as failed in SPARK.
>>>>>
>>>>>
>>>>> This gets into the "commit protocol". You don't want to know all the
>>>>> dirty details (*) but essentially its this
>>>>>
>>>>> 1. Every worker writes its output to a directory under the destination
>>>>> directory, something like '$dest/_temporary/$appAtt
>>>>> emptId/_temporary/$taskAttemptID'
>>>>> 2. it is the spark driver which "commits" the job by moving the output
>>>>> from the individual workers from the temporary directories into $dest, 
>>>>> then
>>>>> deleting $dest/_temporary
>>>>> 3. For which it needs to be able to list all the output in
>>>>> $dest/_temporary
>>>>>
>>>>> In your case, only the output on the same node of the driver is being
>>>>> committed, because only those files can be listed and moved. The output on
>>>>> the other nodes isn't seen, so isn't committed, nor cleaned up.
>>>>>
>>>>>
>>>>>
>>>>> Now in my own world, if I see, the following things are happening,
>>>>> something is going wrong (with me):
>>>>> 1. SPARK transfers files from different systems to process, instead of
>>>>> processing them locally (I do not have code to prove this, and therefore
>>>>> its just an assumption)
>>>>> 2. SPARK cannot determine when the writes are failing in standalone
>>>>> clusters workers and reports success (code is there for this)
>>>>> 3. SPARK reports back number of records in the worker running in the
>>>>> master node when count() is given without reporting an error while using
>>>>> file:/// and reports an error when I mention the path without file:///
>>>>> (for SPARK 2.1.x onwards, code is there for this)
>>>>>
>>>>>
>>>>>
>>>>> s everyone's been saying, file:// requires a shared filestore, with
>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>> the files in the workers and commit the final output. NFS cross-mounting 
>>>>> is
>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>> directory tree from one of the servers, give the rest access to it, don't
>>>>> worry about bandwidth use as the shared disk itself will become the
>>>>> bottleneck
>>>>>
>>>>>
>>>>>
>>>>> I very sincerely hope with your genuine help the bar of language and
>>>>> social skills will be lowered for me. And everyone will find a way to
>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>> just focus on the facts related to machines, data, error and (the language
>>>>> that I somehow understand better) code.
>>>>>
>>>>>
>>>>> My sincere apologies once again, as I am 100% sure that I did not meet
>>>>> the required social and language skills.
>>>>>
>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> * for the curious, the details of the v1 and v2 commit protocols are
>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-
>>>>> 13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/to
>>>>> ols/hadoop-aws/s3a_committer_architecture.md
>>>>>
>>>>> Like I said: you don't want to know the details, and you really don't
>>>>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>>>>> The Spark side is much easier to follow.
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to