Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
Hi Sean

My “insert” solution is a hack that might work, given we can easily spin up a single VM with a crazy amount of memory. I would prefer a distributed solution. It is just a matter of time before someone wants to create an even bigger table using cbind.

I understand you probably already know a lot about traditional RDBMSs. Much of my post is background for others.

I used to do classic relational database work before tools like Hadoop, Spark, and NoSQL became available.

The standard operations on a single table in a relational database are:

Insert a “row”. This is similar to Spark union. Typically primary keys in RDBMS tables are indexed to enable quick lookup, so insert is probably not one-for-one with union; the row may not simply be appended to the end of the table.

Update a “row”
Delete a “row”
Select “rows where”

RDBMS servers enable row- and table-level locking. Data must always be in a consistent state. You must commit or abort your changes for them to persist and to release locks on the data. Locks are required because you have a single resource and many users requesting service simultaneously. This is very different from Spark.

Storage and memory used to be really expensive, so people often tried to create normalized ("1st normal form") schemas, i.e. no duplicate data, to reduce hardware cost. A normalized design requires you to use joins to get the data table you want, and joins are expensive. Often designs duplicated some data to improve performance by minimizing the number of joins required; duplicate data makes maintaining consistency harder. There are other advantages to normalized data design and, as we are all aware in the big-data world, lots of disadvantages. The DBMS ran on a single big machine; join was not implemented as distributed map/reduce.

So my idea is to use a traditional RDBMS server: my final table will have 5 million rows and 10,114 columns.

  1.  Read the column vector from each of the 10,114 data files
  2.  Insert the column vector as a row in the table
 *   I read a file that has a single element on each line. All I need to do is replace \n with ,
  3.  Now I have a table with 10,114 rows and 5 million columns
  4.  The row id (primary key) is the original file name
  5.  The columns are the row ids from the original column vectors
  6.  Now all I need to do is pivot this single table to get what I want. This is the only join or map/reduce-like operation
  7.  The result is a table with 5 million rows and 10,114 columns


My final table is about 220 GB. I know at Google I have quota for up to 2 mega-mem machines. Each one has something like 1.4 TB of memory.

Kind regards

Andy


From: Sean Owen 
Date: Wednesday, April 20, 2022 at 5:34 PM
To: Andrew Davidson 
Cc: Andrew Melo , Bjørn Jørgensen 
, "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

Wait, how is all that related to cbind -- very different from what's needed to 
insert.
BigQuery is unrelated to MR or Spark. It is however a SQL engine, but, can you 
express this in SQL without joins? I'm just guessing joining 10K+ tables is 
hard anywhere.

On Wed, Apr 20, 2022 at 7:32 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
I was thinking about something like BigQuery a little more. I do not know how it is implemented. However, I believe traditional relational databases are row oriented and typically run on a single machine. You can lock at the row level. This leads me to speculate that row-level inserts may be more efficient than the way Spark implements union. One way to create my uber matrix would be to read the column vectors from the 10,114 individual files and insert them as rows in a table, then pivot the table. I am going to poke around a bit. For all I know BigQuery uses map/reduce like Spark.

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Wednesday, April 20, 2022 at 2:31 PM
To: Andrew Melo <andrew.m...@gmail.com>
Cc: Andrew Davidson <aedav...@ucsc.edu>, Bjørn Jørgensen <bjornjorgen...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

I don't think there's fundamental disapproval (it is implemented in sparklyr) 
just a question of how you make this work at scale in general. It's not a super 
natural operation in this context but can be done. If you find a successful 
solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:
It would certainly be useful for our domain to have some sort of native 
cbind(). Is there a fundamental disapproval of adding that functionality, or is 
it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
Good lead, pandas on Spark concat() is worth trying. It looks like it uses a 
join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
I was thinking about something like BigQuery a little more. I do not know how it is implemented. However, I believe traditional relational databases are row oriented and typically run on a single machine. You can lock at the row level. This leads me to speculate that row-level inserts may be more efficient than the way Spark implements union. One way to create my uber matrix would be to read the column vectors from the 10,114 individual files and insert them as rows in a table, then pivot the table. I am going to poke around a bit. For all I know BigQuery uses map/reduce like Spark.

Kind regards

Andy

From: Sean Owen 
Date: Wednesday, April 20, 2022 at 2:31 PM
To: Andrew Melo 
Cc: Andrew Davidson , Bjørn Jørgensen 
, "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

I don't think there's fundamental disapproval (it is implemented in sparklyr) 
just a question of how you make this work at scale in general. It's not a super 
natural operation in this context but can be done. If you find a successful 
solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:
It would certainly be useful for our domain to have some sort of native 
cbind(). Is there a fundamental disapproval of adding that functionality, or is 
it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
Good lead, pandas on Spark concat() is worth trying. It looks like it uses a 
join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
Sorry for asking, but why doesn't concat work?

Pandas on Spark has ps.concat (https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299), which takes 2 dataframes and concatenates them into 1 dataframe.
It seems (https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat) like the pyspark version takes 2 columns and concatenates them into one column.
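For illustration, a minimal sketch of the pandas-on-Spark column bind Bjørn is pointing at. The frames and column names are toy stand-ins, and it assumes Spark 3.2+, where pyspark.pandas ships:

import pyspark.pandas as ps

# two toy single-column frames standing in for two of the per-sample count files
df1 = ps.DataFrame({"sample1": [10.0, 11.0, 12.0]})
df2 = ps.DataFrame({"sample2": [20.0, 21.0, 22.0]})

# depending on the version, combining different frames may first require:
# ps.set_option("compute.ops_on_diff_frames", True)

# axis=1 is the pandas-style column bind; rows are aligned on the index
wide = ps.concat([df1, df2], axis=1)
print(wide)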

On Wed, Apr 20, 2022 at 21:04, Sean Owen <sro...@gmail.com> wrote:
cbind? yeah though the answer is typically a join. I don't know if there's a 
better option in a SQL engine, as SQL doesn't have anything to offer except 
join and pivot either (? right?)
Certainly, the dominant data storage paradigm is wide tables, whereas you're 
starting with effectively a huge number of tiny slim tables, which is the 
impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others solve it? For example, I wonder if running the join on something like BigQuery might work better? I do not know much about the implementation.

No one tool will solve all problems. Once I get the matrix I think Spark will work well for our needs.

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes readily just to even plan it, let alone all the shuffling and shuffling 
of huge data. You could tune your way out of it maybe, but not optimistic. It's 
just huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartition, which is in the RDD 
API, and to my surprise, not in the Python API but exists in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
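A rough PySpark analogue of that idea using RDD.zip, which (unlike zipPartitions) does exist in the Python API. This is only a sketch with toy frames; zip() assumes both inputs have the same number of partitions and the same number of rows per partition, which holds when both frames are read and partitioned identically:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# two toy single-column frames standing in for two of the per-sample files
dfA = spark.createDataFrame([(1.0,), (2.0,)], ["sampleA"])
dfB = spark.createDataFrame([(10.0,), (20.0,)], ["sampleB"])

zipped = dfA.rdd.zip(dfB.rdd)                       # RDD of (RowA, RowB) pairs
bound = zipped.map(lambda pair: pair[0] + pair[1])  # Rows are tuples, so + concatenates them
result = bound.toDF(dfA.columns + dfB.columns)
result.show()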

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that is going to face the same issues with huge huge plans.

I like the pivot idea. If you can read the individual files as data rows (maybe 
list all the file names, parallelize with Spark, write a UDF that reads the 
data for that file to generate the rows). If you can emit (file, index, value) 
and groupBy index, pivot on file (I think?) that should be about it? I think it 
doesn't need additional hashing or whatever. Not sure how fast it is but that 
seems more direct than the join, as well.
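Sketching that (file, index, value) shape in PySpark with toy data, not code from this thread:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# toy long-format records standing in for (file name, row index, count value)
long_df = spark.createDataFrame(
    [("sampleA", 0, 10.0), ("sampleA", 1, 11.0),
     ("sampleB", 0, 20.0), ("sampleB", 1, 21.0)],
    ["file", "idx", "value"],
)

wide_df = (
    long_df.groupBy("idx")          # one output row per original row index
           .pivot("file")           # one output column per input file
           .agg(F.first("value"))
           .orderBy("idx")
)
wide_df.show()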

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson  
wrote:
Hi, I have a hard problem

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others solve it? For example, I wonder if running the join on something like BigQuery might work better? I do not know much about the implementation.

No one tool will solve all problems. Once I get the matrix I think Spark will work well for our needs.

Kind regards

Andy

From: Sean Owen 
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes readily just to even plan it, let alone all the shuffling and shuffling 
of huge data. You could tune your way out of it maybe, but not optimistic. It's 
just huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartition, which is in the RDD 
API, and to my surprise, not in the Python API but exists in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that is going to face the same issues with huge huge plans.

I like the pivot idea. If you can read the individual files as data rows (maybe 
list all the file names, parallelize with Spark, write a UDF that reads the 
data for that file to generate the rows). If you can emit (file, index, value) 
and groupBy index, pivot on file (I think?) that should be about it? I think it 
doesn't need additional hashing or whatever. Not sure how fast it is but that 
seems more direct than the join, as well.

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson  
wrote:
Hi, I have a hard problem

I have 10114 column vectors, each in a separate file. Each file has 2 columns: the row id and numeric values. The row ids are identical and in sort order. All the column vectors have the same number of rows. There are over 5 million rows. I need to combine them into a single table. The row ids are very long strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes; I mean totally dead, start over. Checkpoints do not seem to help; it still crashes and needs to be restarted from scratch. What is really surprising is that the final file size is only 213 GB! The way I got that file was to copy all the column vectors to a single BIG IRON machine and use UNIX cut and paste. It took about 44 min to run once I got all the data moved around. It was very tedious and error prone; I had to move a lot of data around. Not a particularly reproducible process. I will need to rerun this three more times on different data sets of about the same size.

I noticed that Spark has a union() function. It implements row bind. Any idea how it is implemented? Is it just map/reduce under the covers?

My thought was

1.  load each column vector

2.  maybe I need to replace the really long row id strings with integers

3.  convert column vectors into row vectors using pivot (i.e. matrix transpose)

4.  union all the row vectors into a single table

5.  pivot the table back so I have the correct column vectors


I could replace the row ids and column names with integers if needed, and restore them later.

Maybe I would be better off using many small machines? I assume memory is the limiting resource, not CPU. I notice that memory usage will reach 100%. I added several TBs of local SSD; I am not convinced that Spark is using the local disk.


Will this perform better than join?


· The rows before the final pivot will be very, very wide (over 5 million columns)

· There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the per-column-vector pivots are trivial; the full-table pivot will be expensive, however it will only need to be done once.



Comments and suggestions appreciated

Andy




How is union() implemented? Need to implement column bind

2022-04-18 Thread Andrew Davidson
Hi, I have a hard problem.

I have 10114 column vectors, each in a separate file. Each file has 2 columns: the row id and numeric values. The row ids are identical and in sort order. All the column vectors have the same number of rows. There are over 5 million rows. I need to combine them into a single table. The row ids are very long strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes; I mean totally dead, start over. Checkpoints do not seem to help; it still crashes and needs to be restarted from scratch. What is really surprising is that the final file size is only 213 GB! The way I got that file was to copy all the column vectors to a single BIG IRON machine and use UNIX cut and paste. It took about 44 min to run once I got all the data moved around. It was very tedious and error prone; I had to move a lot of data around. Not a particularly reproducible process. I will need to rerun this three more times on different data sets of about the same size.

I noticed that Spark has a union() function. It implements row bind. Any idea how it is implemented? Is it just map/reduce under the covers?

My thought was

  1.  load each column vector
  2.  maybe I need to replace the really long row id strings with integers
  3.  convert column vectors into row vectors using pivot (i.e. matrix transpose)
  4.  union all the row vectors into a single table
  5.  pivot the table back so I have the correct column vectors


I could replace the row ids and column names with integers if needed, and restore them later.

Maybe I would be better off using many small machines? I assume memory is the limiting resource, not CPU. I notice that memory usage will reach 100%. I added several TBs of local SSD; I am not convinced that Spark is using the local disk.


Will this perform better than join?


  *   The rows before the final pivot will be very, very wide (over 5 million columns)
  *   There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the per-column-vector pivots are trivial; the full-table pivot will be expensive, however it will only need to be done once.



Comments and suggestions appreciated

Andy




Re: pivoting panda dataframe

2022-03-15 Thread Andrew Davidson
Many many thanks!

I have been looking for a pyspark dataframe column_bind() solution for several months. Hopefully pyspark.pandas works. The only other solution I was aware of was to use spark.dataframe.join(). This does not scale for obvious reasons.

Andy


From: Bjørn Jørgensen 
Date: Tuesday, March 15, 2022 at 2:19 PM
To: Andrew Davidson 
Cc: Mich Talebzadeh , "user @spark" 

Subject: Re: pivoting panda dataframe

Hi Andrew. Mich asked, and I answered, about transpose(): https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html

And now you are asking in the same thread about the pandas API on Spark and transform().

Apache Spark has a pandas API on Spark.

This means that Spark has an API call for pandas functions, and when you use the pandas API on Spark, it is Spark you are using.

Add this line to your imports:

from pyspark import pandas as ps


Now you can pass your dataframe back and forth to the pandas API on Spark by using

pf01 = f01.to_pandas_on_spark()


f01 = pf01.to_spark()


Note that I have changed pd to ps here.

df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})

df.transform(lambda x: x + 1)

You will now see that all numbers are +1

You can find more information about the pandas API on Spark transform here:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
or in your notebook:
df.transform?


Signature:

df.transform(

func: Callable[..., ForwardRef('Series')],

axis: Union[int, str] = 0,

*args: Any,

**kwargs: Any,

) -> 'DataFrame'

Docstring:

Call ``func`` on self producing a Series with transformed values

and that has the same length as its input.



See also `Transform and apply a function

<https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.



.. note:: this API executes the function once to infer the type which is

 potentially expensive, for instance, when the dataset is created after

 aggregations or sorting.



 To avoid this, specify return type in ``func``, for instance, as below:



 >>> def square(x) -> ps.Series[np.int32]:

 ... return x ** 2



 pandas-on-Spark uses return type hint and does not try to infer the type.



.. note:: the series within ``func`` is actually multiple pandas series as the

segments of the whole pandas-on-Spark series; therefore, the length of each 
series

is not guaranteed. As an example, an aggregation against each series

does work as a global aggregation but an aggregation of each segment. See

below:



>>> def func(x) -> ps.Series[np.int32]:

... return x + sum(x)



Parameters

----------

func : function

Function to use for transforming the data. It must work when pandas Series

is passed.

axis : int, default 0 or 'index'

Can only be set to 0 at the moment.

*args

Positional arguments to pass to func.

**kwargs

Keyword arguments to pass to func.



Returns

-------

DataFrame

A DataFrame that must have the same length as self.



Raises

------

Exception : If the returned DataFrame has a different length than self.



See Also



DataFrame.aggregate : Only perform aggregating type operations.

DataFrame.apply : Invoke function on DataFrame.

Series.transform : The equivalent function for Series.



Examples



>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])

>>> df

   A  B

0  0  1

1  1  2

2  2  3



>>> def square(x) -> ps.Series[np.int32]:

... return x ** 2

>>> df.transform(square)

   A  B

0  0  1

1  1  4

2  4  9



You can omit the type hint and let pandas-on-Spark infer its type.



>>> df.transform(lambda x: x ** 2)

   A  B

0  0  1

1  1  4

2  4  9



For multi-index columns:



>>> df.columns = [('X', 'A'), ('X', 'B')]

>>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE

   X

   A  B

0  0  1

1  1  4

2  4  9



>>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE

   X

   A  B

0  0  1

1  1  2

2  2  3



You can also specify extra arguments.



>>> def calculation(x, y, z) -> ps.Series[int]:

... return x ** y + z

>>> df.transform(calculation, y=10, z=20)  # doctest: +NORMALIZE_WHITESPACE

      X

      A      B

0    20     21

1    21   1044

2  1044  59069

File:  /opt/spark/python/pyspark/pandas/frame.py

Type:  method




On Tue, Mar 15, 2022 at 19:33, Andrew Davidson <aedav...@ucsc.edu> wrote:
Hi Bjorn

I have been looking for a Spark transform for a while. Can you send me a link to the pyspark function?

I assume pandas transform is not really an option.

Re: pivoting panda dataframe

2022-03-15 Thread Andrew Davidson
Hi Bjorn

I have been looking for a Spark transform for a while. Can you send me a link to the pyspark function?

I assume pandas transform is not really an option. I think it will try to pull the entire dataframe into the driver's memory.

Kind regards

Andy

P.S. My real problem is that Spark does not allow you to bind columns. You can use union() to bind rows. I could get the equivalent of cbind() using union().transform().

From: Bjørn Jørgensen 
Date: Tuesday, March 15, 2022 at 10:37 AM
To: Mich Talebzadeh 
Cc: "user @spark" 
Subject: Re: pivoting panda dataframe

We have that transpose in the pandas API on Spark too:
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html

You also have stack() and multilevel reshaping:
https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html



On Tue, Mar 15, 2022 at 17:50, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:


hi,



Is it possible to pivot a pandas dataframe by making a row the column heading?



thanks




view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Does spark have something like rowsum() in R?

2022-02-09 Thread Andrew Davidson
Hi Sean

I have 2 big for loops in my code. One for loop uses join to implement R’s cbind(); the other implements R’s rowsum(). Each for loop iterates 10411 times.

To debug, I added an action to each iteration of the loop. I think I used count() and logged the results. So I am confident this is where the problem is.

In my experience you need to be really careful any time you use for loops in big data. There is a potential loss of computational efficiency. The idea of Spark’s lazy evaluation and optimization is very appealing.

Andy


From: Sean Owen 
Date: Wednesday, February 9, 2022 at 8:19 AM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: Does spark have something like rowsum() in R?

It really depends on what is running out of memory. You can have all the 
workers in the world but if something is blowing up the driver, won't do 
anything. You can have a huge cluster but data skew makes it impossible to 
break up the problem you express. Spark running out of mem is not the same as R 
running out of mem.

You can definitely do this faster with Spark with enough parallelism. It can be 
harder to reason about a distributed system for sure. WIthout a lot more 
detail, hard to say 'why'. For example, it's not clear that the operation you 
pasted fails. Did you collect huge results to the driver afterwards? etc


On Wed, Feb 9, 2022 at 10:10 AM Andrew Davidson <aedav...@ucsc.edu> wrote:
Hi Sean

Debugging big data projects is always hard. It is a black art that takes a lot of experience.

Can you tell me more about “Why you're running out of mem is probably more a function of your parallelism, cluster size”?

I have a cluster with 2 worker nodes, each with 1.4 TB of memory, 96 vCPUs, and as much SSD as I want. It was really hard to get quota for these machines on GCP. Would I be better off with dozens of smaller machines?

This has been an incredibly hard problem to debug. What I wound up doing is just using Spark to select the columns of interest and write these columns to individual part files.

Next I used a special research computer at my university with 64 cores and 1 TB of memory. I copied the part files from GCP to that computer and used the UNIX paste command to create a single table. Finally I am doing all my analysis on a single machine using R. paste took about 40 min; Spark would crash after about 12 hrs.

Column bind and row sums are common operations. It seems like there should be an easy solution? Maybe I should submit an RFE (request for enhancement).

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Tuesday, February 8, 2022 at 8:57 AM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: Does spark have something like rowsum() in R?

That seems like a fine way to do it. Why you're running out of mem is probably 
more a function of your parallelism, cluster size, and the fact that R is a 
memory hog.
I'm not sure there are great alternatives in R and Spark; in other languages 
you might more directly get the array of (numeric?) row value and sum them 
efficiently. Certainly pandas UDFs would make short work of that.
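To illustrate the pandas-UDF suggestion, a hedged sketch that packs the value columns into a single array column first, so the expression tree stays small. The frame and column names are made up, not code from this thread:

import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("g1", 1.0, 2.0), ("g2", 3.0, 4.0)], ["Name", "s1", "s2"])
value_cols = [c for c in df.columns if c != "Name"]

@F.pandas_udf(DoubleType())
def row_sum(values: pd.Series) -> pd.Series:
    # each element of `values` is the array of counts for one row
    return values.apply(lambda xs: float(sum(xs)))

result = df.withColumn("rowSum", row_sum(F.array(*value_cols)))
result.show()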

On Tue, Feb 8, 2022 at 10:02 AM Andrew Davidson  
wrote:
As part of my data normalization process I need to calculate row sums. The 
following code works on smaller test data sets. It does not work on my big 
tables. When I run on a table with over 10,000 columns I get an OOM on a 
cluster with 2.8 TB. Is there a better way to implement this

Kind regards

Andy

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/rowsum
“Compute column sums across rows of a numeric matrix-like object for each level 
of a grouping variable. “



###
# requires:
#   from functools import reduce
#   from operator import add
#   from pyspark.sql.functions import col

def rowSums( self, countsSparkDF, newColName, columnNames ):
    '''
    calculates actual sum of columns

    arguments
        countSparkDF

        newColumName:
            results from column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countSparkDF
    '''
    self.logger.warn( "rowSumsImpl BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                                                   reduce( add, [col( x ) for x in columnNames] ) )

    # self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
    #                  .format( retDF.count(), len( retDF.columns ) ) )
    #
    # self.logger.warn("AEDWIP remove show")
    # retDF.show()

    self.logger.warn( "rowSumsImpl END\n" )
    return retDF




Re: Does spark have something like rowsum() in R?

2022-02-09 Thread Andrew Davidson
Hi Sean

Debugging big data projects is always hard. It is a black art that takes a lot of experience.

Can you tell me more about “Why you're running out of mem is probably more a function of your parallelism, cluster size”?

I have a cluster with 2 worker nodes, each with 1.4 TB of memory, 96 vCPUs, and as much SSD as I want. It was really hard to get quota for these machines on GCP. Would I be better off with dozens of smaller machines?

This has been an incredibly hard problem to debug. What I wound up doing is just using Spark to select the columns of interest and write these columns to individual part files.

Next I used a special research computer at my university with 64 cores and 1 TB of memory. I copied the part files from GCP to that computer and used the UNIX paste command to create a single table. Finally I am doing all my analysis on a single machine using R. paste took about 40 min; Spark would crash after about 12 hrs.

Column bind and row sums are common operations. It seems like there should be an easy solution? Maybe I should submit an RFE (request for enhancement).

Kind regards

Andy

From: Sean Owen 
Date: Tuesday, February 8, 2022 at 8:57 AM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: Does spark have something like rowsum() in R?

That seems like a fine way to do it. Why you're running out of mem is probably 
more a function of your parallelism, cluster size, and the fact that R is a 
memory hog.
I'm not sure there are great alternatives in R and Spark; in other languages 
you might more directly get the array of (numeric?) row value and sum them 
efficiently. Certainly pandas UDFs would make short work of that.

On Tue, Feb 8, 2022 at 10:02 AM Andrew Davidson  
wrote:
As part of my data normalization process I need to calculate row sums. The 
following code works on smaller test data sets. It does not work on my big 
tables. When I run on a table with over 10,000 columns I get an OOM on a 
cluster with 2.8 TB. Is there a better way to implement this

Kind regards

Andy

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/rowsum
“Compute column sums across rows of a numeric matrix-like object for each level 
of a grouping variable. “



###
# requires:
#   from functools import reduce
#   from operator import add
#   from pyspark.sql.functions import col

def rowSums( self, countsSparkDF, newColName, columnNames ):
    '''
    calculates actual sum of columns

    arguments
        countSparkDF

        newColumName:
            results from column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countSparkDF
    '''
    self.logger.warn( "rowSumsImpl BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                                                   reduce( add, [col( x ) for x in columnNames] ) )

    # self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
    #                  .format( retDF.count(), len( retDF.columns ) ) )
    #
    # self.logger.warn("AEDWIP remove show")
    # retDF.show()

    self.logger.warn( "rowSumsImpl END\n" )
    return retDF




Does spark have something like rowsum() in R?

2022-02-08 Thread Andrew Davidson
As part of my data normalization process I need to calculate row sums. The following code works on smaller test data sets. It does not work on my big tables: when I run on a table with over 10,000 columns I get an OOM on a cluster with 2.8 TB of memory. Is there a better way to implement this?

Kind regards

Andy

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/rowsum
“Compute column sums across rows of a numeric matrix-like object for each level 
of a grouping variable. “



###
# requires:
#   from functools import reduce
#   from operator import add
#   from pyspark.sql.functions import col

def rowSums( self, countsSparkDF, newColName, columnNames ):
    '''
    calculates actual sum of columns

    arguments
        countSparkDF

        newColumName:
            results from column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countSparkDF
    '''
    self.logger.warn( "rowSumsImpl BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                                                   reduce( add, [col( x ) for x in columnNames] ) )

    # self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
    #                  .format( retDF.count(), len( retDF.columns ) ) )
    #
    # self.logger.warn("AEDWIP remove show")
    # retDF.show()

    self.logger.warn( "rowSumsImpl END\n" )
    return retDF




Does spark support something like the bind function in R?

2022-02-08 Thread Andrew Davidson
I need to create a single table by selecting one column from each of thousands of files. The columns are all of the same type and have the same number of rows and the same row names. I am currently using join. I get OOM on a mega-mem cluster with 2.8 TB.

Does spark have something like cbind() “Take a sequence of vector, matrix or 
data-frame arguments and combine by columns or rows, respectively. “

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/cbind

Digging through the spark documentation I found a udf example
https://spark.apache.org/docs/latest/sparkr.html#dapply

```
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
```

I wonder if this is just a wrapper around join? If so it is probably not going 
to help me out.

Also I would prefer to work in python
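Since the preference is Python: a rough PySpark analogue of the dapply example above using mapInPandas (Spark 3.x). Like dapply it applies a function per batch of rows rather than doing a join; the toy data here is illustrative:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(3.6, 79.0), (1.8, 54.0)], ["eruptions", "waiting"])

def add_waiting_secs(batches):
    # each batch is a pandas DataFrame; add the derived column like the R cbind above
    for pdf in batches:
        pdf["waiting_secs"] = pdf["waiting"] * 60
        yield pdf

df1 = df.mapInPandas(add_waiting_secs,
                     schema="eruptions double, waiting double, waiting_secs double")
df1.show()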

Any thoughts?

Kind regards

Andy




Re: What are your experiences using google cloud platform

2022-01-24 Thread Andrew Davidson
I think my problem has to do with the mega-mem machines. It was hard to get quota for mega-mem machines. I wonder if they are unstable? Any suggestions for how I look at the ‘hardware’?

I ran the same job several times. They all failed in different ways. Once it looked like some sort of networking problem accessing GCP buckets.

Several times it looked like my jobs fail when I call df.checkpoint(): basically no progress in my driver log files after 30 mins. CPU utilization crashes from 60% to almost zero. I terminated the jobs.

One time the checkpoint seemed to hang after doing a series of narrow transformations on a single data frame.

Most of the time the checkpoint seems to fail while calculating rowSums. I have reworked the rowSum code several times; see below for the final version.

Based on google searches it seems like in GCP Dataproc, people set the checkpoint dir to be something like gs://myBucket/checkpoint/

I see the cluster has a lot of HDFS storage. As my job runs, memory utilization hits 100%. My cluster has 2.8 TB of memory. Spark will eventually start writing something to HDFS. As a newbie I would think we would want to set the checkpoint dir to HDFS. I do not think HDFS is the limiting resource; it never seems to be fully exhausted. I did a lot of googling and was unable to find an HDFS example URL. The checkpoint() calls are really slow; they take twice as long as when I call cache().
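For reference, a small sketch of pointing the checkpoint directory at cluster HDFS instead of a GCS bucket; the path is an assumption, not something from this thread:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# any HDFS path reachable from the Dataproc cluster works; this one is illustrative
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

# later: df = df.checkpoint()   # checkpoint() returns a new DataFrame with truncated lineage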

Comments and suggestions appreciated

Andy

###
# requires:
#   from functools import reduce
#   from operator import add
#   from pyspark.sql.functions import col

def rowSums( self, countsSparkDF, columnNames, columnBatchSize ):
    '''
    The GTEx training data set has 10409 numeric columns. This causes a
    java.lang.StackOverflowError because the DAG is too big; increasing spark driver
    memory does not help. The work around is to sum smaller batches of columns
    and cache the results of each batch
    '''
    self.logger.warn("rowSums BEGIN")
    totalColName = "rowSum"
    for i in range(0, len(columnNames), columnBatchSize) :
        tmpColName = "tmpSum" + str(i)
        batch = columnNames[i:i+columnBatchSize]
        countsSparkDF = self.rowSumsImpl(countsSparkDF, tmpColName, batch)

        if i == 0:
            countsSparkDF = countsSparkDF.withColumnRenamed(tmpColName, totalColName)
        else:
            # calculate rolling total
            countsSparkDF = countsSparkDF.withColumn(totalColName, col(totalColName) + col(tmpColName))
            # save space
            countsSparkDF = countsSparkDF.drop(tmpColName)

        # use an action to force execution
        numRows = countsSparkDF.count()
        self.logger.warn("rowSums:batch:{} numRows:{}".format(i, numRows))

        # checkpoint will save the df data but not its lineage
        # note: checkpoint() returns a new DataFrame; assign it back, otherwise
        # the original lineage is kept
        #countsSparkDF.cache()
        countsSparkDF = countsSparkDF.checkpoint()

    self.logger.warn("rowSums END")
    return countsSparkDF


###
def rowSumsImpl( self, countsSparkDF, newColName, columnNames ):
    '''
    calculates actual sum of columns

    arguments
        countSparkDF

        newColumName:
            results from column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countSparkDF
    '''
    self.logger.warn( "rowSumsImpl BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                                                   reduce( add, [col( x ) for x in columnNames] ) )

    self.logger.warn( "rowSumsImpl END\n" )
    return retDF



From: Mich Talebzadeh 
Date: Monday, January 24, 2022 at 12:54 AM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: What are your experiences using google cloud platform

Dataproc works fine. The current version is Spark 3.1.2. Look at your code,  
hardware and scaling.



HTH


 
view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Sun, 23 Jan 2022 at 21:19, Andrew Davidson  wrote:
Hi, I recently started using GCP Dataproc Spark.

I seem to have trouble getting big jobs to complete. I am using checkpoints. I am wondering if maybe I should look for another cloud solution.

Kind regards

Andy


What are your experiences using google cloud platform

2022-01-23 Thread Andrew Davidson
Hi, I recently started using GCP Dataproc Spark.

I seem to have trouble getting big jobs to complete. I am using checkpoints. I am wondering if maybe I should look for another cloud solution.

Kind regards

Andy


Re: How to configure log4j in pyspark to get log level, file name, and line number

2022-01-21 Thread Andrew Davidson
Interesting. I noticed that my driver log messages have a time stamp and function name but no line number. However, log messages in other python files only contain the message. All of my python code is in a single zip file. The zip file is a job submit argument.

2022-01-21 19:45:02 WARN  __main__:? - sparkConfig: ('spark.sql.cbo.enabled', 'true')
2022-01-21 19:48:34 WARN  __main__:? - readsSparkDF.rdd.getNumPartitions():1698
__init__ BEGIN
__init__ END
run BEGIN
run rawCountsSparkDF numRows:5387495 numCols:10409

My guess is that somehow I need to change the way log4j is configured on the workers?
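One caveat worth noting: log4j's %F/%L location fields describe the Java call site, and every call made through py4j enters the JVM at the same place, so as far as I can tell log4j alone cannot report the Python file and line. A rough driver-side sketch of changing the pattern anyway, assuming the log4j 1.x classes that ship with Spark 3.1/3.2 and an existing `spark` session:

# hedged sketch: tweak the driver appenders' layout via the py4j gateway.
# %p = level, %F:%L = file and line of the Java call site, %m = message.
jvm = spark.sparkContext._jvm
layout = jvm.org.apache.log4j.PatternLayout("%d{yyyy-MM-dd HH:mm:ss} %p %c{1} %F:%L - %m%n")
root = jvm.org.apache.log4j.Logger.getRootLogger()
appenders = root.getAllAppenders()
while appenders.hasMoreElements():
    appenders.nextElement().setLayout(layout)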

Kind regards

Andy

From: Andrew Davidson 
Date: Thursday, January 20, 2022 at 2:32 PM
To: "user @spark" 
Subject: How to configure log4j in pyspark to get log level, file name, and 
line number

Hi

When I use python logging for my unit tests, I am able to control the output format: I get the log level, the file and line number, then the msg.

[INFO testEstimatedScalingFactors.py:166 - test_B_convertCountsToInts()] BEGIN

In my spark driver I am able to get the log4j logger

spark = SparkSession\
.builder\
.appName("estimatedScalingFactors")\
.getOrCreate()

#
# 
https://medium.com/@lubna_22592/building-production-pyspark-jobs-5480d03fd71e
# initialize  logger for yarn cluster logs
#
log4jLogger = spark.sparkContext._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

However it only outputs the message. As a hack I have been adding the function 
names to the msg.



I wonder if this is because of the way I make my python code available. When I 
submit my job using



‘$ gcloud dataproc jobs submit pyspark’



I pass my python file in a zip file
 --py-files ${extraPkg}

I use level warn because the driver info logs are very verbose


###
def rowSums( self, countsSparkDF, columnNames ):
    self.logger.warn( "rowSums BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( "rowSum", reduce( add, [col( x ) for x in columnNames] ) )

    self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
                     .format( retDF.count(), len( retDF.columns ) ) )

    self.logger.warn( "rowSums END\n" )
    return retDF

kind regards

Andy


Is user@spark indexed by google?

2022-01-21 Thread Andrew Davidson
There is a ton of great info in this archive. I noticed when I do a google 
search it does not seem to find results from this source

Kind regards

Andy


How to configure log4j in pyspark to get log level, file name, and line number

2022-01-20 Thread Andrew Davidson
Hi

When I use python logging for my unit tests, I am able to control the output format: I get the log level, the file and line number, then the msg.

[INFO testEstimatedScalingFactors.py:166 - test_B_convertCountsToInts()] BEGIN

In my spark driver I am able to get the log4j logger

spark = SparkSession\
.builder\
.appName("estimatedScalingFactors")\
.getOrCreate()

#
# 
https://medium.com/@lubna_22592/building-production-pyspark-jobs-5480d03fd71e
# initialize  logger for yarn cluster logs
#
log4jLogger = spark.sparkContext._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

However it only outputs the message. As a hack I have been adding the function 
names to the msg.



I wonder if this is because of the way I make my python code available. When I 
submit my job using



‘$ gcloud dataproc jobs submit pyspark’



I pass my python file in a zip file
 --py-files ${extraPkg}

I use level warn because the driver info logs are very verbose


###
def rowSums( self, countsSparkDF, columnNames ):
    self.logger.warn( "rowSums BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( "rowSum", reduce( add, [col( x ) for x in columnNames] ) )

    self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
                     .format( retDF.count(), len( retDF.columns ) ) )

    self.logger.warn( "rowSums END\n" )
    return retDF

kind regards

Andy


java.lang.StackOverflow Error How to sum across rows in a data frame with a large number of columns

2022-01-20 Thread Andrew Davidson
Hi

I have a dataframe of integers. It has 10409 columns. How can I sum across each 
row?

I get a very long stack trace

rowSums BEGIN
2022-01-20 22:11:24 ERROR __main__:? - An error occurred while calling 
o93935.withColumn.
: java.lang.StackOverflowError
at 
scala.collection.immutable.Set$SetBuilderImpl.$plus$eq(Set.scala:349)
at 
scala.collection.immutable.Set$SetBuilderImpl.$plus$eq(Set.scala:329)


###
def rowSums( self, countsSparkDF, columnNames ):
    self.logger.warn( "rowSums BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( "rowSum", reduce( add, [col( x ) for x in columnNames] ) )

    self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
                     .format( retDF.count(), len( retDF.columns ) ) )

    self.logger.warn( "rowSums END\n" )
    return retDF



Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Andrew Davidson
Thanks!

I will take a look

Andy

From: Gourav Sengupta 
Date: Tuesday, January 11, 2022 at 8:42 AM
To: Andrew Davidson 
Cc: Andrew Davidson , "user @spark" 

Subject: Re: How to add a row number column with out reordering my data frame

Hi,
I do not think we need to do any of that. Please try repartitionbyrange, dpark 
3 has adaptive query execution with configurations to handle skew as well.

Regards,
Gourav

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
HI Gourav

When I join I get OOM. To address this my thought was to split my tables into small batches of rows, join each batch together, then use union. My assumption is that union is a narrow transform and as such requires fewer resources. Let's say I have 5 data frames I want to join together and each has 300 rows.

I want to create 15 data frames.

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then I join each “batch”:
S1joinDF = 11.join(12).join(13).join(14).join(15)

S2joinDF = 21.join(22).join(23).join(24).join(25)

S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union( S2joinDF ).union( S3joinDF )

The way I originally wrote my code is as follows. Based on my unit test it turns out I need to call orderBy on every iteration of the for loop. I tried sorting outside of the while loop; it did not resolve the problem. Given the size of my dataframes that is going to crush performance. My unit test works. I never ran it on my real data set.

# Create a copy of original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()

i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` number of rows
    # note take() is an action
    # limit() is a transformation
    topDF = copyDF.limit( numRows )

    # Truncate the `copy_df` to remove
    # the contents fetched for `temp_df`
    # original quant.sf files are sorted by name however
    # we must use order by, else the row names between
    # GTEx samples will not be the same
    # we can not simply sort or orderBy once. we have to
    # do this on every iteration
    copyDF = copyDF.subtract(topDF).orderBy( "Name" )

    retList[i] = topDF

    # Increment the split number
    i += 1

if remainingRows > 0 :
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    #copyDF.show()
    #retList[i].show()


Okay, so that's the background. Rather than use orderBy, I thought if I could add a row number I could easily split up my data frames. My code would look a lot like what I would write in pandas or R:

while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[ start:end ]

There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.


Comments and suggestions appreciated

Andy


From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How to add a row number column with out reordering my data frame

Hi,

I am a bit confused here; it is not entirely clear to me why you are creating the row numbers, and how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson  
wrote:
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data file with just the name and column of interest is about 470 MB. The column of interest alone is 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join over row numbers.

# create dummy variable to orderby https://www.py4u.net/discuss/1840945
w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select( ["NumReads"] )\
                   .withColumnRenamed( "NumReads", sampleName )\
                   .withColumn( "tid", row_number().over(w) )


This code seems pretty complicated to someone coming from pandas and R dataframes. My unit test works, however it generates the following warning:



WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Andrew Davidson
HI Gourav

When I join I get OOM. To address this my thought was to split my tables into small batches of rows, join each batch together, then use union. My assumption is that union is a narrow transform and as such requires fewer resources. Let's say I have 5 data frames I want to join together and each has 300 rows.

I want to create 15 data frames.

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then I join each “batch”:
S1joinDF = 11.join(12).join(13).join(14).join(15)

S2joinDF = 21.join(22).join(23).join(24).join(25)

S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union( S2joinDF ).union( S3joinDF )

The way I originally wrote my code is as follows. Based on my unit test it turns out I need to call orderBy on every iteration of the for loop. I tried sorting outside of the while loop; it did not resolve the problem. Given the size of my dataframes that is going to crush performance. My unit test works. I never ran it on my real data set.

# Create a copy of original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()

i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` number of rows
    # note take() is an action
    # limit() is a transformation
    topDF = copyDF.limit( numRows )

    # Truncate the `copy_df` to remove
    # the contents fetched for `temp_df`
    # original quant.sf files are sorted by name however
    # we must use order by, else the row names between
    # GTEx samples will not be the same
    # we can not simply sort or orderBy once. we have to
    # do this on every iteration
    copyDF = copyDF.subtract(topDF).orderBy( "Name" )

    retList[i] = topDF

    # Increment the split number
    i += 1

if remainingRows > 0 :
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    #copyDF.show()
    #retList[i].show()


Okay, so that's the background. Rather than use orderBy, I thought if I could add a row number I could easily split up my data frames. My code would look a lot like what I would write in pandas or R:

while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[ start:end ]

There does not seem to be an easy way to do this.
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.


Comments and suggestions appreciated

Andy


From: Gourav Sengupta 
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: How to add a row number column with out reordering my data frame

Hi,

I am a bit confused here; it is not entirely clear to me why you are creating the row numbers, and how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson  
wrote:
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data file with just the name and column of interest is about 470 MB. The column of interest alone is 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join over row numbers.

# create dummy variable to orderby https://www.py4u.net/discuss/1840945
w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select( ["NumReads"] )\
                   .withColumnRenamed( "NumReads", sampleName )\
                   .withColumn( "tid", row_number().over(w) )


This code seems pretty complicated to someone coming from pandas and R dataframes. My unit test works, however it generates the following warning:



WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The order is important.

Kind regards

Andy


How to add a row number column with out reordering my data frame

2022-01-06 Thread Andrew Davidson
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data file with just the name and column of interest is about 470 MB. The column of interest alone is 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join over row numbers.

# create dummy variable to orderby https://www.py4u.net/discuss/1840945
w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select( ["NumReads"] )\
                   .withColumnRenamed( "NumReads", sampleName )\
                   .withColumn( "tid", row_number().over(w) )


This code seems pretty complicated to someone coming from pandas and R dataframes. My unit test works, however it generates the following warning:



WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The order is important.
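One possible answer, sketched with a toy frame standing in for sampleDF: RDD.zipWithIndex() assigns consecutive indexes in the existing partition order, without the global sort that Window().orderBy(lit('A')) forces, and unlike monotonically_increasing_id the indexes are consecutive.

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sampleDF = spark.createDataFrame([(10.0,), (20.0,), (30.0,)], ["NumReads"])

indexed = (
    sampleDF.rdd
    .zipWithIndex()                                    # (Row, 0-based consecutive index)
    .map(lambda pair: Row(**pair[0].asDict(), tid=pair[1]))
    .toDF()
)
indexed.show()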

Kind regards

Andy


Re: Newbie pyspark memory mgmt question

2022-01-05 Thread Andrew Davidson

Thanks Sean

Andy

From: Sean Owen 
Date: Wednesday, January 5, 2022 at 3:38 PM
To: Andrew Davidson , Nicholas Gustafson 

Cc: "user @spark" 
Subject: Re: Newbie pyspark memory mgmt question

There is no memory leak, no. You can .cache() or .persist() DataFrames, and 
that can use memory until you .unpersist(), but you're not doing that and they 
are garbage collected anyway.
Hard to say what's running out of memory without knowing more about your data 
size, partitions, cluster size, etc

On Wed, Jan 5, 2022 at 5:27 PM Andrew Davidson  
wrote:
Hi

I am running into OOM problems. My cluster should be much bigger than I need. I 
wonder if it has to do with the way I am writing my code. Below are three style 
cases. I wonder if they cause memory to be leaked?

Case 1 :

df1 = spark.read.load( csv file )

df1 = df1.someTransform()

df1 = df1.someOtherTransform()

df1.write(csv file)



I assume lazy evaluation; the first action is the write, so this does not leak memory.



Case 2.

I added actions to make it easier to debug



df1 = spark.read.load( csv file )

print( df.count() )



df1 = df1.someTransform()

print( df.count() )



df1 = df1.someOtherTransform()

print( df.count() )



df1.write(csv file)

Does this leak memory?

Case 3.
If you remove the debug actions, you have the original version of my code.

for f in listOfFiles:

    df1 = spark.read.load( csv file )

    df1 = df1.select( ["a", "b"] )

    print( df1.count() )

    df1.createOrReplaceTempView( "df1" )

    sqlStmt = 'select rc.*, `{}` \n\
               from \n\
                   retDF as rc, \n\
                   df1 \n\
               where \n\
                   rc.Name == df1.Name \n'.format( "a" )

    if i == 0 :
        retDF = df1
    else :
        retDF = self.spark.sql( sqlStmt )

    print( retDF.count() )
    retDF.createOrReplaceTempView( "retDF" )


Does this leak memory? Is there some sort of destroy(), delete(), ??? function I should be calling?

I wonder if I would be better off using the dataframe version of join()?

Kind regards

Andy



Newbie pyspark memory mgmt question

2022-01-05 Thread Andrew Davidson
Hi

I am running into OOM problems. My cluster should be much bigger than I need. I 
wonder if it has to do with the way I am writing my code. Below are three style 
cases. I wonder if they cause memory to be leaked?

Case 1 :

df1 = spark.read.load( cvs file)

df1 = df1.someTransform()

df1 = df1.sometranform()

df1.write(csv file)



I assume lazy evaluation. First action is write. So does not  leak memory



Case 2.

I added actions to make it easier to debug



df1 = spark.read.load( cvs file)

print( df.count() )



df1 = df1.someTransform()

print( df.count() )



df1 = df1.sometranform()

print( df.count() )



df1.write(csv file)

Does this leak memory?

Case 3.
If you remove the debug actions, you have the original version of my code.

for f in listOfFiles:

    df1 = spark.read.load( csv file )

    df1 = df1.select( ["a", "b"] )

    print( df1.count() )

    df1.createOrReplaceTempView( "df1" )

    sqlStmt = 'select rc.*, `{}` \n\
               from \n\
                   retDF as rc, \n\
                   df1 \n\
               where \n\
                   rc.Name == df1.Name \n'.format( "a" )

    if i == 0 :
        retDF = df1
    else :
        retDF = self.spark.sql( sqlStmt )

    print( retDF.count() )
    retDF.createOrReplaceTempView( "retDF" )


Does this leak memory? Is there some sort of destroy(), delete(), ??? function I should be calling?

I wonder if I would be better off using the dataframe version of join()?
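For comparison, a hedged sketch of the DataFrame-API version of the SQL join in Case 3, assuming retDF and df1 share the "Name" key column:

# equivalent of the temp-view SQL above, expressed with the DataFrame API
retDF = retDF.join(df1, on="Name", how="inner")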

Kind regards

Andy



Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Andrew Davidson
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data 
frames.

I think you are correct that using lazy evaluation over a lot of joins may make the 
execution plan too complicated. To debug I added

logger.warn( "i:{}, num file rows:{} num joined rows:{}".format( i, df.count(), retDF.count() ) )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making 
some progress, however it fails after a few files. Resources are maxed out!



I estimated that the raw data should be < 500 GB. I am running a cluster 
with 2.8 TB, which should be more than enough to cover Spark overhead.



Is spark integrated with the python garbage collector?



I assume createOrReplaceTempView() would cause cache to get flushed as needed?
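
On that point: createOrReplaceTempView() only registers a name for the plan, it does not cache or uncache data by itself. A minimal sketch of the explicit catalog calls, in case they are useful (the view name is just an example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(10)                      # any DataFrame
df.createOrReplaceTempView("retDF")       # registers a name, nothing is cached

spark.catalog.cacheTable("retDF")         # explicit opt-in caching
print(spark.catalog.isCached("retDF"))    # True
spark.catalog.uncacheTable("retDF")       # frees the storage memory
spark.catalog.dropTempView("retDF")       # removes the name itself
spark.catalog.clearCache()                # or drop everything that is cached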



Kind regards



Andy



###
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info( "BEGIN" )
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"
    for i in range( len(self.fileList) ):
        #
        # get NumReads from next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                         schema=quantSchema, header="true" )
        # did not fix bug .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                           .withColumnRenamed( "NumReads", sampleName )

        sampleDF.createOrReplaceTempView( "sample" )

        self.logger.warn( "AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                          .format( i, sampleName, sampleDF.count(),
                                   len(sampleDF.columns), sampleDF.rdd.getNumPartitions() ) )

        #
        # append NumReads to table of reads
        #

        # the sample name must be quoted else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                   from \n\
                       retNumReadsDF as rc, \n\
                       sample \n\
                   where \n\
                       rc.Name == sample.Name \n'.format( sampleName )

        self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
        if i == 0 :
            retNumReadsDF = sampleDF
        else :
            retNumReadsDF = self.spark.sql( sqlStmt )

        retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

        #
        # debug. Seems like we do not make progress when we run on training:
        # nothing happens, logs do not change, cluster metrics drop suggesting no work
        # is being done.
        # Add an action to try and debug. This should not change the physical plan,
        # i.e. we still have the same number of shuffles, which results in the same
        # number of stages. We are just not building up a plan with thousands of stages.
        #
        self.logger.warn( "AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                          .format( i, retNumReadsDF.count(),
                                   len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions() ) )

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info( "END\n" )
    return retNumReadsDF
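
Not something I have tested on this data set, but one way to keep the lineage from growing into thousands of stages is to checkpoint the accumulated DataFrame every so often. The checkpoint directory and the interval below are made-up placeholders, and dfs stands in for the list of per-sample DataFrames (each with a Name column plus the sample column); it assumes an existing SparkSession named spark:

spark.sparkContext.setCheckpointDir("gs://some-bucket/tmp/checkpoints")  # hypothetical path

retDF = dfs[0]
for i in range(1, len(dfs)):
    retDF = retDF.join(dfs[i], on="Name", how="inner")
    if i % 100 == 0:
        # writes the current result to the checkpoint dir and truncates its lineage,
        # so the plan for the later joins stays small
        retDF = retDF.checkpoint(eager=True)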


From: David Diebold 
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson , "user @spark" 

Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual dataset size?
Having a look at the execution plan would help; maybe the high number of join 
operations makes the execution plan too complicated at the end of the day. 
Checkpointing could help there?

Cheers,
David


On Thu, Dec 30, 2021 at 16:56, Andrew Davidson  wrote:
Hi Gourav

I will give databricks a try

Re: Pyspark debugging best practices

2021-12-30 Thread Andrew Davidson
Hi Gourav

I will give databricks a try.

Each data file gets loaded into a data frame.
I select one column from the data frame.
I join the column to the accumulated joins from the previous data frames in
the list.

To debug, I think I am going to put an action and log statement after each
join. I do not think it will change the performance. I believe the physical
plan will be the same, however hopefully it will shed some light.

At the very least I will know whether it is making progress or not, and hopefully
where it is breaking.

Happy new year

Andy

On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
wrote:

> Hi Andrew,
>
> Any chance you might give Databricks a try in GCP?
>
> The above transformations look complicated to me, why are you adding
> dataframes to a list?
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
> wrote:
>
>> Hi
>>
>>
>>
>> I am having trouble debugging my driver. It runs correctly on smaller
>> data set but fails on large ones.  It is very hard to figure out what the
>> bug is. I suspect it may have something do with the way spark is installed
>> and configured. I am using google cloud platform dataproc pyspark
>>
>>
>>
>> The log messages are not helpful. The error message will be something
>> like
>> "User application exited with status 1"
>>
>>
>>
>> And
>>
>>
>>
>> jsonPayload: {
>>
>> class: "server.TThreadPoolServer"
>>
>> filename: "hive-server2.log"
>>
>> message: "Error occurred during processing of message."
>>
>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>
>> }
>>
>>
>>
>> I am able to access the spark history server however it does not capture
>> anything if the driver crashes. I am unable to figure out how to access
>> spark web UI.
>>
>>
>>
>> My driver program looks something like the pseudo code bellow. A long
>> list of transforms with a single action, (i.e. write) at the end. Adding
>> log messages is not helpful because of lazy evaluations. I am tempted to
>> add something like
>>
>>
>>
>> Logger.warn( “DEBUG df.count():{}”.format( df.count() )” to try and
>> inline some sort of diagnostic message.
>>
>>
>>
>> What do you think?
>>
>>
>>
>> Is there a better way to debug this?
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>>
>>
>> def run():
>>
>> listOfDF = []
>>
>> for filePath in listOfFiles:
>>
>> df = spark.read.load( filePath, ...)
>>
>> listOfDF.append(df)
>>
>>
>>
>>
>>
>> list2OfDF = []
>>
>> for df in listOfDF:
>>
>> df2 = df.select(  )
>>
>> lsit2OfDF.append( df2 )
>>
>>
>>
>> # will setting list to None free cache?
>>
>> # or just driver memory
>>
>> listOfDF = None
>>
>>
>>
>>
>>
>> df3 = list2OfDF[0]
>>
>>
>>
>> for i in range( 1, len(list2OfDF) ):
>>
>> df = list2OfDF[i]
>>
>> df3 = df3.join(df ...)
>>
>>
>>
>> # will setting to list to None free cache?
>>
>> # or just driver memory
>>
>> List2OfDF = None
>>
>>
>>
>>
>>
>> lots of narrow transformations on d3
>>
>>
>>
>> return df3
>>
>>
>>
>> def main() :
>>
>> df = run()
>>
>> df.write()
>>
>>
>>
>>
>>
>>
>>
>


Pyspark debugging best practices

2021-12-26 Thread Andrew Davidson
Hi

I am having trouble debugging my driver. It runs correctly on a smaller data set 
but fails on large ones. It is very hard to figure out what the bug is. I 
suspect it may have something to do with the way Spark is installed and 
configured. I am using Google Cloud Platform Dataproc pyspark.

The log messages are not helpful. The error message will be something like
"User application exited with status 1"

And

jsonPayload: {
class: "server.TThreadPoolServer"
filename: "hive-server2.log"
message: "Error occurred during processing of message."
thread: "HiveServer2-Handler-Pool: Thread-40"
}

I am able to access the Spark history server, however it does not capture 
anything if the driver crashes. I am unable to figure out how to access the Spark 
web UI.

My driver program looks something like the pseudo code below: a long list of 
transforms with a single action (i.e. write) at the end. Adding log messages 
is not helpful because of lazy evaluation. I am tempted to add something like

Logger.warn( "DEBUG df.count():{}".format( df.count() ) )

to try and inline some sort of diagnostic message.
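
Along the same lines, explain() prints the current plan without triggering a job, so it can show whether the plan itself is blowing up even before any count(). A small sketch, assuming df3 is one of the intermediate DataFrames from the pseudo code below:

import logging

logger = logging.getLogger(__name__)

df3.explain(True)   # prints the extended plan; no Spark job is run
logger.warning("DEBUG rows:%d partitions:%d",
               df3.count(), df3.rdd.getNumPartitions())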

What do you think?

Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load( filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select( ... )
        list2OfDF.append( df2 )

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]

    for i in range( 1, len(list2OfDF) ):
        df = list2OfDF[i]
        df3 = df3.join( df, ... )

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3

    return df3

def main():
    df = run()
    df.write( ... )





Pyspark garbage collection and cache management best practices

2021-12-26 Thread Andrew Davidson
Hi

Below is typical pseudo code I find myself writing over and over again. There 
is only a single action at the very end of the program. The early narrow 
transformations potentially hold on to a lot of needless data. I have a for 
loop over join() (i.e. a wide transformation), followed by a bunch more narrow 
transformations. Will setting my lists to None improve performance?

What are best practices?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load( filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select( ... )
        list2OfDF.append( df2 )

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]

    for i in range( 1, len(list2OfDF) ):
        df = list2OfDF[i]
        df3 = df3.join( df, ... )

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3

    return df3

def main():
    df = run()
    df.write( ... )


Re: About some Spark technical help

2021-12-24 Thread Andrew Davidson
Hi Sam

It is kind of hard to review straight code. Adding some sample data, a
unit test, and the expected results would be a good place to start, i.e.
determine the fidelity of your implementation compared to the original.

Also, a verbal description of the algorithm would be helpful.

Happy Holidays

Andy

On Fri, Dec 24, 2021 at 3:17 AM sam smith 
wrote:

> Hi Gourav,
>
> Good question! That's the programming language I am most proficient at.
> You are always welcome to suggest corrective remarks about my (Spark) code.
>
> Kind regards.
>
> On Fri, Dec 24, 2021 at 11:58, Gourav Sengupta wrote:
>
>> Hi,
>>
>> out of sheer and utter curiosity, why JAVA?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> Thanks, here's the Github repo to the code and the publication :
>>> https://github.com/SamSmithDevs10/paperReplicationForReview
>>>
>>> Kind regards
>>>
>>> On Thu, Dec 23, 2021 at 17:58, Andrew Davidson  wrote:
>>>
>>>> Hi Sam
>>>>
>>>>
>>>>
>>>> Can you tell us more? What is the algorithm? Can you send us the URL
>>>> the publication
>>>>
>>>>
>>>>
>>>> Kind regards
>>>>
>>>>
>>>>
>>>> Andy
>>>>
>>>>
>>>>
>>>> *From: *sam smith 
>>>> *Date: *Wednesday, December 22, 2021 at 10:59 AM
>>>> *To: *"user@spark.apache.org" 
>>>> *Subject: *About some Spark technical help
>>>>
>>>>
>>>>
>>>> Hello guys,
>>>>
>>>>
>>>>
>>>> I am replicating a paper's algorithm in Spark / Java, and want to ask
>>>> you guys for some assistance to validate / review about 150 lines of code.
>>>> My github repo contains both my java class and the related paper,
>>>>
>>>>
>>>>
>>>> Any interested reviewer here ?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks.
>>>>
>>>


OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Andrew Davidson
Hi Sean and Gourav

Thanks for the suggestions. I thought that both the SQL and dataframe APIs are 
wrappers around the same framework, i.e. Catalyst?

I tend to mix and match my code. Sometimes I find it easier to write using SQL, 
sometimes dataframes. What is considered best practice?

Here is an example

Case 1

    for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 times!
        sampleName = self.sampleNamesList[i]
        sampleSDF = quantSparkDFList[i]
        sampleSDF.createOrReplaceTempView( "sample" )

        # the sample name must be quoted else column names with a '-'
        # like GTEX-1117F-0426-SM-5EGHI will generate an error
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                   from \n\
                       rawCounts as rc, \n\
                       sample \n\
                   where \n\
                       rc.Name == sample.Name \n'.format( sampleName )

        rawCountsSDF = self.spark.sql( sqlStmt )
        rawCountsSDF.createOrReplaceTempView( "rawCounts" )

Case 2

    for i in range( 1, len(dfList) ):
        df2 = dfList[i]
        retDF = retDF.join( df2.selectExpr("*"), on=["Name"] )


I think my out of memory exception may be because the query plan is huge. I have 
not figured out how to tell whether that is my bug or not. My untested workaround 
is to organize the data so that each massive join is run on 1/5 of the 
total data set, then union them all together. Each "part" will still need to 
iterate 16,000 times.

In general I assume we want to avoid for loops. I assume Spark is unable to 
optimize them. It would be nice if Spark provided some sort of join-all function, 
even if it used a for loop to hide this from me.
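
A join-all helper is easy enough to write with functools.reduce. This is only a sketch (dfs stands in for the list of key-plus-one-column DataFrames), and it could also be applied per key-range chunk before a final union as described above:

from functools import reduce

def join_all(dfs, key="Name", how="inner"):
    """Join a list of DataFrames on a shared key, left to right."""
    return reduce(lambda left, right: left.join(right, on=key, how=how), dfs)

# retDF = join_all(listOfSampleDFs)   # hypothetical list of per-sample DataFrames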

Happy holidays

Andy



From: Sean Owen 
Date: Friday, December 24, 2021 at 8:30 AM
To: Gourav Sengupta 
Cc: Andrew Davidson , Nicholas Gustafson 
, User 
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

(that's not the situation below we are commenting on)
On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta  wrote:
Hi,

Try writing several withColumn calls in a dataframe with functions and then look at the 
UI for time differences. This should be done with large data sets of course, on 
the order of around 200 GB+.

With scenarios involving nested queries and joins, the time differences shown in 
the UI become a bit more visible.

Regards,
Gourav Sengupta
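
One offline way to compare the two styles is to build the same result both ways and diff the plans with explain(); a small sketch with made-up columns:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
base = spark.range(1000).withColumnRenamed("id", "Name")

# several chained withColumn calls ...
chained = (base.withColumn("x", F.col("Name") + 1)
               .withColumn("y", F.col("Name") * 2))

# ... versus a single select that adds the same columns
selected = base.select("*", (F.col("Name") + 1).alias("x"),
                            (F.col("Name") * 2).alias("y"))

chained.explain(True)    # compare the optimized / physical plans
selected.explain(True)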

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen  wrote:
Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta  wrote:
Hi,

please note that using SQL is much more performant, and it is easier to manage these 
kinds of issues. You might want to look at the Spark UI to see the advantage of 
using SQL over the dataframes API.


Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson  
wrote:
Thanks Nicholas

Andy

From: Nicholas Gustafson 
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson 
Cc: "user@spark.apache.org" 
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

Since df1 and df2 are different DataFrames, you will need to use a join. For 
example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])

On Dec 17, 2021, at 16:25, Andrew Davidson  wrote:

Hi I am a newbie

I have 16,000 data files, all files have the same number of rows and columns. 
The row ids are identical and are in the same order. I want to create a new 
data frame that contains the 3rd column from each data file

I wrote a test program that uses a for loop and join. It works with my small 
test set. I get an OOM when I try to run using all the data files. I 
realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenge of using 
withColumn() and how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop


I am using pyspark spark-3.1.2-bin-hadoop3.2

I wrote a little test program. I am able to append columns created using 
pyspark.sql.functions.lit(). I am not able to append columns from other data 
frames.


df1

DataFrame[Name: string, ctrl_1: double]

+---+--+

|   Name|ctrl_1|

+---+--+

| txId_1|   0.0|

| txId_2|  11.0|

| txId_3|  1

Re: About some Spark technical help

2021-12-23 Thread Andrew Davidson
Hi Sam

Can you tell us more? What is the algorithm? Can you send us the URL of the 
publication?

Kind regards

Andy

From: sam smith 
Date: Wednesday, December 22, 2021 at 10:59 AM
To: "user@spark.apache.org" 
Subject: About some Spark technical help

Hello guys,

I am replicating a paper's algorithm in Spark / Java, and want to ask you guys 
for some assistance to validate / review about 150 lines of code. My github 
repo contains both my java class and the related paper,

Any interested reviewer here ?


Thanks.


Re: ??? INFO CreateViewCommand:57 - Try to uncache `rawCounts` before replacing.

2021-12-21 Thread Andrew Davidson
Hi Jun

Thank you for your reply. My question is: what is best practice? My for
loop runs over 16,000 joins. I get an out of memory exception.

What is the intended use of createOrReplaceTempView() if I need to manage the
cache or create a unique name each time?



Kind regards

Andy

On Tue, Dec 21, 2021 at 6:12 AM Jun Zhu  wrote:

> Hi
>
> As far as I know, the warning is caused by creating the same temp view
> name: rawCountsSDF.createOrReplaceTempView( "rawCounts" ).
> You create a view "rawCounts", then in the for loop, on the second round, you
> create a new view with the name "rawCounts", and Spark 3 will uncache the
> previous "rawCounts".
>
> Correct me if I'm wrong.
>
> Regards
>
>
> On Tue, Dec 21, 2021 at 10:05 PM Andrew Davidson 
> wrote:
>
>> Happy Holidays
>>
>>
>>
>> I am a newbie
>>
>>
>>
>> I have 16,000 data files, all files have the same number of rows and
>> columns. The row ids are identical and are in the same order. I want to
>> create a new data frame that contains the 3rd column from each data file.
>> My pyspark script runs correctly when I test on small number of files how
>> ever I get an OOM when I run on all 16000.
>>
>>
>>
>> To try and debug I ran a small test and set warning level to INFO. I
>> found the following
>>
>>
>>
>> 2021-12-21 00:47:04 INFO  CreateViewCommand:57 - Try to uncache
>> `rawCounts` before replacing.
>>
>>
>>
>> for i in range( 1, len( self.sampleNamesList ) ):
>>
>> sampleName = self.sampleNamesList[i]
>>
>>
>>
>> # select the key and counts from the sample.
>>
>> qsdf = quantSparkDFList[i]
>>
>> sampleSDF = qsdf\
>>
>> .select( ["Name", "NumReads", ] )\
>>
>> .withColumnRenamed( "NumReads", sampleName )
>>
>>
>>
>> sampleSDF.createOrReplaceTempView( "sample" )
>>
>>
>>
>> # the sample name must be quoted else column names with a '-'
>>
>> # like GTEX-1117F-0426-SM-5EGHI will generate an error
>>
>> # spark think the '-' is an expression. '_' is also
>>
>> # a special char for the sql like operator
>>
>> # https://stackoverflow.com/a/63899306/4586180
>>
>> sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
>>
>> from \n\
>>
>>rawCounts as rc, \n\
>>
>>sample  \n\
>>
>> where \n\
>>
>> rc.Name == sample.Name \n'.format(
>> sampleName )
>>
>>
>>
>> rawCountsSDF = self.spark.sql( sqlStmt )
>>
>> rawCountsSDF.createOrReplaceTempView( "rawCounts" )
>>
>>
>>
>>
>>
>> The way I wrote my script, I do a lot of transformations, the first
>> action is at the end of the script
>>
>> retCountDF.coalesce(1).write.csv( outfileCount, mode='overwrite',
>> header=True)
>>
>>
>>
>> Should I be calling sql.spark.sql( ‘uncache table rawCountsSDF “) before
>> calling   rawCountsSDF.createOrReplaceTempView( "rawCounts" ) ? I
>> expected to manage spark to manage the cache automatically given that I do
>> not explicitly call cache().
>>
>>
>>
>>
>>
>> How come I do not get a similar warning from?
>>
>> sampleSDF.createOrReplaceTempView( "sample" )
>>
>>
>>
>> Will this reduce my memory requirements?
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>
>
> --
> *Jun Zhu*
> Sr. Engineer I, Data
> +86 18565739171
>
> Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
>
>


??? INFO CreateViewCommand:57 - Try to uncache `rawCounts` before replacing.

2021-12-20 Thread Andrew Davidson
Happy Holidays

I am a newbie

I have 16,000 data files, all files have the same number of rows and columns. 
The row ids are identical and are in the same order. I want to create a new 
data frame that contains the 3rd column from each data file. My pyspark script 
runs correctly when I test on a small number of files, however I get an OOM when 
I run on all 16,000.

To try and debug I ran a small test and set warning level to INFO. I found the 
following

2021-12-21 00:47:04 INFO  CreateViewCommand:57 - Try to uncache `rawCounts` 
before replacing.

for i in range( 1, len( self.sampleNamesList ) ):
    sampleName = self.sampleNamesList[i]

    # select the key and counts from the sample.
    qsdf = quantSparkDFList[i]
    sampleSDF = qsdf\
        .select( ["Name", "NumReads", ] )\
        .withColumnRenamed( "NumReads", sampleName )

    sampleSDF.createOrReplaceTempView( "sample" )

    # the sample name must be quoted else column names with a '-'
    # like GTEX-1117F-0426-SM-5EGHI will generate an error
    # spark thinks the '-' is an expression. '_' is also
    # a special char for the sql like operator
    # https://stackoverflow.com/a/63899306/4586180
    sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
               from \n\
                   rawCounts as rc, \n\
                   sample \n\
               where \n\
                   rc.Name == sample.Name \n'.format( sampleName )

    rawCountsSDF = self.spark.sql( sqlStmt )
    rawCountsSDF.createOrReplaceTempView( "rawCounts" )


The way I wrote my script, I do a lot of transformations; the first action is 
at the end of the script:

retCountDF.coalesce(1).write.csv( outfileCount, mode='overwrite', header=True )

Should I be calling self.spark.sql( 'uncache table rawCounts' ) before 
calling rawCountsSDF.createOrReplaceTempView( "rawCounts" )? I expected Spark to 
manage the cache automatically, given that I do not explicitly call cache().


How come I do not get a similar warning from sampleSDF.createOrReplaceTempView( "sample" )?

Will this reduce my memory requirements?
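
For what it is worth, if you do want to be explicit about it (it should not be required when you never call cache()), the SQL form is a one-liner; IF EXISTS keeps it from failing when nothing is cached. A sketch, reusing the names from the loop above:

self.spark.sql("UNCACHE TABLE IF EXISTS rawCounts")   # no-op if nothing is cached
rawCountsSDF.createOrReplaceTempView("rawCounts")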


Kind regards

Andy


Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-18 Thread Andrew Davidson
Thanks Nicholas

Andy

From: Nicholas Gustafson 
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson 
Cc: "user@spark.apache.org" 
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

Since df1 and df2 are different DataFrames, you will need to use a join. For 
example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])


On Dec 17, 2021, at 16:25, Andrew Davidson  wrote:

Hi I am a newbie

I have 16,000 data files, all files have the same number of rows and columns. 
The row ids are identical and are in the same order. I want to create a new 
data frame that contains the 3rd column from each data file

I wrote a test program that uses a for loop and join. It works with my small 
test set. I get an OOM when I try to run using all the data files. I 
realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenge of using 
withColumn() and how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop


I am using pyspark spark-3.1.2-bin-hadoop3.2

I wrote a little test program. I am able to append columns created using 
pyspark.sql.functions.lit(). I am not able to append columns from other data 
frames.


df1

DataFrame[Name: string, ctrl_1: double]

+---+--+

|   Name|ctrl_1|

+---+--+

| txId_1|   0.0|

| txId_2|  11.0|

| txId_3|  12.0|

| txId_4|  13.0|

| txId_5|  14.0|

| txId_6|  15.0|

| txId_7|  16.0|

| txId_8|  17.0|

| txId_9|  18.0|

|txId_10|  19.0|

+---+--+

# use select to append multiple literals
allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), 
pyf.lit("mn0").alias("y")] )

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+---+--+---+---+
|   Name|ctrl_1|  x|  y|
+---+--+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+---+--+---+---+


df2

DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
NumReads: double]

+---+--+---+++

|   Name|Length|EffectiveLength| TPM|NumReads|

+---+--+---+++

| txId_1|  1500| 1234.5|12.1| 0.1|

| txId_2|  1510| 1244.5|13.1|11.1|

| txId_3|  1520| 1254.5|14.1|12.1|

| txId_4|  1530| 1264.5|15.1|13.1|

| txId_5|  1540| 1274.5|16.1|14.1|

| txId_6|  1550| 1284.5|17.1|15.1|

| txId_7|  1560| 1294.5|18.1|16.1|

| txId_8|  1570| 1304.5|19.1|17.1|

| txId_9|  1580| 1314.5|20.1|18.1|

|txId_10|  1590| 1324.5|21.1|19.1|

+---+--+---+++



s2Col = df2["NumReads"].alias( 'ctrl_2' )

print("type(s2Col) = {}".format(type(s2Col)) )



type(s2Col) = 

allDF4 = df1.select( ["*", s2Col] )

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py
 in select(self, *cols)

   1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)]

   1668 """

-> 1669 jdf = self._jdf.select(self._jcols(*cols))

   1670 return DataFrame(jdf, self.sql_ctx)

   1671



../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)

   1303 answer = self.gateway_client.send_command(command)

   1304 return_value = get_return_value(

-> 1305 answer, self.gateway_client, self.target_id, self.name)

   1306

   1307 for temp_arg in temp_args:



~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py
 in deco(*a, **kw)

115 # Hide where the exception came from that shows a 
non-Pythonic

116 # JVM exception message.

--> 117 raise converted from None

118 else:

119 raise



AnalysisException: Resolved attribute(s) NumReads#14 missing from 
Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS 
ctrl_2#2550].;

!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]

+- Project [Name#0, NumReads#4 AS ctrl_1#2447]

   +- Project [Name#0, NumReads#4]

  +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy


AnalysisException: Trouble using select() to append multiple columns

2021-12-17 Thread Andrew Davidson
Hi I am a newbie

I have 16,000 data files, all files have the same number of rows and columns. 
The row ids are identical and are in the same order. I want to create a new 
data frame that contains the 3rd column from each data file

I wrote a test program that uses a for loop and join. It works with my small 
test set. I get an OOM when I try to run using all the data files. I 
realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenge of using 
withColumn() and how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop


I am using pyspark spark-3.1.2-bin-hadoop3.2

I wrote a little test program. I am able to append columns created using 
pyspark.sql.functions.lit(). I am not able to append columns from other data 
frames.


df1

DataFrame[Name: string, ctrl_1: double]

+---+--+

|   Name|ctrl_1|

+---+--+

| txId_1|   0.0|

| txId_2|  11.0|

| txId_3|  12.0|

| txId_4|  13.0|

| txId_5|  14.0|

| txId_6|  15.0|

| txId_7|  16.0|

| txId_8|  17.0|

| txId_9|  18.0|

|txId_10|  19.0|

+---+--+

# use select to append multiple literals
allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), 
pyf.lit("mn0").alias("y")] )

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+---+--+---+---+
|   Name|ctrl_1|  x|  y|
+---+--+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+---+--+---+---+


df2

DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
NumReads: double]

+---+--+---+++

|   Name|Length|EffectiveLength| TPM|NumReads|

+---+--+---+++

| txId_1|  1500| 1234.5|12.1| 0.1|

| txId_2|  1510| 1244.5|13.1|11.1|

| txId_3|  1520| 1254.5|14.1|12.1|

| txId_4|  1530| 1264.5|15.1|13.1|

| txId_5|  1540| 1274.5|16.1|14.1|

| txId_6|  1550| 1284.5|17.1|15.1|

| txId_7|  1560| 1294.5|18.1|16.1|

| txId_8|  1570| 1304.5|19.1|17.1|

| txId_9|  1580| 1314.5|20.1|18.1|

|txId_10|  1590| 1324.5|21.1|19.1|

+---+--+---+++



s2Col = df2["NumReads"].alias( 'ctrl_2' )

print("type(s2Col) = {}".format(type(s2Col)) )



type(s2Col) = 

allDF4 = df1.select( ["*", s2Col] )

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py
 in select(self, *cols)

   1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)]

   1668 """

-> 1669 jdf = self._jdf.select(self._jcols(*cols))

   1670 return DataFrame(jdf, self.sql_ctx)

   1671



../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)

   1303 answer = self.gateway_client.send_command(command)

   1304 return_value = get_return_value(

-> 1305 answer, self.gateway_client, self.target_id, self.name)

   1306

   1307 for temp_arg in temp_args:



~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py
 in deco(*a, **kw)

115 # Hide where the exception came from that shows a 
non-Pythonic

116 # JVM exception message.

--> 117 raise converted from None

118 else:

119 raise



AnalysisException: Resolved attribute(s) NumReads#14 missing from 
Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS 
ctrl_2#2550].;

!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]

+- Project [Name#0, NumReads#4 AS ctrl_1#2447]

   +- Project [Name#0, NumReads#4]

  +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy