Thanks!

I will take a look

Andy

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Tuesday, January 11, 2022 at 8:42 AM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: Andrew Davidson <aedav...@ucsc.edu.invalid>, "user @spark" 
<user@spark.apache.org>
Subject: Re: How to add a row number column with out reordering my data frame

Hi,
I do not think we need to do any of that. Please try repartitionByRange; Spark 
3 has adaptive query execution, with configurations to handle skew as well.
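
Something along these lines, perhaps (an untested sketch; the join key "Name", 
the partition count, and the toy data frames are just placeholders):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("repartitionByRangeSketch")
    # AQE can coalesce small shuffle partitions and split skewed ones at
    # runtime (enabled by default in Spark 3.2+)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)

# tiny stand-ins for two per-sample tables keyed by "Name"
leftDF = spark.createDataFrame([("tx1", 10.0), ("tx2", 20.0)], ["Name", "sampleA"])
rightDF = spark.createDataFrame([("tx1", 1.5), ("tx2", 2.5)], ["Name", "sampleB"])

# range partition both sides on the join key before joining, so rows with
# the same key land in corresponding partitions instead of one hot partition
joined = (
    leftDF.repartitionByRange(8, "Name")
          .join(rightDF.repartitionByRange(8, "Name"), on="Name")
)
joined.show()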

Regards,
Gourav

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson 
<aedav...@ucsc.edu> wrote:
Hi Gourav

When I join I get an OOM. To address this, my thought was to split my tables 
into small batches of rows, join each batch, and then use union. My assumption 
is that union is a narrow transformation and as such requires fewer resources. 
Let's say I have 5 data frames I want to join together and each has 300 rows.

I want to create 15 data frames.

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then join each "batch":
S1joinDF = 11.join(12).join(13).join(14).join(15)

S2joinDF = 21.join(22).join(23).join(24).join(25)

S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union( S2joinDF ) .union( S3joinDF )
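
Roughly, here is a sketch of what I mean (untested; the "Name" join key, the 
toy data, and the helper names are made up for illustration):

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batchJoinUnion").getOrCreate()

def slice_df(sample, rows):
    # stand-in for one row-batch of a single-column sample table keyed by "Name"
    return spark.createDataFrame(rows, ["Name", sample])

# batch 1: the first rows of samples A, B, C (i.e. data frames 11, 12, 13)
batch1 = [slice_df("sampleA", [("tx1", 1.0), ("tx2", 2.0)]),
          slice_df("sampleB", [("tx1", 3.0), ("tx2", 4.0)]),
          slice_df("sampleC", [("tx1", 5.0), ("tx2", 6.0)])]

# batch 2: the next rows of the same samples (i.e. data frames 21, 22, 23)
batch2 = [slice_df("sampleA", [("tx3", 7.0), ("tx4", 8.0)]),
          slice_df("sampleB", [("tx3", 9.0), ("tx4", 10.0)]),
          slice_df("sampleC", [("tx3", 11.0), ("tx4", 12.0)])]

def join_batch(dfs):
    # S1joinDF = 11.join(12).join(13) ... , joined explicitly on "Name"
    return reduce(lambda a, b: a.join(b, on="Name"), dfs)

# join within each batch, then stack the batch results back together;
# unionByName aligns columns by name rather than by position
resultDF = join_batch(batch1).unionByName(join_batch(batch2))
resultDF.show()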

The way I originally wrote my code is as follows. Based on my unit test, it 
turns out I need to call orderBy on every iteration of the loop. I tried 
sorting outside of the while loop; it did not resolve the problem. Given the 
size of my dataframes, that is going to crush performance. My unit test works, 
but I never ran it on my real data set.

        # Create a copy of the original dataframe
        copyDF = df.orderBy("Name")
        # copyDF.show()

        i = 0
        while i < numberOfSplits:
            self.logger.warn("i:{}".format(i))
            # Get the top `numRows` rows
            # note: take() is an action, limit() is a transformation
            topDF = copyDF.limit(numRows)

            # Truncate `copyDF` to remove the contents fetched into `topDF`.
            # The original quant.sf files are sorted by name, however we must
            # use orderBy, else the row names between GTEx samples will not
            # line up. We can not simply sort or orderBy once; we have to do
            # this on every iteration.
            copyDF = copyDF.subtract(topDF).orderBy("Name")

            retList[i] = topDF

            # Increment the split number
            i += 1

        if remainingRows > 0:
            self.logger.info("AEDWIP writing last i:{} len(retList):{}"
                             .format(i, len(retList)))
            retList[i] = copyDF
            # copyDF.show()
            # retList[i].show()


Okay, so that is the background. Rather than use orderBy, I thought that if I 
could add a row number I could easily split up my data frames. My code would 
look a lot like what I would write in pandas or R:

# pandas-style: slice consecutive row ranges with iloc
while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[start:end]
    i += 1

There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but 
not consecutive.
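
One thing I might try is rdd.zipWithIndex(), which as far as I know assigns 
consecutive indexes in the existing partition order without a global sort. A 
rough sketch (the column names and data are made up; I have not run this on 
the real data):

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.appName("rowNumberSketch").getOrCreate()

df = spark.createDataFrame(
    [("tx1", 10.0), ("tx2", 20.0), ("tx3", 30.0), ("tx4", 40.0)],
    ["Name", "NumReads"],
)

# zipWithIndex() pairs each Row with a consecutive 0..n-1 index, assigned in
# the data frame's current partition order, so nothing gets reordered
withIdx = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
schema = StructType(df.schema.fields + [StructField("tid", LongType(), False)])
indexedDF = spark.createDataFrame(withIdx, schema)

# with consecutive ids, slicing into fixed-size batches is just a filter
numRows = 2
batch0 = indexedDF.filter((indexedDF.tid >= 0) & (indexedDF.tid < numRows))
batch0.show()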


Comments and suggestions appreciated

Andy


From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user @spark" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: How to add a row number column with out reordering my data frame

Hi,

I am a bit confused here: it is not entirely clear to me why you are creating 
the row numbers, or how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson <aedav...@ucsc.edu.invalid> 
wrote:
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select 
a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data 
file with just the name and the column of interest is about 470 MB. The column 
of interest alone is about 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I can join over row numbers.

# create a dummy variable to orderBy https://www.py4u.net/discuss/1840945
from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number

w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select(["NumReads"]) \
                   .withColumnRenamed("NumReads", sampleName) \
                   .withColumn("tid", row_number().over(w))


This code seems pretty complicated to someone coming from pandas and R 
dataframes. My unit test works; however, it generates the following warning:



WARN WindowExec: No Partition Defined for Window operation! Moving all data to 
a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The 
order is important.

Kind regards

Andy
