Hi Marco, thanks a ton, I will surely use those alternatives.
Regards,
Gourav Sengupta

On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Sengupta,
> further to this, if you try the following notebook in Databricks cloud, it
> will read a .csv file, write it to a parquet file and read it again (just
> to count the number of rows stored).
> Please note that the path to the csv file might differ for you.....
> So, what you will need to do is
> 1 - create an account at community.cloud.databricks.com
> 2 - upload the .csv file via the Data section of your Databricks cluster
> 3 - run the script; that will store the data on the distributed
> filesystem of the Databricks cloud (DBFS)
>
> It's worth investing in this free Databricks cloud as it can create a
> cluster for you with minimal effort, and it's a very easy way to test your
> spark scripts on a real cluster
>
> hope this helps
> kr
>
> ##################################
> from pyspark.sql import SQLContext
> from pyspark.sql.session import SparkSession
> from random import randint
> from time import sleep
> import logging
>
> logger = logging.getLogger(__name__)
> logger.setLevel(logging.INFO)
> ch = logging.StreamHandler()
> logger.addHandler(ch)
>
>
> def read_parquet_file(parquetFileName):
>     # note: relies on the global sqlContext created at the bottom of the script
>     logger.info('Reading now the parquet files we just created...:%s', parquetFileName)
>     parquet_data = sqlContext.read.parquet(parquetFileName)
>     logger.info('Parquet file has %s', parquet_data.count())
>
>
> def dataprocessing(filePath, count, sqlContext):
>     logger.info('Iter count is:%s', count)
>     if count == 0:
>         print('exiting')
>     else:
>         df_traffic_tmp = sqlContext.read.format("csv").option("header", "true").load(filePath)
>         logger.info('#############################DataSet has:%s', df_traffic_tmp.count())
>         logger.info('Writing to a parquet file')
>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>         df_traffic_tmp.write.parquet(parquetFileName)
>         sleepInterval = randint(10, 100)
>         logger.info('#############################Sleeping for %s', sleepInterval)
>         sleep(sleepInterval)
>         read_parquet_file(parquetFileName)
>         dataprocessing(filePath, count - 1, sqlContext)
>
>
> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
> iterations = 1
> logger.info('----------------------')
> logger.info('Filename:%s', filename)
> logger.info('Iterations:%s', iterations)
> logger.info('----------------------')
>
> logger.info('........Starting spark..........Loading from %s for %s iterations', filename, iterations)
> logger.info('Starting up....')
> spark = SparkSession.builder.appName("Data Processing").getOrCreate()
> logger.info('Initializing sqlContext')
> sqlContext = SQLContext(spark.sparkContext)
> dataprocessing(filename, iterations, sqlContext)
> logger.info('Out of here..')
> ######################################
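>
> One caveat if you bump up the number of iterations: write.parquet fails by
> default when the target path already exists, so the second pass would error
> out. A rough sketch of the overwrite variant of that write line inside
> dataprocessing() (same path as in the script above):
>
>     df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)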
>
>
> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> Uh believe me there are lots of ppl on this list who will send u code
>> snippets if u ask... 😀
>>
>> Yes that is what Steve pointed out, suggesting also that for that simple
>> exercise you should perform all operations on a spark standalone instead
>> (or alternatively use an NFS on the cluster)
>> I'd agree with his suggestion....
>> I suggest u another alternative:
>> https://community.cloud.databricks.com/
>>
>> That's a ready-made cluster and you can run your spark app as well as store
>> data on the cluster (well I haven't tried myself but I assume it's
>> possible). Try that out... I will try ur script there as I have an account
>> there (though I guess u'll get there before me.....)
>>
>> Try that out and let me know if u get stuck....
>> Kr
>>
>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
>>> someone actually executing code and providing a response. It feels wonderful
>>> that at least someone considered responding by executing code and did not
>>> filter out every technical detail to brood only on my superb social skills,
>>> while claiming the reason for ignoring the technical details is that they
>>> are elementary. I think that Steve also is the first person who could answer
>>> the WHY of an elementary question instead of saying that is how it is, and
>>> who pointed me to the correct documentation.
>>>
>>> That code works fantastically. But the problem which I have been trying to
>>> pin down is with writing out the data, not reading it.
>>>
>>> So if you try to read the data from a folder which has the same file across
>>> all the nodes, then it will work fine. In fact that is what should work.
>>>
>>> What does not work is if you try to write back the file and then read it
>>> once again from the location you have written to; that is when the issue
>>> starts happening.
>>>
>>> Therefore if in my code you were to save the pandas dataframe as a CSV file
>>> and then read it, then you will find the following observations:
>>>
>>> THE FOLLOWING WILL FAIL SINCE THE FILE IS NOT ON ALL THE NODES
>>> ---------------------------------------------------------------------------
>>> import pandas
>>> import numpy
>>>
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>                 header=True, sep=",", index=False)
>>> testdf = spark.read.format("csv").option("header", "true").load(
>>>     "/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ---------------------------------------------------------------------------
>>>
>>> THE FOLLOWING WILL WORK, BUT THE PROCESS WILL NOT USE THE NODES ON WHICH
>>> THE DATA DOES NOT EXIST
>>> ---------------------------------------------------------------------------
>>> import pandas
>>> import numpy
>>>
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>                 header=True, sep=",", index=False)
>>> testdf = spark.read.format("csv").option("header", "true").load(
>>>     "file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ---------------------------------------------------------------------------
>>>
>>> If you execute my code, you will also, surprisingly, see that the writes on
>>> the nodes other than the master node never finish moving the files from the
>>> _temporary folder to the main one.
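>>>
>>> (A stripped-down sketch of the same write-then-read-back check, with made-up
>>> paths and no pandas, in case anyone wants to reproduce it quickly:)
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("commit-check").getOrCreate()
>>>
>>> df = spark.range(0, 100000).repartition(8)   # spread the work over several partitions/executors
>>> out_path = "file:///tmp/commit_check_out"    # hypothetical local, non-shared path
>>>
>>> df.write.mode("overwrite").parquet(out_path)
>>> print("rows written  :", df.count())
>>> print("rows read back:", spark.read.parquet(out_path).count())
>>> # on a multi-node standalone cluster without shared storage the read-back count
>>> # can be lower, because only the driver-local part files end up committed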
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>
>>>> Hello
>>>> please have a look at this. it's a simple script that just reads a
>>>> dataframe n times, sleeping at random intervals. i used it to test memory
>>>> issues that another user was experiencing on a spark cluster
>>>>
>>>> you should run it like this e.g.
>>>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv>
>>>> <num of iterations>
>>>>
>>>> i ran it on the cluster like this
>>>>
>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077
>>>> /root/pyscripts/dataprocessing_Sample-2.py
>>>> file:///root/pyscripts/tree_addhealth.csv
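>>>>
>>>> (The notebook version of the script posted above hard-codes filename and
>>>> iterations; if you want to pass them on the command line as in the usage
>>>> line above, a rough sketch of the top of the script would be:)
>>>>
>>>> import sys
>>>>
>>>> # read the csv path and iteration count from the spark-submit arguments, e.g.
>>>> # spark-submit dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv 10
>>>> filename = sys.argv[1] if len(sys.argv) > 1 else 'file:///root/pyscripts/tree_addhealth.csv'
>>>> iterations = int(sys.argv[2]) if len(sys.argv) > 2 else 1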
>>>>
>>>> hth, ping me back if you have issues
>>>> i do agree with Steve's comments.... if you want to test your spark
>>>> scripts just for playing, do it on a standalone server on your localhost.
>>>> Moving to a cluster is just a matter of deploying your script and making
>>>> sure you have a common place where to read and store the data..... SysAdmin
>>>> should give you this when they setup the cluster...
>>>>
>>>> kr
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> I am sincerely obliged for your kind time and response. Can you please
>>>>> try the solution that you have so kindly suggested?
>>>>>
>>>>> It will be a lot of help if you could kindly execute the code that I
>>>>> have given. I don't think that anyone has yet.
>>>>>
>>>>> There are lots of fine responses to my question here, but if you read
>>>>> the last response from Simon, it comes the closest to being satisfactory. I
>>>>> am sure even he did not execute the code, but at least he came quite close
>>>>> to understanding what the problem is.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>>>
>>>>>> Hello
>>>>>> my 2 cents here, hope it helps
>>>>>> If you just want to play around with Spark, i'd leave Hadoop out;
>>>>>> it's an unnecessary dependency that you dont need for just running a
>>>>>> python script
>>>>>> Instead do the following:
>>>>>> - go to the root of your master / slave node and create a directory
>>>>>>   /root/pyscripts
>>>>>> - place your csv file there as well as the python script
>>>>>> - run the script to replicate the whole directory across the cluster
>>>>>>   (i believe it's called copy-script.sh)
>>>>>> - then run your spark-submit, it will be something like
>>>>>>   ./spark-submit /root/pyscripts/mysparkscripts.py
>>>>>>   file:///root/pyscripts/tree_addhealth.csv 10 --master
>>>>>>   spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>>>> - in your python script, as part of your processing, write the
>>>>>>   parquet file in directory /root/pyscripts
>>>>>>
>>>>>> If you have an AWS account and you are versatile with that - you need
>>>>>> to setup bucket permissions etc - you can just
>>>>>> - store your file in one of your S3 buckets
>>>>>> - create an EMR cluster
>>>>>> - connect to the master or a slave
>>>>>> - run your script that reads from the s3 bucket and writes to the
>>>>>>   same s3 bucket (a rough sketch of that pattern follows below)
>>>>>>
>>>>>> Feel free to mail me privately, i have a working script i have used
>>>>>> to test some code on a spark standalone cluster
>>>>>> hth
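>>>>>>
>>>>>> (A rough sketch of that S3 round trip — bucket name and paths are made up;
>>>>>> on EMR the s3:// scheme is normally used, elsewhere s3a:// plus the right
>>>>>> credentials and bucket permissions:)
>>>>>>
>>>>>> from pyspark.sql import SparkSession
>>>>>>
>>>>>> spark = SparkSession.builder.appName("s3-roundtrip").getOrCreate()
>>>>>>
>>>>>> bucket = "s3://my-example-bucket"   # hypothetical bucket
>>>>>> df = spark.read.option("header", "true").csv(bucket + "/input/tree_addhealth.csv")
>>>>>> df.write.mode("overwrite").parquet(bucket + "/output/tree_addhealth.parquet")
>>>>>> print(spark.read.parquet(bucket + "/output/tree_addhealth.parquet").count())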
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Steve,
>>>>>>>
>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>
>>>>>>> I am now going through the documentation
>>>>>>> (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>>>>>>> and it makes much much more sense now.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>>> email. I sincerely request your kind forgiveness in advance if anything
>>>>>>>> in my emails sounds impolite.
>>>>>>>>
>>>>>>>> Let me first start by thanking you.
>>>>>>>>
>>>>>>>> I know it looks like I formed all my opinions based on that
>>>>>>>> document, but that is not the case at all. If you or anyone tries to
>>>>>>>> execute the code that I have given, then they will see what I mean. Code
>>>>>>>> speaks louder and better than words for me.
>>>>>>>>
>>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>>>> that someone will be able to correct a set of understandings that a moron
>>>>>>>> like me has gained after long hours of not having anything better to do.
>>>>>>>>
>>>>>>>>
>>>>>>>> SCENARIO: there are three files file1.csv, file2.csv and file3.csv stored
>>>>>>>> in HDFS with replication 2, and there is a HADOOP cluster of three nodes.
>>>>>>>> All these nodes have SPARK workers (executors) running on them. The files
>>>>>>>> are stored in the following way:
>>>>>>>>
>>>>>>>> -----------------------------------------------------
>>>>>>>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>>>>>>>> | (worker1)  | (worker2)  | (worker3)  |
>>>>>>>> | (master)   |            |            |
>>>>>>>> -----------------------------------------------------
>>>>>>>> | file1.csv  |            | file1.csv  |
>>>>>>>> -----------------------------------------------------
>>>>>>>> |            | file2.csv  | file2.csv  |
>>>>>>>> -----------------------------------------------------
>>>>>>>> | file3.csv  | file3.csv  |            |
>>>>>>>> -----------------------------------------------------
>>>>>>>>
>>>>>>>>
>>>>>>>> CONSIDERATION BASED ON WHICH THE ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>>> HDFS replication does not store the same file on all the nodes in
>>>>>>>> the cluster. So if I have three nodes and the replication is two, then the
>>>>>>>> same file will be stored physically on two nodes in the cluster. Does that
>>>>>>>> sound right?
>>>>>>>>
>>>>>>>>
>>>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file
>>>>>>>> is > 128MB then it will be broken up into blocks
>>>>>>>>
>>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>>
>>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION (STEVE PLEASE CLARIFY THIS):
>>>>>>>> If SPARK is trying to process the records, then I am expecting that
>>>>>>>> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
>>>>>>>> not be processing file2.csv and WORKER3 should not be processing file3.csv,
>>>>>>>> because if WORKER2 were trying to process file1.csv then it would actually
>>>>>>>> cause network transmission of the file unnecessarily.
>>>>>>>>
>>>>>>>>
>>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>>> traffic, but it optimises for execution time over waiting for free workers
>>>>>>>> on the node with the data. If a block is on nodes 2 and 3 but there is only
>>>>>>>> a free thread on node 1, then node 1 gets the work.
>>>>>>>>
>>>>>>>> There are details on whether/how work across blocks takes place which
>>>>>>>> I'm avoiding. For now, know that those formats which are "splittable" will
>>>>>>>> have work scheduled by block. If you use Parquet/ORC/Avro for your data and
>>>>>>>> compress with snappy, it will be split. This gives you maximum performance,
>>>>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>>>>> three blocks, three worker threads can process it.
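>>>>>>>>
>>>>>>>> (A quick way to see that from the Spark side — the path here is
>>>>>>>> hypothetical — is to check how many input partitions a read produces; for
>>>>>>>> splittable formats it roughly tracks the number of blocks/files:)
>>>>>>>>
>>>>>>>> df = spark.read.parquet("hdfs:///data/some_big_table.parquet")  # hypothetical path
>>>>>>>> print(df.rdd.getNumPartitions())  # roughly one partition (and so one task) per block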
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE, ONCE AGAIN, PLEASE
>>>>>>>> CLARIFY THIS):
>>>>>>>> If WORKER2 is not processing file1.csv, then how does it matter
>>>>>>>> whether the file is there or not on that system at all? Should not SPARK
>>>>>>>> just ask the workers to process the files which are available on the
>>>>>>>> worker nodes? In case both WORKER2 and WORKER3 fail and are not available,
>>>>>>>> then file2.csv will not be processed at all.
>>>>>>>>
>>>>>>>>
>>>>>>>> Locality is best-effort, not guaranteed.
>>>>>>>>
>>>>>>>>
>>>>>>>> ALSO, I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>>>> BE EXECUTED (it's been pointed out that I am learning SPARK, and even I
>>>>>>>> did not take more than 13 mins to set up the cluster and run the code).
>>>>>>>>
>>>>>>>> Once you execute the code, you will find that:
>>>>>>>> 1. if the path starts with file:/// while reading back, then there is no
>>>>>>>> error reported, but the records reported back are only those records on
>>>>>>>> the worker which is also running the master.
>>>>>>>> 2. also you will notice that once you cache the file before writing, the
>>>>>>>> partitions are distributed nicely across the workers, and while writing
>>>>>>>> back, the dataframe partitions do get written properly by the worker node
>>>>>>>> on the master, but the workers on the other systems have the files written
>>>>>>>> in a _temporary folder which does not get copied back to the main folder.
>>>>>>>> In spite of this the job is not reported as failed in SPARK.
>>>>>>>>
>>>>>>>>
>>>>>>>> This gets into the "commit protocol". You don't want to know all
>>>>>>>> the dirty details (*) but essentially it's this:
>>>>>>>>
>>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>>> destination directory, something like
>>>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>>> 2. it is the spark driver which "commits" the job by moving the
>>>>>>>> output of the individual workers from the temporary directories into
>>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>>> 3. For which it needs to be able to list all the output in
>>>>>>>> $dest/_temporary
>>>>>>>>
>>>>>>>> In your case, only the output on the same node as the driver is
>>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>>> output on the other nodes isn't seen, so isn't committed, nor cleaned up.
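>>>>>>>>
>>>>>>>> (One way to see whether a commit actually completed — the path below is
>>>>>>>> the hypothetical one from the earlier sketch: a committed output directory
>>>>>>>> contains a _SUCCESS marker and part-* files, and no leftover _temporary.)
>>>>>>>>
>>>>>>>> import os
>>>>>>>>
>>>>>>>> out = "/tmp/commit_check_out"   # hypothetical local output path
>>>>>>>> for name in sorted(os.listdir(out)):
>>>>>>>>     print(name)                 # expect _SUCCESS plus part-*.parquet files
>>>>>>>> # a '_temporary' directory still present here (on any node) means those
>>>>>>>> # task outputs were never seen by the driver's commit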
>>>>>>>>
>>>>>>>>
>>>>>>>> Now in my own world, if I see the following things happening,
>>>>>>>> something is going wrong (with me):
>>>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>>>> of processing them locally (I do not have code to prove this, and
>>>>>>>> therefore it's just an assumption)
>>>>>>>> 2. SPARK cannot determine when the writes are failing on standalone
>>>>>>>> cluster workers and reports success (code is there for this)
>>>>>>>> 3. SPARK reports back the number of records on the worker running on
>>>>>>>> the master node when count() is given, without reporting an error while
>>>>>>>> using file:///, and reports an error when I mention the path without
>>>>>>>> file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>>
>>>>>>>>
>>>>>>>> As everyone's been saying, file:// requires a shared filestore, with
>>>>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>>>>> the files in the workers and commit the final output. NFS cross-mounting
>>>>>>>> is the simplest way to do this, especially as for three nodes HDFS is
>>>>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>>>>> directory tree from one of the servers, give the rest access to it, and
>>>>>>>> don't worry about bandwidth use, as the shared disk itself will become the
>>>>>>>> bottleneck.
>>>>>>>>
>>>>>>>>
>>>>>>>> I very sincerely hope that with your genuine help the bar of language
>>>>>>>> and social skills will be lowered for me, and everyone will find a way to
>>>>>>>> excuse me and not use this email as a means to measure my extremely
>>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>>>> just focus on the facts related to machines, data, errors and (the
>>>>>>>> language that I somehow understand better) code.
>>>>>>>>
>>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>>> meet the required social and language skills.
>>>>>>>>
>>>>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>>
>>>>>>>> * for the curious, the details of the v1 and v2 commit protocols are at
>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>
>>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's
>>>>>>>> going on. The Spark side is much easier to follow.
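>>>>>>>>
>>>>>>>> (For anyone who does want to poke at it: the classic FileOutputCommitter
>>>>>>>> algorithm version can be selected per job. A rough sketch — this only
>>>>>>>> changes how the commit works on a real shared/cluster filesystem, it does
>>>>>>>> not fix the missing-shared-storage problem discussed above:)
>>>>>>>>
>>>>>>>> from pyspark.sql import SparkSession
>>>>>>>>
>>>>>>>> # v1 is the default in Hadoop 2.x; v2 moves task output into $dest at task
>>>>>>>> # commit time instead of at job commit time
>>>>>>>> spark = (SparkSession.builder
>>>>>>>>          .appName("committer-demo")
>>>>>>>>          .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
>>>>>>>>          .getOrCreate())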