igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708508134

   @HyukjinKwon This does not match what I am seeing with this test case. The batch size setting definitely changes the size of the batches available inside the `mapInArrow()` call after the repartition by the grouping column.
   
   ```python
   data = []
   for c in range(1,20001):
       for aid in range(1,11):
           data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   def udf(b):
       # Each element is a pyarrow.RecordBatch; its row count is what
       # spark.sql.execution.arrow.maxRecordsPerBatch controls.
       for batch in b:
           print(f"Num of rows per partition: {batch.num_rows}")
           yield batch
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").mapInArrow(udf, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   def udf2(pdf):
       # applyInPandas hands each group to the function as a single
       # pandas DataFrame, regardless of maxRecordsPerBatch.
       print(f"Num of rows per group: {len(pdf)}")
       return pdf
   
   print("\n")
   print("Using applyInPandas")
   df.groupby("idx").applyInPandas(udf2, 
schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   The output:
   ```
   Setting batch size to 10K
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   Num of rows per partition: 10000
   
   
   Setting batch size to 20K
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   Num of rows per partition: 20000
   
   
   Setting batch size to 30K
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 30000
   Num of rows per partition: 20000
   
   
   
   
   Using applyInPandas
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   
   
   ```
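   
   For what it is worth, if the goal is a fixed batch size inside `mapInArrow()` regardless of the session config, the incoming Arrow batches can be re-chunked in user code. A minimal sketch (the `rechunk` helper and `udf_fixed` names are hypothetical, not part of any Spark or PyArrow API):
   
   ```python
   import pyarrow as pa
   
   def rechunk(batches, max_rows):
       # Hypothetical helper: split each incoming pyarrow.RecordBatch into
       # chunks of at most `max_rows` rows, independent of
       # spark.sql.execution.arrow.maxRecordsPerBatch.
       for batch in batches:
           yield from pa.Table.from_batches([batch]).to_batches(max_chunksize=max_rows)
   
   def udf_fixed(batches):
       for b in rechunk(batches, max_rows=5000):
           print(f"Num of rows per batch: {b.num_rows}")
           yield b
   
   df.repartition("idx").mapInArrow(udf_fixed, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   Note this only splits batches; it cannot coalesce rows across the batch boundaries Spark already chose, so batches smaller than `max_rows` pass through unchanged.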

