igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708893876

   Adding a `sortWithinPartitions("idx")` also does not yield a viable workaround, as the results below show. It only sort of works when all partitions/groups have the same size and we know that size beforehand, so the batch size can be set to exactly that value; obviously, that is not a reasonable workflow.
   
   ```python
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   def udf(b):
       # b is an iterator of pyarrow.RecordBatch; report each batch's size and
       # which idx values it contains, then pass it through unchanged.
       for i in b:
           print(f"Num of rows per partition: {i.num_rows}")
           print(f"Unique idx: {i.to_pandas().idx.unique()}")
           yield i
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   ```
   
   Output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000
   Unique idx: [1]
   Num of rows per partition: 10000
   Unique idx: [1]
   Num of rows per partition: 10000
   Unique idx: [2]
   Num of rows per partition: 10000
   Unique idx: [2]
   Num of rows per partition: 10000
   Unique idx: [3]
   Num of rows per partition: 10000
   Unique idx: [3]
   Num of rows per partition: 10000
   Unique idx: [4]
   Num of rows per partition: 10000
   Unique idx: [4]
   Num of rows per partition: 10000
   Unique idx: [5]
   Num of rows per partition: 10000
   Unique idx: [5]
   Num of rows per partition: 10000
   Unique idx: [6]
   Num of rows per partition: 10000
   Unique idx: [6]
   Num of rows per partition: 10000
   Unique idx: [7]
   Num of rows per partition: 10000
   Unique idx: [7]
   Num of rows per partition: 10000
   Unique idx: [8]
   Num of rows per partition: 10000
   Unique idx: [8]
   Num of rows per partition: 10000
   Unique idx: [9]
   Num of rows per partition: 10000
   Unique idx: [9]
   Num of rows per partition: 10000
   Unique idx: [10]
   Num of rows per partition: 10000
   Unique idx: [10]
   
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Unique idx: [1]
   Num of rows per partition: 20000
   Unique idx: [2]
   Num of rows per partition: 20000
   Unique idx: [3]
   Num of rows per partition: 20000
   Unique idx: [4]
   Num of rows per partition: 20000
   Unique idx: [5]
   Num of rows per partition: 20000
   Unique idx: [6]
   Num of rows per partition: 20000
   Unique idx: [7]
   Num of rows per partition: 20000
   Unique idx: [8]
   Num of rows per partition: 20000
   Unique idx: [9]
   Num of rows per partition: 20000
   Unique idx: [10]
   
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Unique idx: [1 2]
   Num of rows per partition: 30000
   Unique idx: [2 3]
   Num of rows per partition: 30000
   Unique idx: [4 5]
   Num of rows per partition: 30000
   Unique idx: [5 6]
   Num of rows per partition: 30000
   Unique idx: [7 8]
   Num of rows per partition: 30000
   Unique idx: [8 9]
   Num of rows per partition: 20000
   Unique idx: [10]
   ```
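   
   The only user-side fix I can see is to re-group the rows inside the `mapInArrow` UDF itself, buffering slices across incoming batches until the sort key changes. Here is a minimal sketch of that idea; the `regroup_by_idx` helper is hypothetical, not anything Spark or PyArrow provides, and it assumes `sortWithinPartitions("idx")` has already run upstream:
   
   ```python
   import pyarrow as pa
   import pyarrow.compute as pc
   
   def regroup_by_idx(batches):
       # Buffer slices of the incoming batches until the sort key changes,
       # then emit everything buffered so far as batches holding a single idx.
       # Relies on sortWithinPartitions("idx") having run upstream.
       pending, current = [], None
       for batch in batches:
           table = pa.Table.from_batches([batch])
           for value in pc.unique(table["idx"]).to_pylist():
               if current is not None and value != current:
                   yield from pa.concat_tables(pending).combine_chunks().to_batches()
                   pending = []
               pending.append(table.filter(pc.equal(table["idx"], value)))
               current = value
       if pending:
           yield from pa.concat_tables(pending).combine_chunks().to_batches()
   ```
   
   But that is just reimplementing grouping in user code, which is the part I would expect the engine to handle.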
   
   Adding randomness to the group sizes makes the problem clearer: the groups no longer have identical sizes (9,898 to 10,137 rows in the run below), so no single `maxRecordsPerBatch` value can align the batches with the group boundaries, while `applyInPandas` still receives each group whole.
   
   ```python
   import random
   
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           if random.choice([True, False]):
               data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   # Same pass-through batch inspector as above.
   def udf(b):
       for i in b:
           print(f"Num of rows per partition: {i.num_rows}")
           print(f"Unique idx: {i.to_pandas().idx.unique()}")
           yield i
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   def udf2(df):
       # applyInPandas hands each group to the UDF as one whole pandas
       # DataFrame, regardless of maxRecordsPerBatch.
       print(f"Num of rows per group: {len(df)}")
       print(f"Unique idx: {df.idx.unique()}")
       return df
   
   print("\n")
   print("Using applyInPandas")
   df.groupby("idx").applyInPandas(udf2, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   Output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000
   Unique idx: [1 2]
   Num of rows per partition: 10000
   Unique idx: [2 3]
   Num of rows per partition: 10000
   Unique idx: [3 4]
   Num of rows per partition: 10000
   Unique idx: [4 5]
   Num of rows per partition: 10000
   Unique idx: [5 6]
   Num of rows per partition: 10000
   Unique idx: [6 7]
   Num of rows per partition: 10000
   Unique idx: [7 8]
   Num of rows per partition: 10000
   Unique idx: [8 9]
   Num of rows per partition: 10000
   Unique idx: [ 9 10]
   Num of rows per partition: 9982
   Unique idx: [10]
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Unique idx: [1 2 3]
   Num of rows per partition: 20000
   Unique idx: [3 4 5]
   Num of rows per partition: 20000
   Unique idx: [5 6 7]
   Num of rows per partition: 20000
   Unique idx: [7 8 9]
   Num of rows per partition: 19982
   Unique idx: [ 9 10]
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Unique idx: [1 2 3 4]
   Num of rows per partition: 30000
   Unique idx: [4 5 6 7]
   Num of rows per partition: 30000
   Unique idx: [ 7  8  9 10]
   Num of rows per partition: 9982
   Unique idx: [10]
   
   Using applyInPandas
   Num of rows per group: 9972
   Unique idx: [1]
   Num of rows per group: 9978
   Unique idx: [2]
   Num of rows per group: 9953
   Unique idx: [3]
   Num of rows per group: 10004
   Unique idx: [4]
   Num of rows per group: 9956
   Unique idx: [5]
   Num of rows per group: 10036
   Unique idx: [6]
   Num of rows per group: 10037
   Unique idx: [7]
   Num of rows per group: 9898
   Unique idx: [8]
   Num of rows per group: 10011
   Unique idx: [9]
   Num of rows per group: 10137
   Unique idx: [10]
   ```
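   
   For what it's worth, the hypothetical `regroup_by_idx` iterator sketched above would also handle these uneven groups, since it buffers until the key changes rather than counting rows:
   
   ```python
   df.repartition("idx").sortWithinPartitions("idx").mapInArrow(regroup_by_idx, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```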

