pyspark loop optimization
I want to compute cume_dist on a number of columns in a Spark dataframe, but I want to remove NULL values before doing so. I have this loop in pyspark. While it works, I see the driver running at 100% while the executors are idle for the most part. I have read that running a loop like this is an anti-pattern and should be avoided. Any pointers on how to optimize this section of pyspark code? I am running this on the AWS Glue 3.0 environment.

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
        .orderBy(col(column_name))
    )
    df1 = df.filter(F.col(column_name).isNull())
    df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
        new_col, F.round(F.cume_dist().over(win) * lit(100)).cast("integer")
    )
    df = df2.unionByName(df1, allowMissingColumns=True)

For some reason the following version seems to run faster, but it does not remove the NULLs before computing the distribution. I am not sure whether this is a proper way to do it either:

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window().partitionBy(["p_customer_name", "p_site_name", "year_month"])
        .orderBy(col(column_name))
    )
    df = df.withColumn(
        new_col,
        F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
            F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
        ),
    )

I would appreciate any pointers on how to go about this.

Thanks,
Ramesh
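A rough sketch of one way to keep the work out of a driver-side loop (assuming the same df and the same column pairs as above): build all the window expressions up front in plain Python and apply them in a single select, so the query plan does not grow through repeated withColumn/unionByName rounds. Note that, like the second loop above, this variant leaves NULL rows inside each window, so they still count toward the distribution unless they are filtered out separately.

from pyspark.sql import functions as F, Window

column_pairs = [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    # ... the remaining (source column, rank column) pairs from the loop above
]

rank_exprs = []
for column_name, new_col in column_pairs:
    win = (
        Window.partitionBy("p_customer_name", "p_site_name", "year_month")
        .orderBy(F.col(column_name))
    )
    # leave the rank NULL where the source column is NULL; the NULL rows are
    # still part of the window unless they are filtered out beforehand
    rank_exprs.append(
        F.when(F.col(column_name).isNull(), F.lit(None))
        .otherwise(F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer"))
        .alias(new_col)
    )

# one select instead of ten withColumn/union rounds keeps the lineage flat
df = df.select("*", *rank_exprs)

Whether this also changes the idle-executor picture depends on where the time was going; the loop itself only builds the plan on the driver, so long plan analysis is a plausible suspect rather than the executors themselves.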
Re: How to add a row number column without reordering my data frame
Hi,

I am a bit confused here; it is not entirely clear to me why you are creating the row numbers, and how creating the row numbers helps you with the joins. Can you please explain with some sample data?

Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson wrote:

> Hi
>
> I am trying to work through an OOM error. I have 10411 files. I want to
> select a single column from each file and then join them into a single
> table.
>
> The files have a unique row id. However, it is a very long string. The data
> file with just the name and the column of interest is about 470 M. The
> column of interest alone is 21 M; it is a column of over 5 million real
> numbers.
>
> So I thought I would save a lot of memory if I could join over row numbers.
>
> # create dummy variable to orderBy
> # https://www.py4u.net/discuss/1840945
> w = Window().orderBy(lit('A'))
> sampleDF = sampleDF.select(["NumReads"]) \
>     .withColumnRenamed("NumReads", sampleName) \
>     .withColumn("tid", row_number().over(w))
>
> This code seems pretty complicated to someone coming from pandas and R
> dataframes. My unit test works; however, it generates the following warning:
>
> WARN WindowExec: No Partition Defined for Window operation! Moving all
> data to a single partition, this can cause serious performance degradation.
>
> Is there a better way to create a row number without reordering my data?
> The order is important.
>
> Kind regards
>
> Andy
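One possible sketch for getting a consecutive index without the single-partition window (the helper name with_row_index and the column name tid are just illustrative here) is to drop to the RDD and use zipWithIndex, which numbers rows in the DataFrame's current partition order instead of pulling everything onto one partition:

from pyspark.sql import SparkSession, types as T

def with_row_index(df, index_col="tid"):
    """Append a consecutive 0-based index in the DataFrame's current order."""
    spark = SparkSession.builder.getOrCreate()
    schema = T.StructType(
        df.schema.fields + [T.StructField(index_col, T.LongType(), nullable=False)]
    )
    # zipWithIndex assigns ids following the existing partition layout, so it
    # avoids the "moving all data to a single partition" warning from WindowExec
    indexed = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    return spark.createDataFrame(indexed, schema)

# e.g. sampleDF = with_row_index(sampleDF.select("NumReads"))

The index reflects whatever order the data happens to be in when the function is called, so if a specific ordering matters it still has to be established before adding the index.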
Re: hive table with large column data size
Hi,

As always, before answering the question, can I please ask what you are trying to achieve by storing the data in a table? How are you planning to query the binary data?

If you look at any relational theory, it states that a table is a relation/entity and the fields are its attributes. You might consider an image to be an attribute of a tuple (or record) belonging to a particular relation, but there might be more efficient methods of storing the binary data; it all depends on what you are trying to do.

For the data types, please look here: https://spark.apache.org/docs/latest/sql-ref-datatypes.html. Parquet is definitely a columnar format, and if I am not entirely wrong, it supports columnar reading of data by default in Spark.

Regards,
Gourav Sengupta

On Sun, Jan 9, 2022 at 2:34 PM weoccc wrote:

> Hi,
>
> I want to store binary data (such as images) in a hive table, but the
> binary data column might be much larger than the other columns per row. I'm
> worried about the query performance. One way I can think of is to separate
> the binary data storage from the other columns by creating 2 hive tables,
> running 2 separate spark queries, and joining them later.
>
> Later, I found that parquet supports splitting columns into different files,
> as shown here:
> https://parquet.apache.org/documentation/latest/
>
> I'm wondering if spark sql already supports that? If so, how do I use it?
>
> Weide
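On the columnar-reading point, a small sketch (the path and column names here are hypothetical, and an active SparkSession named spark is assumed): if the images live in a Parquet-backed table, selecting only the narrow columns lets Spark's Parquet reader skip the large binary column entirely through column pruning.

# Hypothetical table with one wide binary column (image_bytes) and narrow ones
df = spark.read.parquet("s3://my-bucket/images_table/")

# Only the selected columns are read from the Parquet files; the pages that
# store image_bytes are never scanned for this query.
small_df = df.select("image_id", "label", "captured_at")

# The ReadSchema shown in the physical plan should list only these columns.
small_df.explain()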
Difference in behavior for Spark 3.0 vs Spark 3.1 "create database"
Hi Spark Team

When creating a database on Hive:

1) With Spark 3.0, spark.sql("create database test location '/user/hive'") creates the database location on HDFS, as expected.

2) When running the same command on 3.1, the database is created on the local file system by default. I have to prefix the location with hdfs to create the database on HDFS.

Why is there a difference in the behavior? Can you please point me to the jira which causes this change?

Note: spark.sql.warehouse.dir and hive.metastore.warehouse.dir both have their default values (not explicitly set).

Regards
Pralabh Kumar
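For reference, a minimal sketch of the workaround described above (not an explanation of why the behavior changed): fully qualifying the location URI keeps the database directory on HDFS in 3.1 as well.

# Spark 3.1: spelling out the scheme avoids the local-file-system default
spark.sql("create database test location 'hdfs:///user/hive'")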