Yicong-Huang opened a new pull request, #52979: URL: https://github.com/apache/spark/pull/52979
### What changes were proposed in this pull request?

This PR optimizes the `to_pandas()` method in the Spark Connect client to avoid creating an intermediate pandas DataFrame during Arrow-to-pandas conversion.

**Key changes:**
- Convert Arrow columns directly to pandas Series using `arrow_col.to_pandas()` instead of converting the entire table first with `table.to_pandas()`
- Eliminate temporary column renaming (`col_0`, `col_1`, etc.), since we no longer create an intermediate DataFrame
- Apply Spark-specific type converters directly to each Series without going through an intermediate DataFrame (see the sketch below)
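To make the change concrete, here is a minimal, illustrative sketch of the column-wise conversion. This is not the actual Spark Connect client code; the `convert` function is a hypothetical stand-in for the Spark-specific per-column type converters:

```python
# Illustrative sketch only: contrasts whole-table conversion with
# column-wise conversion. `convert` is a hypothetical placeholder for the
# Spark-specific type converters applied per column.
from datetime import date

import pandas as pd
import pyarrow as pa

table = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "d": pa.array([date(2024, 1, 1)] * 3),
})

# Previous approach: convert the whole table into an intermediate DataFrame first.
pdf_old = table.to_pandas()


def convert(series: pd.Series) -> pd.Series:
    # Placeholder for a Spark-specific converter (e.g. date/struct handling).
    return series


# Optimized approach: convert each Arrow column directly to a pandas Series,
# apply the per-column converter, and assemble the DataFrame once at the end.
columns = [convert(col.to_pandas()) for col in table.columns]
pdf_new = pd.concat(columns, axis="columns", keys=table.column_names)

assert pdf_old.shape == pdf_new.shape
```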
""" from pyspark.sql import SparkSession from pyspark.sql import functions as sf import time # Connect to Spark Connect spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate() # Configure for Arrow with self-destruct and SMALL BATCH SIZE spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true") spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000") # Small batches: 5k rows (~1.5MB/batch) # Large dataset: 1M rows with MIXED data types # Keep 'id' column for subsequent operations df = spark.range(1000000).select( sf.col("id"), (sf.col("id") % 2).alias("key"), sf.col("id").alias("v") ) # Add various column types to test conversion performance # These types need Spark-specific conversion: df = df.withColumns({ # Dates and timestamps (need conversion) "date_col_1": sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(365)).cast("int")), "date_col_2": sf.date_add(sf.to_date(sf.lit("2023-01-01")), (sf.col("id") % sf.lit(180)).cast("int")), "timestamp_col": sf.current_timestamp(), # Struct columns (need conversion) "struct_col_1": sf.struct(sf.col("id").cast("long").alias("a"), (sf.col("id") * sf.lit(2)).cast("long").alias("b")), "struct_col_2": sf.struct((sf.col("id") % sf.lit(10)).cast("int").alias("x"), (sf.col("id") / sf.lit(100.0)).alias("y")), # Array columns (need conversion) - use literals to avoid type issues "array_col": sf.array(sf.lit(1), sf.lit(2), sf.lit(3)), # Numeric columns with computation (simple types) "double_col_1": sf.col("id") / sf.lit(3.14), "double_col_2": sf.col("id") * sf.lit(1.5) + sf.lit(100), "int_col": (sf.col("id") % sf.lit(1000)).cast("int"), }) # Add more mixed columns - some simple, some complex for i in range(45): if i % 5 == 0: # Every 5th column: date type (needs conversion) df = df.withColumn(f"mixed_{i}", sf.date_add(sf.to_date(sf.lit("2024-01-01")), (sf.col("id") % sf.lit(i + 1)).cast("int"))) elif i % 5 == 1: # Struct type (needs conversion) df = df.withColumn(f"mixed_{i}", sf.struct(sf.lit(i).alias("idx"), (sf.col("id") % sf.lit(i + 1)).cast("long").alias("val"))) elif i % 5 == 2: # String with computation (simple type) df = df.withColumn(f"mixed_{i}", sf.concat(sf.lit(f"str_{i}_"), (sf.col("id") % sf.lit(100)).cast("string"))) else: # Numeric computation (simple type) df = df.withColumn(f"mixed_{i}", (sf.col("id") * sf.lit(i) + sf.lit(i)) % sf.lit(1000)) # Add some constant strings for variety for i in range(45): df = df.withColumn(f"const_{i}", sf.lit(f"c{i}")) # Drop the id column at the end (not needed for benchmark) df = df.drop("id") df.cache() df.count() # Warm up pdf = df.toPandas() del pdf # Benchmark start = time.perf_counter() total_rows = 0 total_sum = 0 for i in range(20): # Convert to pandas pdf = df.toPandas() total_rows += len(pdf) total_sum += pdf['v'].sum() del pdf if (i + 1) % 5 == 0: elapsed = time.perf_counter() - start print(f" {i + 1}/20 completed ({elapsed:.1f}s elapsed, ~{elapsed/(i+1):.2f}s per iteration)") elapsed = time.perf_counter() - start ``` **Manual benchmarking results**: 6.5% improvement with mixed data types (dates, timestamps, structs, arrays, and simple types) - Before: 129.3s for 20 iterations (6.46s per iteration) - After: 120.9s for 20 iterations (6.04s per iteration) ### Was this patch authored or co-authored using generative AI tooling? Yes. Co-Genreated-by Cursor -- This is an automated message from the Apache Git Service. 
