igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708893876
Adding a `sortWithinPartitions("idx")` does not yield a viable workaround either, given the results below. It only "kind of" works when all partitions/groups have the same size and that size is known beforehand, so the batch size can be set to match it; that is obviously not a reasonable workflow.
```python
data = []
for c in range(1, 20001):
    for aid in range(1, 11):
        data.append([c, aid])

df = spark.createDataFrame(data, schema=["customer_id", "idx"])

def udf(b):
    for i in b:
        print(f"Num of rows per partition: {i.num_rows}")
        print(f"Unique idx: {i.to_pandas().idx.unique()}")
        yield i

print("Setting batch size to 10K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 20K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 30K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")
```
Output:
```
Setting batch size to 10K
Num of rows per partition: 10000
Unique idx: [1]
Num of rows per partition: 10000
Unique idx: [1]
Num of rows per partition: 10000
Unique idx: [2]
Num of rows per partition: 10000
Unique idx: [2]
Num of rows per partition: 10000
Unique idx: [3]
Num of rows per partition: 10000
Unique idx: [3]
Num of rows per partition: 10000
Unique idx: [4]
Num of rows per partition: 10000
Unique idx: [4]
Num of rows per partition: 10000
Unique idx: [5]
Num of rows per partition: 10000
Unique idx: [5]
Num of rows per partition: 10000
Unique idx: [6]
Num of rows per partition: 10000
Unique idx: [6]
Num of rows per partition: 10000
Unique idx: [7]
Num of rows per partition: 10000
Unique idx: [7]
Num of rows per partition: 10000
Unique idx: [8]
Num of rows per partition: 10000
Unique idx: [8]
Num of rows per partition: 10000
Unique idx: [9]
Num of rows per partition: 10000
Unique idx: [9]
Num of rows per partition: 10000
Unique idx: [10]
Num of rows per partition: 10000
Unique idx: [10]
Setting batch size to 20K
Num of rows per partition: 20000
Unique idx: [1]
Num of rows per partition: 20000
Unique idx: [2]
Num of rows per partition: 20000
Unique idx: [3]
Num of rows per partition: 20000
Unique idx: [4]
Num of rows per partition: 20000
Unique idx: [5]
Num of rows per partition: 20000
Unique idx: [6]
Num of rows per partition: 20000
Unique idx: [7]
Num of rows per partition: 20000
Unique idx: [8]
Num of rows per partition: 20000
Unique idx: [9]
Num of rows per partition: 20000
Unique idx: [10]
Setting batch size to 30K
Num of rows per partition: 30000
Unique idx: [1 2]
Num of rows per partition: 30000
Unique idx: [2 3]
Num of rows per partition: 30000
Unique idx: [4 5]
Num of rows per partition: 30000
Unique idx: [5 6]
Num of rows per partition: 30000
Unique idx: [7 8]
Num of rows per partition: 30000
Unique idx: [8 9]
Num of rows per partition: 20000
Unique idx: [10]
```
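The 30K case shows the core issue: the Arrow batches are just fixed-size slices of the (sorted) partition, so they only line up with the `idx` groups when `maxRecordsPerBatch` happens to divide the group size evenly. Here is a minimal, Spark-free sketch of that slicing arithmetic, assuming (as the output above suggests) that each `idx` group has 20,000 rows and three groups land in the same sorted partition:
```python
# Toy simulation of how maxRecordsPerBatch slices a sorted partition,
# ignoring group boundaries. Assumes three 20,000-row groups (idx 1, 2, 3)
# ended up in the same partition, as the output above suggests.
rows = [idx for idx in (1, 2, 3) for _ in range(20_000)]

for max_records in (10_000, 20_000, 30_000):
    print(f"maxRecordsPerBatch = {max_records}")
    for start in range(0, len(rows), max_records):
        batch = rows[start:start + max_records]
        print(f"  {len(batch)} rows, idx values: {sorted(set(batch))}")
```
With 10K and 20K the slices happen to coincide with group boundaries; with 30K the first slice is all of idx 1 plus half of idx 2, exactly as observed.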
Adding randomness to the group sizes makes the problem clear:
```python
import random

data = []
for c in range(1, 20001):
    for aid in range(1, 11):
        if random.choice([True, False]):
            data.append([c, aid])

df = spark.createDataFrame(data, schema=["customer_id", "idx"])

def udf(b):
    for i in b:
        print(f"Num of rows per partition: {i.num_rows}")
        print(f"Unique idx: {i.to_pandas().idx.unique()}")
        yield i

print("Setting batch size to 10K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 20K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 30K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
df.repartition("idx").sortWithinPartitions("idx").mapInArrow(
    udf, schema=df.schema
).write.format("noop").mode("overwrite").save()
print("\n")

def udf2(df):
    print(f"Num of rows per group: {len(df)}")
    print(f"Unique idx: {df.idx.unique()}")
    return df

print("\n")
print("Using applyInPandas")
df.groupby("idx").applyInPandas(
    udf2, schema=df.schema
).write.format("noop").mode("overwrite").save()
```
Output:
```
Setting batch size to 10K
Num of rows per partition: 10000
Unique idx: [1 2]
Num of rows per partition: 10000
Unique idx: [2 3]
Num of rows per partition: 10000
Unique idx: [3 4]
Num of rows per partition: 10000
Unique idx: [4 5]
Num of rows per partition: 10000
Unique idx: [5 6]
Num of rows per partition: 10000
Unique idx: [6 7]
Num of rows per partition: 10000
Unique idx: [7 8]
Num of rows per partition: 10000
Unique idx: [8 9]
Num of rows per partition: 10000
Unique idx: [ 9 10]
Num of rows per partition: 9982
Unique idx: [10]
Setting batch size to 20K
Num of rows per partition: 20000
Unique idx: [1 2 3]
Num of rows per partition: 20000
Unique idx: [3 4 5]
Num of rows per partition: 20000
Unique idx: [5 6 7]
Num of rows per partition: 20000
Unique idx: [7 8 9]
Num of rows per partition: 19982
Unique idx: [ 9 10]
Setting batch size to 30K
Num of rows per partition: 30000
Unique idx: [1 2 3 4]
Num of rows per partition: 30000
Unique idx: [4 5 6 7]
Num of rows per partition: 30000
Unique idx: [ 7 8 9 10]
Num of rows per partition: 9982
Unique idx: [10]
Using applyInPandas
Num of rows per group: 9972
Unique idx: [1]
Num of rows per group: 9978
Unique idx: [2]
Num of rows per group: 9953
Unique idx: [3]
Num of rows per group: 10004
Unique idx: [4]
Num of rows per group: 9956
Unique idx: [5]
Num of rows per group: 10036
Unique idx: [6]
Num of rows per group: 10037
Unique idx: [7]
Num of rows per group: 9898
Unique idx: [8]
Num of rows per group: 10011
Unique idx: [9]
Num of rows per group: 10137
Unique idx: [10]
```
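For reference, the group sizes in the randomized run are easy to sanity-check: each of the 20,000 candidate rows per `idx` survives the coin flip with probability 0.5, so a group's size is roughly Binomial(20000, 0.5). A quick back-of-the-envelope check (just the arithmetic, not from the run above):
```python
import math

# Expected size of one idx group when each of 20,000 candidate rows
# is kept with probability 0.5 (Binomial(20000, 0.5)).
n, p = 20_000, 0.5
mean = n * p
std = math.sqrt(n * p * (1 - p))
print(f"expected group size ~ {mean:.0f} +/- {std:.0f}")  # ~10000 +/- 71
```
Since the group sizes vary from run to run, there is no `maxRecordsPerBatch` value that can be chosen up front to make the Arrow batches coincide with the `idx` groups, whereas `applyInPandas` hands each group to the UDF intact regardless of its size.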