Yicong-Huang opened a new pull request, #52979:
URL: https://github.com/apache/spark/pull/52979

   ### What changes were proposed in this pull request?
   
   This PR optimizes the `to_pandas()` method in the Spark Connect client to avoid creating an intermediate pandas DataFrame during the Arrow-to-pandas conversion.
   
   **Key changes:**
   - Convert Arrow columns directly to pandas Series using 
`arrow_col.to_pandas()` instead of converting the entire table first with 
`table.to_pandas()`
   - Eliminate temporary column renaming (`col_0`, `col_1`, etc.) since we no 
longer create an intermediate DataFrame
   - Apply Spark-specific type converters directly to each Series without going through an intermediate DataFrame (a simplified sketch of this per-column flow is shown below)
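   
   The core idea, as a minimal standalone sketch (an illustration rather than the actual diff; `arrow_table_to_pandas` and `converters` are hypothetical names standing in for the client-side conversion path and the Spark-specific per-column converter callables):
   
   ```python
   import pandas as pd
   import pyarrow as pa
   
   def arrow_table_to_pandas(table: pa.Table, converters) -> pd.DataFrame:
       """Convert an Arrow table to pandas column by column (sketch only)."""
       series = []
       for i, name in enumerate(table.column_names):
           # Convert each Arrow column directly to a pandas Series; no
           # intermediate whole-table DataFrame, no temporary col_0/col_1 names.
           s = table.column(i).to_pandas()
           conv = converters[i]
           if conv is not None:
               # Apply the Spark-specific type converter to the Series directly.
               s = conv(s)
           s.name = name
           series.append(s)
       if not series:
           return pd.DataFrame(columns=table.column_names)
       return pd.concat(series, axis=1)
   ```
   
   Assembling the result from already-converted Series avoids materializing the data twice, which is where the memory and copy savings come from.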
   
   ### Why are the changes needed?
   
   This optimization brings Spark Connect's `to_pandas()` implementation in line with the optimization already applied to regular Spark DataFrames in PR #52680 ([SPARK-53967](https://issues.apache.org/jira/browse/SPARK-53967)).
   
   **Benefits:**
   1. **Reduced memory usage**: Eliminates the allocation of the intermediate DataFrame
   2. **Better performance**: Fewer data copies and better memory locality
   3. **Consistency**: Makes the Spark Connect code path match the optimized regular Spark DataFrame path
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is a pure performance optimization with no API or behavior changes.
   
   ### How was this patch tested?
   **Benchmark setup** (for manual testing):
   - 1M rows × 102 columns
   - Mixed types: ~25 complex columns (Date, Timestamp, Struct, Array) + ~77 simple columns (Int, Double, String)
   - Batch size: 5,000 rows per batch
   - Config: Arrow enabled, self-destruct enabled
   
   ```python
   """
   Benchmark for Spark Connect toPandas optimization with large data and batch 
processing.
   
   This benchmark uses MIXED data types to showcase the optimization:
   - Complex types (Date, Timestamp, Struct, Array): Need Spark-specific 
conversion
   - Simple types (Int, Double, String): Fast bulk conversion
   
   The optimization should show ~6-7% improvement especially for complex types.
   """
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as sf
   import time
   
   # Connect to Spark Connect
   spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
   
   # Configure for Arrow with self-destruct and SMALL BATCH SIZE
   spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
   spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", 
"true")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")  # 
Small batches: 5k rows (~1.5MB/batch)
   
   # Large dataset: 1M rows with MIXED data types
   # Keep 'id' column for subsequent operations
   df = spark.range(1000000).select(
       sf.col("id"),
       (sf.col("id") % 2).alias("key"), 
       sf.col("id").alias("v")
   )
   
   # Add various column types to test conversion performance
   # These types need Spark-specific conversion:
   df = df.withColumns({
       # Dates and timestamps (need conversion)
       "date_col_1": sf.date_add(sf.to_date(sf.lit("2024-01-01")), 
(sf.col("id") % sf.lit(365)).cast("int")),
       "date_col_2": sf.date_add(sf.to_date(sf.lit("2023-01-01")), 
(sf.col("id") % sf.lit(180)).cast("int")),
       "timestamp_col": sf.current_timestamp(),
       
       # Struct columns (need conversion)
       "struct_col_1": sf.struct(sf.col("id").cast("long").alias("a"), 
(sf.col("id") * sf.lit(2)).cast("long").alias("b")),
       "struct_col_2": sf.struct((sf.col("id") % 
sf.lit(10)).cast("int").alias("x"), (sf.col("id") / sf.lit(100.0)).alias("y")),
       
       # Array columns (need conversion) - use literals to avoid type issues
       "array_col": sf.array(sf.lit(1), sf.lit(2), sf.lit(3)),
       
       # Numeric columns with computation (simple types)
       "double_col_1": sf.col("id") / sf.lit(3.14),
       "double_col_2": sf.col("id") * sf.lit(1.5) + sf.lit(100),
       "int_col": (sf.col("id") % sf.lit(1000)).cast("int"),
   })
   
   # Add more mixed columns - some simple, some complex
   for i in range(45):
       if i % 5 == 0:
           # Every 5th column: date type (needs conversion)
           df = df.withColumn(f"mixed_{i}", 
               sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % 
sf.lit(i + 1)).cast("int")))
       elif i % 5 == 1:
           # Struct type (needs conversion)
           df = df.withColumn(f"mixed_{i}", 
               sf.struct(sf.lit(i).alias("idx"), (sf.col("id") % sf.lit(i + 
1)).cast("long").alias("val")))
       elif i % 5 == 2:
           # String with computation (simple type)
           df = df.withColumn(f"mixed_{i}", 
               sf.concat(sf.lit(f"str_{i}_"), (sf.col("id") % 
sf.lit(100)).cast("string")))
       else:
           # Numeric computation (simple type)
           df = df.withColumn(f"mixed_{i}", (sf.col("id") * sf.lit(i) + 
sf.lit(i)) % sf.lit(1000))
   
   # Add some constant strings for variety
   for i in range(45):
       df = df.withColumn(f"const_{i}", sf.lit(f"c{i}"))
   
   # Drop the id column at the end (not needed for benchmark)
   df = df.drop("id")
   df.cache()
   df.count()
   
   # Warm up
   pdf = df.toPandas()
   del pdf
   
   # Benchmark
   start = time.perf_counter()
   total_rows = 0
   total_sum = 0
   
   for i in range(20):
       # Convert to pandas
       pdf = df.toPandas()
       total_rows += len(pdf)
       total_sum += pdf['v'].sum()
       del pdf
       
       if (i + 1) % 5 == 0:
           elapsed = time.perf_counter() - start
           print(f"  {i + 1}/20 completed ({elapsed:.1f}s elapsed, 
~{elapsed/(i+1):.2f}s per iteration)")
   
   elapsed = time.perf_counter() - start
   # Print the overall result; the per-iteration time is what the numbers below compare.
   print(f"Total: {elapsed:.1f}s for 20 iterations (~{elapsed/20:.2f}s per iteration)")
   ```
   
   **Manual benchmarking results**: 6.5% improvement with mixed data types 
(dates, timestamps, structs, arrays, and simple types)
      - Before: 129.3s for 20 iterations (6.46s per iteration)
      - After: 120.9s for 20 iterations (6.04s per iteration)
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes. Co-Generated-by: Cursor
   

