Ted Chester Jenks created SPARK-48361:
-----------------------------------------
Summary: Correctness: CSV corrupt record filter with aggregate ignored
Key: SPARK-48361
URL: https://issues.apache.org/jira/browse/SPARK-48361
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.5.1
Environment: Using spark-shell 3.5.1 on M1 Mac
Reporter: Ted Chester Jenks


Using the corrupt record column in CSV parsing for some data-cleaning logic, I came across a correctness bug. The following repro can be run with spark-shell 3.5.1.

*Create test.csv with the following content:*
{code:java}
test,1,2,three
four,5,6,seven
8,9
ten,11,12,thirteen
{code}

*In spark-shell:*
{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// define a STRING, DOUBLE, DOUBLE, STRING schema for the data
val schema = StructType(List(
  StructField("column1", StringType, true),
  StructField("column2", DoubleType, true),
  StructField("column3", DoubleType, true),
  StructField("column4", StringType, true)))

// add a column for corrupt records to the schema
val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))

// read the CSV with the schema, headers, permissive parsing, and the corrupt record column
val df = spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv("test.csv")

// define a UDF to count the commas in the corrupt record column
val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)

// add a boolean column that is true when the corrupt record does not have exactly 3 commas
val dfWithJagged = df.withColumn("__is_jagged",
  when(col("_corrupt_record").isNull, false)
    .otherwise(countCommas(col("_corrupt_record")) =!= 3))

dfWithJagged.show()
{code}

*Returns:*
{code:java}
+-------+-------+-------+--------+---------------+-----------+
|column1|column2|column3| column4|_corrupt_record|__is_jagged|
+-------+-------+-------+--------+---------------+-----------+
|   four|    5.0|    6.0|   seven|           NULL|      false|
|      8|    9.0|   NULL|    NULL|            8,9|       true|
|    ten|   11.0|   12.0|thirteen|           NULL|      false|
+-------+-------+-------+--------+---------------+-----------+
{code}

So far so good...
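The cleaning step itself, referred to below as "the filter down to rows with 3 commas", would look something like the following (the name filtered is illustrative; it reuses dfWithJagged from the repro above):

{code:java}
// drop the jagged rows flagged above
val filtered = dfWithJagged.filter(col("__is_jagged") === false)

filtered.show()
// without an aggregate downstream, the jagged 8,9 row is dropped as expected
{code}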
*BUT* *if we add an aggregate before the show:*

*In spark-shell:*
{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// define a STRING, DOUBLE, DOUBLE, STRING schema for the data
val schema = StructType(List(
  StructField("column1", StringType, true),
  StructField("column2", DoubleType, true),
  StructField("column3", DoubleType, true),
  StructField("column4", StringType, true)))

// add a column for corrupt records to the schema
val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))

// read the CSV with the schema, headers, permissive parsing, and the corrupt record column
val df = spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv("test.csv")

// define a UDF to count the commas in the corrupt record column
val countCommas = udf((s: String) => if (s != null) s.count(_ == ',') else -1)

// add a boolean column that is true when the corrupt record does not have exactly 3 commas
val dfWithJagged = df.withColumn("__is_jagged",
  when(col("_corrupt_record").isNull, false)
    .otherwise(countCommas(col("_corrupt_record")) =!= 3))

// filter down to rows with 3 commas (drop jagged rows)
val filtered = dfWithJagged.filter(col("__is_jagged") === false)

// group by column1 and sum column2
val groupedSum = filtered.groupBy("column1").agg(sum("column2").alias("sum_column2"))

groupedSum.show()
{code}

*We get:*
{code:java}
+-------+-----------+
|column1|sum_column2|
+-------+-----------+
|      8|        9.0|
|   four|        5.0|
|    ten|       11.0|
+-------+-----------+
{code}

*Which is not correct.*

With the addition of the aggregate, the filter down to rows with 3 commas in the corrupt record column is ignored: the jagged 8,9 row passes the filter and shows up in the aggregated output. This does not happen with any other operator I have tried, only aggregates so far.
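A possible workaround, assuming (unverified) that the cause is the CSV parser's column pruning, which would re-parse only the columns the aggregate needs, so that 8,9 parses cleanly as two columns and _corrupt_record comes back null:

{code:java}
// Sketch only, under the column-pruning assumption above.

// Option 1: turn off CSV column pruning before the spark.read above, so every
// line is parsed against the full schema and _corrupt_record stays populated.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

// Option 2: materialize the parsed-and-filtered rows first, so the aggregate
// runs over the cached result instead of re-reading the CSV.
val cached = filtered.cache()
cached.count() // force evaluation while all columns are still referenced
cached.groupBy("column1").agg(sum("column2").alias("sum_column2")).show()
{code}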