Hi guys, I'm trying to calculate WoE (weight of evidence) for each categorical column with respect to the target column, but the code takes a very long time even on very few data points (rows).
How can I optimize it to make it reasonably fast? Here's the code (`categorical_col` is a Python list of column names):

```python
for item in categorical_col:
    # Step 1: counts per (category value, target value) pair
    new_df = spark.sql(
        'SELECT `' + item + '`, `' + target_col + '`, COUNT(*) AS Counts '
        'FROM a GROUP BY `' + item + '`, `' + target_col + '` ORDER BY `' + item + '`')
    new_df.registerTempTable('b')

    # Step 2: split the counts into target==0 / target==1 columns
    new_df2 = spark.sql(
        'SELECT `' + item + '`, '
        'CASE WHEN `' + target_col + '` == 0 THEN Counts ELSE 0 END AS Count_0, '
        'CASE WHEN `' + target_col + '` == 1 THEN Counts ELSE 0 END AS Count_1 '
        'FROM b')
    spark.catalog.dropTempView('b')
    new_df2.registerTempTable('c')

    # Step 3: collapse to one row per category value
    new_df3 = spark.sql(
        'SELECT `' + item + '`, SUM(Count_0) AS Count_0, SUM(Count_1) AS Count_1 '
        'FROM c GROUP BY `' + item + '`')
    spark.catalog.dropTempView('c')
    new_df3.registerTempTable('d')

    # Step 4: turn the counts into proportions of the column totals
    new_df4 = spark.sql(
        'SELECT `' + item + '` AS bucketed_col_of_source, '
        'Count_0 / (SELECT SUM(d.Count_0) FROM d) AS Prop_0, '
        'Count_1 / (SELECT SUM(d.Count_1) FROM d) AS Prop_1 '
        'FROM d')
    spark.catalog.dropTempView('d')
    new_df4.registerTempTable('e')

    # Step 5: WoE = log(Prop_0 / Prop_1), with NULL mapped to 0
    new_df5 = spark.sql(
        'SELECT *, CASE WHEN LOG(e.Prop_0 / e.Prop_1) IS NULL THEN 0 '
        'ELSE LOG(e.Prop_0 / e.Prop_1) END AS WoE FROM e')
    spark.catalog.dropTempView('e')
    # print('Problem starts here: ')
    # new_df5.show()
    new_df5.registerTempTable('WoE_table')

    # Step 6: join the WoE lookup back onto train/test/validation
    joined_Train_DF = spark.sql(
        'SELECT bucketed.*, WoE_table.WoE AS `' + item + '_WoE` '
        'FROM a bucketed INNER JOIN WoE_table '
        'ON bucketed.`' + item + '` = WoE_table.bucketed_col_of_source')
    joined_Test_DF = spark.sql(
        'SELECT bucketed.*, WoE_table.WoE AS `' + item + '_WoE` '
        'FROM test_data bucketed INNER JOIN WoE_table '
        'ON bucketed.`' + item + '` = WoE_table.bucketed_col_of_source')
    if validation:
        joined_Validation_DF = spark.sql(
            'SELECT bucketed.*, WoE_table.WoE AS `' + item + '_WoE` '
            'FROM validation_data bucketed INNER JOIN WoE_table '
            'ON bucketed.`' + item + '` = WoE_table.bucketed_col_of_source')
        WoE_Validation_DF = joined_Validation_DF
    spark.catalog.dropTempView('WoE_table')

    WoE_Train_DF = joined_Train_DF
    WoE_Test_DF = joined_Test_DF

    # Re-register the augmented tables so the next column builds on them
    if len(categorical_col) > 1:
        WoE_Train_DF.registerTempTable('a')
        WoE_Test_DF.registerTempTable('test_data')
        if validation:
            WoE_Validation_DF.registerTempTable('validation_data')
```

Any help? Thanks, Aakash.
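Edit: one thing I've been experimenting with (not sure it's the right approach, and I haven't verified it produces identical numbers to my SQL version) is computing each column's WoE in a single `groupBy`/`agg` pass with the DataFrame API, and then broadcasting the small WoE lookup for the join, instead of the five temp-table round-trips per column. The `add_woe_columns` helper and all the names in it are just my own sketch, nothing standard:

```python
from pyspark.sql import functions as F

def add_woe_columns(train_df, test_df, categorical_cols, target_col):
    """Sketch: compute WoE per categorical column in one aggregation,
    then attach it to both DataFrames via a broadcast join."""
    for col in categorical_cols:
        # One pass: count target==0 and target==1 rows per category value.
        counts = (train_df
                  .groupBy(col)
                  .agg(F.sum(F.when(F.col(target_col) == 0, 1).otherwise(0)).alias('cnt_0'),
                       F.sum(F.when(F.col(target_col) == 1, 1).otherwise(0)).alias('cnt_1')))

        # Column-wide totals for the proportions (a single small row).
        totals = counts.agg(F.sum('cnt_0').alias('tot_0'),
                            F.sum('cnt_1').alias('tot_1')).first()

        # WoE = log(Prop_0 / Prop_1); coalesce maps NULL (zero counts) to 0,
        # matching the CASE WHEN ... IS NULL THEN 0 in the SQL version.
        woe = counts.select(
            col,
            F.coalesce(
                F.log((F.col('cnt_0') / totals['tot_0']) /
                      (F.col('cnt_1') / totals['tot_1'])),
                F.lit(0.0)).alias(col + '_WoE'))

        # The WoE table has one row per category value, so broadcast it.
        train_df = train_df.join(F.broadcast(woe), on=col, how='inner')
        test_df = test_df.join(F.broadcast(woe), on=col, how='inner')
    return train_df, test_df
```

The idea is that `counts` already has one row per category value, so the WoE lookup is tiny and `broadcast` should avoid shuffling the big tables during the join. Would still appreciate confirmation that this is the right direction.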