igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708508134
@HyukjinKwon This does not match what I am seeing with this test case. The
batch size setting definitely changes what is available within the
`mapInArrow()` call after the repartition by the grouping column.
```python
data = []
for c in range(1, 20001):
    for aid in range(1, 11):
        data.append([c, aid])
df = spark.createDataFrame(data, schema=["customer_id", "idx"])

def udf(b):
    for i in b:
        print(f"Num of rows per partition: {i.num_rows}")
        yield i

print("Setting batch size to 10K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 20K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 30K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

def udf2(df):
    print(f"Num of rows per group: {len(df)}")
    return df

print("Using applyInPandas")
df.groupby("idx").applyInPandas(udf2, schema=df.schema).write.format("noop").mode("overwrite").save()
```
The output:
```
Setting batch size to 10K
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Setting batch size to 20K
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Setting batch size to 30K
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 20000
Using applyInPandas
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
```
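For what it's worth, if the goal is to always see the whole repartitioned partition at once regardless of `spark.sql.execution.arrow.maxRecordsPerBatch`, the incoming batches can be combined inside the UDF itself. A minimal sketch (assuming `pyarrow` is importable on the workers; `udf_whole_partition` is just an illustrative name, not anything from this PR):
```python
import pyarrow as pa

def udf_whole_partition(batches):
    # Materialize every Arrow batch Spark hands us for this partition.
    batches = list(batches)
    if not batches:
        return
    # Combine the batches into one table so the logic below sees the
    # full partition, independent of maxRecordsPerBatch.
    table = pa.Table.from_batches(batches)
    print(f"Num of rows per partition: {table.num_rows}")
    yield from table.to_batches()

df.repartition("idx").mapInArrow(udf_whole_partition, schema=df.schema) \
    .write.format("noop").mode("overwrite").save()
```
Note this buffers the whole partition in memory, which is essentially what `applyInPandas` already does per group.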