I think my problem has to do with the mega-mem machines. It was hard to get quota
for mega-mem machines. I wonder if they are unstable? Any suggestions for how
I can look at the 'hardware'?

I ran the same job several times, and it failed in a different way each time. Once
it looked like some sort of networking problem accessing GCS buckets.

Several times the job appeared to fail when I call df.checkpoint(): there was
no progress in my driver log files after 30 minutes, and CPU utilization dropped
from 60% to almost zero. I terminated those jobs.

One time the checkpoint seemed to hang after a series of narrow
transformations on a single data frame.

Most of the time the checkpoint seems to fail while calculating rowSums. I have
reworked the rowSum code several times; see below for the final version.

Based on Google searches, it seems that on GCP Dataproc people set the
checkpoint dir to something like gs://myBucket/checkpoint/
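
If I am reading those examples right, setting it is a one-liner on the Spark
context. Here is a minimal sketch of what I think is meant (the bucket name is
just a placeholder):

# Minimal sketch: set the checkpoint dir to a GCS bucket on Dataproc.
# "myBucket" is a placeholder, not a real bucket name.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rowSums").getOrCreate()
spark.sparkContext.setCheckpointDir("gs://myBucket/checkpoint/")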

I see the cluster has a lot of HDFS storage. As my job runs, memory utilization
hits 100%. My cluster has 2.8 TB of memory. Spark will eventually start writing
something to HDFS. As a newbie I would think we would want to set the
checkpoint dir to HDFS; I do not think HDFS is the limiting resource, since it
never seems to be fully exhausted. I did a lot of googling and was unable to find
an HDFS example URL. The checkpoint() calls are really slow: they take twice as
long as when I call cache().
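
If I understand correctly, I could instead point the checkpoint dir at the
cluster's HDFS, or use localCheckpoint(), which keeps the data in executor
storage and skips the write to the distributed file system. A rough sketch of
both, assuming the same spark session as in the sketch above (the HDFS path is
a guess, and countsSparkDF stands for the data frame from the code below):

# Sketch: use cluster HDFS for checkpoints instead of GCS.
# On Dataproc the default filesystem is normally HDFS, so a plain
# path like "/checkpoint" may also work; the exact path is a guess.
spark.sparkContext.setCheckpointDir("hdfs:///checkpoint")

# Alternative I have read about: localCheckpoint() truncates the lineage
# but keeps the blocks in executor storage, so it is cheaper than a
# reliable checkpoint, at the cost of not surviving executor loss.
countsSparkDF = countsSparkDF.localCheckpoint()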

Comments and suggestions appreciated

Andy

###############################################################################
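    # Assumed module-level imports for the helpers below:
    #   from functools import reduce
    #   from operator import add
    #   from pyspark.sql.functions import col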
    def rowSums( self, countsSparkDF, columnNames, columnBatchSize ):
        '''
        The GTEx training data set has 10409 numeric columns. Summing them all
        at once causes a java.lang.StackOverflowError because the DAG gets too
        big; increasing spark driver memory does not help. The work around is
        to sum smaller batches of columns and checkpoint the result of each
        batch.
        '''
        self.logger.warn("rowSums BEGIN")
        totalColName = "rowSum"
        for i in range(0, len(columnNames), columnBatchSize) :
            tmpColName = "tmpSum" + str(i)
            batch = columnNames[i:i+columnBatchSize]
            countsSparkDF = self.rowSumsImpl(countsSparkDF, tmpColName, batch)

            if i == 0:
                countsSparkDF = countsSparkDF.withColumnRenamed(tmpColName, totalColName)

            else:
                # calculate rolling total
                countsSparkDF = countsSparkDF.withColumn(totalColName,
                                                         col(totalColName) + col(tmpColName))
                # save space
                countsSparkDF = countsSparkDF.drop(tmpColName )

            # use an action to force execution
            numRows = countsSparkDF.count()
            self.logger.warn("rowSums:batch:{} numRows:{}".format(i, numRows))

            # checkpoint() saves the df data but not its lineage. Note that
            # DataFrame.checkpoint() returns a new checkpointed DataFrame, so
            # the result must be assigned back or the lineage is not truncated.
            #countsSparkDF.cache()
            countsSparkDF = countsSparkDF.checkpoint()

        self.logger.warn("rowSums END")
        return countsSparkDF

    
###############################################################################
    def rowSumsImpl( self, countsSparkDF, newColName, columnNames ):
        '''
        calculates actual sum of columns

        arguments
            countsSparkDF

            newColName:
                the result of the column sum will be stored here

            columnNames:
                list of columns to sum

        returns
            amended countsSparkDF
        '''
        self.logger.warn( "rowSumsImpl BEGIN" )

        # https://stackoverflow.com/a/54283997/4586180
        retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                                                       reduce( add, [col( x ) for x in columnNames] ) )

        self.logger.warn( "rowSumsImpl END\n" )
        return retDF
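
In case it helps anyone reproduce the pattern outside of my class, here is a
small self-contained sketch of the same batched row-sum plus checkpoint logic
on a toy data frame. Everything in it (data, column names, batch size,
checkpoint path) is made up for illustration:

from functools import reduce
from operator import add

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("rowSumsToy").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

df = spark.createDataFrame(
    [(1, 2, 3, 4), (5, 6, 7, 8)],
    ["c0", "c1", "c2", "c3"],
)
columnNames = df.columns
columnBatchSize = 2
totalColName = "rowSum"

for i in range(0, len(columnNames), columnBatchSize):
    batch = columnNames[i:i + columnBatchSize]
    tmpColName = "tmpSum" + str(i)
    # same pattern as rowSumsImpl(): sum one batch of columns
    df = df.na.fill(0).withColumn(tmpColName, reduce(add, [col(x) for x in batch]))
    if i == 0:
        df = df.withColumnRenamed(tmpColName, totalColName)
    else:
        # rolling total, then drop the temporary batch column
        df = df.withColumn(totalColName, col(totalColName) + col(tmpColName)).drop(tmpColName)
    # checkpoint() returns a new DataFrame with truncated lineage
    df = df.checkpoint()

df.select(totalColName).show()   # expect 10 and 26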



From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Monday, January 24, 2022 at 12:54 AM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: What are your experiences using google cloud platform

Dataproc works fine. The current version is Spark 3.1.2. Look at your code,
hardware and scaling.



HTH


 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Sun, 23 Jan 2022 at 21:19, Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:
Hi, I recently started using GCP Dataproc Spark.

I seem to have trouble getting big jobs to complete. I am using checkpoints. I
am wondering if maybe I should look for another cloud solution.

Kind regards

Andy
