Well, I tried using windowing functions with pivot() and it did not work. From your reply, you are looking for a function that combines the conciseness of pivot() with the flexibility of explicit aggregations. While Spark provides powerful tools, there is no single built-in function that does exactly this.
The existing approach that I mentioned (i.e. grouping with conditional aggregations) is the recommended way to achieve this because it prioritizes clarity and control.

From my experience:

- Readability matters: explicit code is often better than overly complex or obscure code. As the famous saying goes, there are two ways of constructing a software design: one way is to make it so simple that there are obviously no deficiencies, and the other way is to make it so complicated that there are no obvious deficiencies.
- You need control over how each column is aggregated.
- Spark's query optimizer is generally good at handling aggregations, even with when() conditions.

Therefore, while the verbosity of the current solution might seem less than ideal, it is the most practical and reliable way to achieve your specific requirements in Spark.

HTH

<https://medium.com/@manutej/mastering-sql-window-functions-guide-e6dc17eb1995#:~:text=Window%20functions%20can%20perform%20a,related%20to%20the%20current%20row.>

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

On Sun, 9 Mar 2025 at 18:25, Dhruv Singla <dvsingla...@gmail.com> wrote:

> Hey, I already know this and have written the same in my question. I know
> formatting can make the code a lot simpler and easier to understand, but
> I'm looking if there is already a function or a spark built-in for this.
> Thanks for the help though.
>
> On Sun, Mar 9, 2025 at 11:42 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> import pyspark
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession
>> from pyspark.sql import SQLContext
>> from pyspark.sql.functions import struct
>> from pyspark.sql import functions as F
>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
>>
>> spark = SparkSession.builder.appName("testme").getOrCreate()
>> sc = spark.sparkContext
>> # Set the log level to ERROR to reduce verbosity
>> sc.setLogLevel("ERROR")
>>
>> # Define the schema
>> schema = StructType([
>>     StructField("code", IntegerType(), True),
>>     StructField("doc_type", StringType(), True),
>>     StructField("amount", IntegerType(), True),
>>     StructField("load_date", StringType(), True)
>> ])
>>
>> # Create the DataFrame
>> data = [
>>     [1, 'AB', 12, '2022-01-01'],
>>     [1, 'AA', 22, '2022-01-10'],
>>     [1, 'AC', 11, '2022-01-11'],
>>     [2, 'AB', 22, '2022-02-01'],
>>     [2, 'AA', 28, '2022-02-10'],
>>     [2, 'AC', 25, '2022-02-22']
>> ]
>>
>> df = spark.createDataFrame(data, schema=schema)
>>
>> df = df.withColumn('load_date', F.to_date('load_date'))
>>
>> grouped_df = df.groupBy('code')
>>
>> pivot_aggs = [
>>     F.sum(F.when(F.col('doc_type') == doc_type, F.col('amount'))).alias(f'{doc_type}_amnt')
>>     for doc_type in ['AB', 'AA', 'AC']  # Dynamically define pivot columns
>> ]
>>
>> non_pivot_aggs = [
>>     F.first('load_date').alias('load_date')  # Or any other aggregation like min, max...
>> ]
>>
>> all_aggs = pivot_aggs + non_pivot_aggs
>>
>> df = grouped_df.agg(*all_aggs)
>>
>> df.printSchema()
>> df.show(20, False)
>>
>> Output
>>
>> root
>>  |-- code: integer (nullable = true)
>>  |-- AB_amnt: long (nullable = true)
>>  |-- AA_amnt: long (nullable = true)
>>  |-- AC_amnt: long (nullable = true)
>>  |-- load_date: date (nullable = true)
>>
>> +----+-------+-------+-------+----------+
>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>> +----+-------+-------+-------+----------+
>> |1   |12     |22     |11     |2022-01-01|
>> |2   |22     |28     |25     |2022-02-01|
>> +----+-------+-------+-------+----------+
>>
>> HTH
>>
>> On Sun, 9 Mar 2025 at 17:23, Dhruv Singla <dvsingla...@gmail.com> wrote:
>>
>>> Yes, this is it. I want to form this using a simple short command. The
>>> way I mentioned is a lengthy one.
>>>
>>> On Sun, Mar 9, 2025 at 10:16 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Is this what you are expecting?
>>>>
>>>> root
>>>>  |-- code: integer (nullable = true)
>>>>  |-- AB_amnt: long (nullable = true)
>>>>  |-- AA_amnt: long (nullable = true)
>>>>  |-- AC_amnt: long (nullable = true)
>>>>  |-- load_date: date (nullable = true)
>>>>
>>>> +----+-------+-------+-------+----------+
>>>> |code|AB_amnt|AA_amnt|AC_amnt|load_date |
>>>> +----+-------+-------+-------+----------+
>>>> |1   |12     |22     |11     |2022-01-01|
>>>> |2   |22     |28     |25     |2022-02-01|
>>>> +----+-------+-------+-------+----------+
>>>>
>>>> On Sun, 9 Mar 2025 at 14:12, Dhruv Singla <dvsingla...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Everyone
>>>>>
>>>>> Hope you are doing well
>>>>>
>>>>> I have the following dataframe.
>>>>>
>>>>> df = spark.createDataFrame(
>>>>>     [
>>>>>         [1, 'AB', 12, '2022-01-01']
>>>>>         , [1, 'AA', 22, '2022-01-10']
>>>>>         , [1, 'AC', 11, '2022-01-11']
>>>>>         , [2, 'AB', 22, '2022-02-01']
>>>>>         , [2, 'AA', 28, '2022-02-10']
>>>>>         , [2, 'AC', 25, '2022-02-22']
>>>>>     ]
>>>>>     , 'code: int, doc_type: string, amount: int, load_date: string'
>>>>> )
>>>>> df = df.withColumn('load_date', F.to_date('load_date'))
>>>>>
>>>>> I want to pivot the amount but just want the first value from the
>>>>> date. This is what I tried and it is not giving me the desired results.
>>>>>
>>>>> (
>>>>>     df.groupBy('code')
>>>>>     .pivot('doc_type', ['AB', 'AA', 'AC'])
>>>>>     .agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
>>>>>     .show()
>>>>> )
>>>>>
>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>> |code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>> |   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
>>>>> |   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
>>>>> +----+-------+----------+-------+----------+-------+----------+
>>>>>
>>>>> This is what I want.
>>>>>
>>>>> (
>>>>>     df.groupBy('code')
>>>>>     .agg(
>>>>>         F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
>>>>>         , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
>>>>>         , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
>>>>>         , F.first('load_date').alias('load_date')
>>>>>     )
>>>>>     .show()
>>>>> )
>>>>>
>>>>> +----+-------+-------+-------+----------+
>>>>> |code|AB_amnt|AA_amnt|AC_amnt| load_date|
>>>>> +----+-------+-------+-------+----------+
>>>>> |   1|     12|     22|     11|2022-01-01|
>>>>> |   2|     22|     28|     25|2022-02-01|
>>>>> +----+-------+-------+-------+----------+
>>>>>
>>>>> Is there any simpler way to do it? I have more than one column to put
>>>>> into pivot and also to put into non pivot.
>>>>>
>>>>> I am using Databricks 14.3 LTS with Spark 3.5.0
>>>>>
>>>>> Thanks & Regards
>>>>> Dhruv